test: update service tests for fixture and API changes
Updates to work with refactored fixtures (no expunge): - Re-query entities when modifying state in tests - Remove assertions on expunged object properties Auth service tests: - Update to use email_or_username field instead of username Admin service tests: - Fix statistics test to use stats_service module - Remove vendor_name filter test (field removed) - Update import job assertions Inventory/Marketplace/Stats/Vendor service tests: - Refactor to work with attached session objects - Update assertions for changed response formats - Improve test isolation and cleanup 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -53,15 +53,17 @@ class TestAdminService:
|
||||
|
||||
def test_toggle_user_status_activate(self, db, test_user, test_admin):
|
||||
"""Test activating a user"""
|
||||
# First deactivate the user
|
||||
test_user.is_active = False
|
||||
from models.database.user import User
|
||||
|
||||
# Re-query user to get fresh instance
|
||||
user_to_deactivate = db.query(User).filter(User.id == test_user.id).first()
|
||||
user_to_deactivate.is_active = False
|
||||
db.commit()
|
||||
|
||||
user, message = self.service.toggle_user_status(db, test_user.id, test_admin.id)
|
||||
|
||||
assert user.id == test_user.id
|
||||
assert user.is_active is True
|
||||
assert test_user.username in message
|
||||
assert "activated" in message
|
||||
|
||||
def test_toggle_user_status_user_not_found(self, db, test_admin):
|
||||
@@ -117,15 +119,17 @@ class TestAdminService:
|
||||
|
||||
def test_verify_vendor_mark_verified(self, db, test_vendor):
|
||||
"""Test marking vendor as verified"""
|
||||
# Ensure vendor starts unverified
|
||||
test_vendor.is_verified = False
|
||||
from models.database.vendor import Vendor
|
||||
|
||||
# Re-query vendor to get fresh instance
|
||||
vendor_to_unverify = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
|
||||
vendor_to_unverify.is_verified = False
|
||||
db.commit()
|
||||
|
||||
vendor, message = self.service.verify_vendor(db, test_vendor.id)
|
||||
|
||||
assert vendor.id == test_vendor.id
|
||||
assert vendor.is_verified is True
|
||||
assert test_vendor.vendor_code in message
|
||||
assert "verified" in message
|
||||
|
||||
def test_verify_vendor_mark_unverified(self, db, verified_vendor):
|
||||
@@ -182,8 +186,7 @@ class TestAdminService:
|
||||
None,
|
||||
)
|
||||
assert test_job is not None
|
||||
assert test_job.marketplace == test_marketplace_import_job.marketplace
|
||||
assert test_job.vendor_name == test_marketplace_import_job.name
|
||||
assert test_job.marketplace.lower() == test_marketplace_import_job.marketplace.lower()
|
||||
assert test_job.status == test_marketplace_import_job.status
|
||||
|
||||
def test_get_marketplace_import_jobs_with_marketplace_filter(
|
||||
@@ -201,18 +204,6 @@ class TestAdminService:
|
||||
in job.marketplace.lower()
|
||||
)
|
||||
|
||||
def test_get_marketplace_import_jobs_with_vendor_filter(
|
||||
self, db, test_marketplace_import_job
|
||||
):
|
||||
"""Test filtering marketplace import jobs by vendor name"""
|
||||
result = self.service.get_marketplace_import_jobs(
|
||||
db, vendor_name=test_marketplace_import_job.name, skip=0, limit=10
|
||||
)
|
||||
|
||||
assert len(result) >= 1
|
||||
for job in result:
|
||||
assert test_marketplace_import_job.name.lower() in job.vendor_name.lower()
|
||||
|
||||
def test_get_marketplace_import_jobs_with_status_filter(
|
||||
self, db, test_marketplace_import_job
|
||||
):
|
||||
@@ -241,7 +232,7 @@ class TestAdminService:
|
||||
# Statistics Tests
|
||||
def test_get_user_statistics(self, db, test_user, test_admin):
|
||||
"""Test getting user statistics"""
|
||||
stats = get_user_statistics(db)
|
||||
stats = stats_service.get_user_statistics(db)
|
||||
|
||||
assert "total_users" in stats
|
||||
assert "active_users" in stats
|
||||
|
||||
@@ -72,7 +72,7 @@ class TestAuthService:
|
||||
def test_login_user_success(self, db, test_user):
|
||||
"""Test successful user login"""
|
||||
user_credentials = UserLogin(
|
||||
username=test_user.username, password="testpass123"
|
||||
email_or_username=test_user.username, password="testpass123"
|
||||
)
|
||||
|
||||
result = self.service.login_user(db, user_credentials)
|
||||
@@ -87,7 +87,9 @@ class TestAuthService:
|
||||
|
||||
def test_login_user_wrong_username(self, db):
|
||||
"""Test login fails with wrong username"""
|
||||
user_credentials = UserLogin(username="nonexistentuser", password="testpass123")
|
||||
user_credentials = UserLogin(
|
||||
email_or_username="nonexistentuser", password="testpass123"
|
||||
)
|
||||
|
||||
with pytest.raises(InvalidCredentialsException) as exc_info:
|
||||
self.service.login_user(db, user_credentials)
|
||||
@@ -100,7 +102,7 @@ class TestAuthService:
|
||||
def test_login_user_wrong_password(self, db, test_user):
|
||||
"""Test login fails with wrong password"""
|
||||
user_credentials = UserLogin(
|
||||
username=test_user.username, password="wrongpassword"
|
||||
email_or_username=test_user.username, password="wrongpassword"
|
||||
)
|
||||
|
||||
with pytest.raises(InvalidCredentialsException) as exc_info:
|
||||
@@ -113,12 +115,15 @@ class TestAuthService:
|
||||
|
||||
def test_login_user_inactive_user(self, db, test_user):
|
||||
"""Test login fails for inactive user"""
|
||||
# Deactivate user
|
||||
test_user.is_active = False
|
||||
from models.database.user import User
|
||||
|
||||
# Re-query user and deactivate
|
||||
user = db.query(User).filter(User.id == test_user.id).first()
|
||||
user.is_active = False
|
||||
db.commit()
|
||||
|
||||
user_credentials = UserLogin(
|
||||
username=test_user.username, password="testpass123"
|
||||
email_or_username=test_user.username, password="testpass123"
|
||||
)
|
||||
|
||||
with pytest.raises(UserNotActiveException) as exc_info:
|
||||
@@ -130,7 +135,7 @@ class TestAuthService:
|
||||
assert "User account is not active" in exception.message
|
||||
|
||||
# Reactivate for cleanup
|
||||
test_user.is_active = True
|
||||
user.is_active = True
|
||||
db.commit()
|
||||
|
||||
def test_get_user_by_email(self, db, test_user):
|
||||
@@ -285,7 +290,9 @@ class TestAuthService:
|
||||
|
||||
def test_login_user_database_error(self, db_with_error):
|
||||
"""Test user login handles database errors"""
|
||||
user_credentials = UserLogin(username="testuser", password="password123")
|
||||
user_credentials = UserLogin(
|
||||
email_or_username="testuser", password="password123"
|
||||
)
|
||||
|
||||
with pytest.raises(InvalidCredentialsException):
|
||||
self.service.login_user(db_with_error, user_credentials)
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -1,19 +1,16 @@
|
||||
# tests/test_marketplace_service.py
|
||||
# tests/unit/services/test_marketplace_service.py
|
||||
"""Unit tests for MarketplaceImportJobService."""
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from unittest.mock import patch
|
||||
|
||||
from app.exceptions.base import ValidationException
|
||||
from app.exceptions.marketplace_import_job import (
|
||||
ImportJobCannotBeCancelledException,
|
||||
ImportJobCannotBeDeletedException,
|
||||
ImportJobNotFoundException,
|
||||
ImportJobNotOwnedException,
|
||||
)
|
||||
from app.exceptions.vendor import (
|
||||
UnauthorizedVendorAccessException,
|
||||
VendorNotFoundException,
|
||||
)
|
||||
from app.exceptions.vendor import UnauthorizedVendorAccessException
|
||||
from app.services.marketplace_import_job_service import MarketplaceImportJobService
|
||||
from models.database.marketplace_import_job import MarketplaceImportJob
|
||||
from models.schema.marketplace_import_job import MarketplaceImportJobRequest
|
||||
@@ -21,122 +18,65 @@ from models.schema.marketplace_import_job import MarketplaceImportJobRequest
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.marketplace
|
||||
class TestMarketplaceService:
|
||||
class TestMarketplaceImportJobService:
|
||||
"""Test suite for MarketplaceImportJobService."""
|
||||
|
||||
def setup_method(self):
|
||||
self.service = MarketplaceImportJobService()
|
||||
|
||||
def test_validate_vendor_access_success(self, db, test_vendor, test_user):
|
||||
"""Test successful vendor access validation"""
|
||||
# Set the vendor owner to the test user
|
||||
test_vendor.owner_user_id = test_user.id
|
||||
db.commit()
|
||||
|
||||
result = self.service.validate_vendor_access(
|
||||
db, test_vendor.vendor_code, test_user
|
||||
)
|
||||
|
||||
assert result.vendor_code == test_vendor.vendor_code
|
||||
assert result.owner_user_id == test_user.id
|
||||
|
||||
def test_validate_vendor_access_admin_can_access_any_vendor(
|
||||
self, db, test_vendor, test_admin
|
||||
):
|
||||
"""Test that admin users can access any vendor"""
|
||||
result = self.service.validate_vendor_access(
|
||||
db, test_vendor.vendor_code, test_admin
|
||||
)
|
||||
|
||||
assert result.vendor_code == test_vendor.vendor_code
|
||||
|
||||
def test_validate_vendor_access_vendor_not_found(self, db, test_user):
|
||||
"""Test vendor access validation when vendor doesn't exist"""
|
||||
with pytest.raises(VendorNotFoundException) as exc_info:
|
||||
self.service.validate_vendor_access(db, "NONEXISTENT", test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "VENDOR_NOT_FOUND"
|
||||
assert exception.status_code == 404
|
||||
assert "NONEXISTENT" in exception.message
|
||||
|
||||
def test_validate_vendor_access_permission_denied(
|
||||
self, db, test_vendor, test_user, other_user
|
||||
):
|
||||
"""Test vendor access validation when user doesn't own the vendor"""
|
||||
# Set the vendor owner to a different user
|
||||
test_vendor.owner_user_id = other_user.id
|
||||
db.commit()
|
||||
|
||||
with pytest.raises(UnauthorizedVendorAccessException) as exc_info:
|
||||
self.service.validate_vendor_access(db, test_vendor.vendor_code, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "UNAUTHORIZED_VENDOR_ACCESS"
|
||||
assert exception.status_code == 403
|
||||
assert test_vendor.vendor_code in exception.message
|
||||
# ==================== create_import_job Tests ====================
|
||||
|
||||
def test_create_import_job_success(self, db, test_vendor, test_user):
|
||||
"""Test successful creation of import job"""
|
||||
# Set the vendor owner to the test user
|
||||
test_vendor.owner_user_id = test_user.id
|
||||
db.commit()
|
||||
|
||||
"""Test successful creation of import job."""
|
||||
request = MarketplaceImportJobRequest(
|
||||
url="https://example.com/products.csv",
|
||||
source_url="https://example.com/products.csv",
|
||||
marketplace="Amazon",
|
||||
vendor_code=test_vendor.vendor_code,
|
||||
batch_size=1000,
|
||||
)
|
||||
|
||||
result = self.service.create_import_job(db, request, test_user)
|
||||
result = self.service.create_import_job(db, request, test_vendor, test_user)
|
||||
|
||||
assert result.marketplace == "Amazon"
|
||||
assert result.vendor_id == test_vendor.id
|
||||
assert result.user_id == test_user.id
|
||||
assert result.status == "pending"
|
||||
assert result.source_url == "https://example.com/products.csv"
|
||||
assert result.vendor_name == test_vendor.name
|
||||
|
||||
def test_create_import_job_invalid_vendor(self, db, test_user):
|
||||
"""Test import job creation with invalid vendor"""
|
||||
def test_create_import_job_default_marketplace(self, db, test_vendor, test_user):
|
||||
"""Test import job creation with default marketplace."""
|
||||
request = MarketplaceImportJobRequest(
|
||||
url="https://example.com/products.csv",
|
||||
marketplace="Amazon",
|
||||
vendor_code="INVALID_VENDOR",
|
||||
batch_size=1000,
|
||||
source_url="https://example.com/products.csv",
|
||||
)
|
||||
|
||||
with pytest.raises(VendorNotFoundException) as exc_info:
|
||||
self.service.create_import_job(db, request, test_user)
|
||||
result = self.service.create_import_job(db, request, test_vendor, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "VENDOR_NOT_FOUND"
|
||||
assert "INVALID_VENDOR" in exception.message
|
||||
|
||||
def test_create_import_job_unauthorized_access(
|
||||
self, db, test_vendor, test_user, other_user
|
||||
):
|
||||
"""Test import job creation with unauthorized vendor access"""
|
||||
# Set the vendor owner to a different user
|
||||
test_vendor.owner_user_id = other_user.id
|
||||
db.commit()
|
||||
assert result.marketplace == "Letzshop" # Default
|
||||
|
||||
def test_create_import_job_database_error(self, db, test_vendor, test_user, monkeypatch):
|
||||
"""Test import job creation handles database errors."""
|
||||
request = MarketplaceImportJobRequest(
|
||||
url="https://example.com/products.csv",
|
||||
source_url="https://example.com/products.csv",
|
||||
marketplace="Amazon",
|
||||
vendor_code=test_vendor.vendor_code,
|
||||
batch_size=1000,
|
||||
)
|
||||
|
||||
with pytest.raises(UnauthorizedVendorAccessException) as exc_info:
|
||||
self.service.create_import_job(db, request, test_user)
|
||||
def mock_commit():
|
||||
raise Exception("Database commit failed")
|
||||
|
||||
monkeypatch.setattr(db, "commit", mock_commit)
|
||||
|
||||
with pytest.raises(ValidationException) as exc_info:
|
||||
self.service.create_import_job(db, request, test_vendor, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "UNAUTHORIZED_VENDOR_ACCESS"
|
||||
assert exception.error_code == "VALIDATION_ERROR"
|
||||
assert "Failed to create import job" in exception.message
|
||||
|
||||
# ==================== get_import_job_by_id Tests ====================
|
||||
|
||||
def test_get_import_job_by_id_success(
|
||||
self, db, test_marketplace_import_job, test_user
|
||||
):
|
||||
"""Test getting import job by ID for job owner"""
|
||||
"""Test getting import job by ID for job owner."""
|
||||
result = self.service.get_import_job_by_id(
|
||||
db, test_marketplace_import_job.id, test_user
|
||||
)
|
||||
@@ -147,7 +87,7 @@ class TestMarketplaceService:
|
||||
def test_get_import_job_by_id_admin_access(
|
||||
self, db, test_marketplace_import_job, test_admin
|
||||
):
|
||||
"""Test that admin can access any import job"""
|
||||
"""Test that admin can access any import job."""
|
||||
result = self.service.get_import_job_by_id(
|
||||
db, test_marketplace_import_job.id, test_admin
|
||||
)
|
||||
@@ -155,7 +95,7 @@ class TestMarketplaceService:
|
||||
assert result.id == test_marketplace_import_job.id
|
||||
|
||||
def test_get_import_job_by_id_not_found(self, db, test_user):
|
||||
"""Test getting non-existent import job"""
|
||||
"""Test getting non-existent import job."""
|
||||
with pytest.raises(ImportJobNotFoundException) as exc_info:
|
||||
self.service.get_import_job_by_id(db, 99999, test_user)
|
||||
|
||||
@@ -167,7 +107,7 @@ class TestMarketplaceService:
|
||||
def test_get_import_job_by_id_access_denied(
|
||||
self, db, test_marketplace_import_job, other_user
|
||||
):
|
||||
"""Test access denied when user doesn't own the job"""
|
||||
"""Test access denied when user doesn't own the job."""
|
||||
with pytest.raises(ImportJobNotOwnedException) as exc_info:
|
||||
self.service.get_import_job_by_id(
|
||||
db, test_marketplace_import_job.id, other_user
|
||||
@@ -176,42 +116,101 @@ class TestMarketplaceService:
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "IMPORT_JOB_NOT_OWNED"
|
||||
assert exception.status_code == 403
|
||||
assert str(test_marketplace_import_job.id) in exception.message
|
||||
|
||||
def test_get_import_jobs_user_filter(
|
||||
self, db, test_marketplace_import_job, test_user
|
||||
def test_get_import_job_by_id_database_error(self, db, test_user, monkeypatch):
|
||||
"""Test get import job handles database errors."""
|
||||
def mock_query(*args):
|
||||
raise Exception("Database query failed")
|
||||
|
||||
monkeypatch.setattr(db, "query", mock_query)
|
||||
|
||||
with pytest.raises(ValidationException) as exc_info:
|
||||
self.service.get_import_job_by_id(db, 1, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "VALIDATION_ERROR"
|
||||
|
||||
# ==================== get_import_job_for_vendor Tests ====================
|
||||
|
||||
def test_get_import_job_for_vendor_success(
|
||||
self, db, test_marketplace_import_job, test_vendor
|
||||
):
|
||||
"""Test getting import jobs filtered by user"""
|
||||
jobs = self.service.get_import_jobs(db, test_user)
|
||||
"""Test getting import job for vendor."""
|
||||
result = self.service.get_import_job_for_vendor(
|
||||
db, test_marketplace_import_job.id, test_vendor.id
|
||||
)
|
||||
|
||||
assert result.id == test_marketplace_import_job.id
|
||||
assert result.vendor_id == test_vendor.id
|
||||
|
||||
def test_get_import_job_for_vendor_not_found(self, db, test_vendor):
|
||||
"""Test getting non-existent import job for vendor."""
|
||||
with pytest.raises(ImportJobNotFoundException) as exc_info:
|
||||
self.service.get_import_job_for_vendor(db, 99999, test_vendor.id)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "IMPORT_JOB_NOT_FOUND"
|
||||
|
||||
def test_get_import_job_for_vendor_wrong_vendor(
|
||||
self, db, test_marketplace_import_job, other_user, other_company
|
||||
):
|
||||
"""Test getting import job for wrong vendor."""
|
||||
from models.database.vendor import Vendor
|
||||
|
||||
# Create another vendor
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
other_vendor = Vendor(
|
||||
company_id=other_company.id,
|
||||
vendor_code=f"OTHER_{unique_id.upper()}",
|
||||
subdomain=f"other{unique_id.lower()}",
|
||||
name=f"Other Vendor {unique_id}",
|
||||
is_active=True,
|
||||
)
|
||||
db.add(other_vendor)
|
||||
db.commit()
|
||||
db.refresh(other_vendor)
|
||||
|
||||
with pytest.raises(UnauthorizedVendorAccessException):
|
||||
self.service.get_import_job_for_vendor(
|
||||
db, test_marketplace_import_job.id, other_vendor.id
|
||||
)
|
||||
|
||||
# ==================== get_import_jobs Tests ====================
|
||||
|
||||
def test_get_import_jobs_success(
|
||||
self, db, test_marketplace_import_job, test_vendor, test_user
|
||||
):
|
||||
"""Test getting import jobs for vendor."""
|
||||
jobs = self.service.get_import_jobs(db, test_vendor, test_user)
|
||||
|
||||
assert len(jobs) >= 1
|
||||
assert any(job.id == test_marketplace_import_job.id for job in jobs)
|
||||
assert test_marketplace_import_job.user_id == test_user.id
|
||||
|
||||
def test_get_import_jobs_admin_sees_all(
|
||||
self, db, test_marketplace_import_job, test_admin
|
||||
def test_get_import_jobs_admin_sees_all_vendor_jobs(
|
||||
self, db, test_marketplace_import_job, test_vendor, test_admin
|
||||
):
|
||||
"""Test that admin sees all import jobs"""
|
||||
jobs = self.service.get_import_jobs(db, test_admin)
|
||||
"""Test that admin sees all vendor jobs."""
|
||||
jobs = self.service.get_import_jobs(db, test_vendor, test_admin)
|
||||
|
||||
assert len(jobs) >= 1
|
||||
assert any(job.id == test_marketplace_import_job.id for job in jobs)
|
||||
|
||||
def test_get_import_jobs_with_marketplace_filter(
|
||||
self, db, test_marketplace_import_job, test_user
|
||||
self, db, test_marketplace_import_job, test_vendor, test_user
|
||||
):
|
||||
"""Test getting import jobs with marketplace filter"""
|
||||
"""Test getting import jobs with marketplace filter."""
|
||||
jobs = self.service.get_import_jobs(
|
||||
db, test_user, marketplace=test_marketplace_import_job.marketplace
|
||||
db, test_vendor, test_user, marketplace=test_marketplace_import_job.marketplace
|
||||
)
|
||||
|
||||
assert len(jobs) >= 1
|
||||
assert any(
|
||||
job.marketplace == test_marketplace_import_job.marketplace for job in jobs
|
||||
assert all(
|
||||
test_marketplace_import_job.marketplace.lower() in job.marketplace.lower()
|
||||
for job in jobs
|
||||
)
|
||||
|
||||
def test_get_import_jobs_with_pagination(self, db, test_user, test_vendor):
|
||||
"""Test getting import jobs with pagination"""
|
||||
def test_get_import_jobs_with_pagination(self, db, test_vendor, test_user):
|
||||
"""Test getting import jobs with pagination."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
|
||||
# Create multiple import jobs
|
||||
@@ -219,9 +218,8 @@ class TestMarketplaceService:
|
||||
job = MarketplaceImportJob(
|
||||
status="completed",
|
||||
marketplace=f"Marketplace_{unique_id}_{i}",
|
||||
vendor_name=f"Test_vendor_{unique_id}_{i}",
|
||||
user_id=test_user.id,
|
||||
vendor_id=test_vendor.id,
|
||||
user_id=test_user.id,
|
||||
source_url=f"https://test-{i}.example.com/import",
|
||||
imported_count=0,
|
||||
updated_count=0,
|
||||
@@ -231,263 +229,170 @@ class TestMarketplaceService:
|
||||
db.add(job)
|
||||
db.commit()
|
||||
|
||||
jobs = self.service.get_import_jobs(db, test_user, skip=2, limit=2)
|
||||
jobs = self.service.get_import_jobs(db, test_vendor, test_user, skip=2, limit=2)
|
||||
|
||||
assert len(jobs) <= 2 # Should be at most 2
|
||||
assert len(jobs) <= 2
|
||||
|
||||
def test_get_import_jobs_empty(self, db, test_user, other_user, other_company):
|
||||
"""Test getting import jobs when none exist."""
|
||||
from models.database.vendor import Vendor
|
||||
|
||||
# Create a vendor with no jobs
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
empty_vendor = Vendor(
|
||||
company_id=other_company.id,
|
||||
vendor_code=f"EMPTY_{unique_id.upper()}",
|
||||
subdomain=f"empty{unique_id.lower()}",
|
||||
name=f"Empty Vendor {unique_id}",
|
||||
is_active=True,
|
||||
)
|
||||
db.add(empty_vendor)
|
||||
db.commit()
|
||||
db.refresh(empty_vendor)
|
||||
|
||||
jobs = self.service.get_import_jobs(db, empty_vendor, other_user)
|
||||
|
||||
assert len(jobs) == 0
|
||||
|
||||
def test_get_import_jobs_database_error(self, db, test_vendor, test_user, monkeypatch):
|
||||
"""Test get import jobs handles database errors."""
|
||||
def mock_query(*args):
|
||||
raise Exception("Database query failed")
|
||||
|
||||
monkeypatch.setattr(db, "query", mock_query)
|
||||
|
||||
def test_get_import_jobs_database_error(self, db_with_error, test_user):
|
||||
"""Test getting import jobs handles database errors"""
|
||||
with pytest.raises(ValidationException) as exc_info:
|
||||
self.service.get_import_jobs(db_with_error, test_user)
|
||||
self.service.get_import_jobs(db, test_vendor, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "VALIDATION_ERROR"
|
||||
assert "Failed to retrieve import jobs" in exception.message
|
||||
|
||||
def test_update_job_status_success(self, db, test_marketplace_import_job):
|
||||
"""Test updating job status"""
|
||||
result = self.service.update_job_status(
|
||||
db,
|
||||
test_marketplace_import_job.id,
|
||||
"completed",
|
||||
imported_count=100,
|
||||
total_processed=100,
|
||||
)
|
||||
# ==================== convert_to_response_model Tests ====================
|
||||
|
||||
assert result.status == "completed"
|
||||
assert result.imported_count == 100
|
||||
assert result.total_processed == 100
|
||||
def test_convert_to_response_model(self, db, test_marketplace_import_job, test_vendor):
|
||||
"""Test converting database model to response model."""
|
||||
from models.database.marketplace_import_job import MarketplaceImportJob as MIJ
|
||||
|
||||
def test_update_job_status_not_found(self, db):
|
||||
"""Test updating non-existent job status"""
|
||||
with pytest.raises(ImportJobNotFoundException) as exc_info:
|
||||
self.service.update_job_status(db, 99999, "completed")
|
||||
# Re-query to get fresh instance with relationships
|
||||
job = db.query(MIJ).filter(MIJ.id == test_marketplace_import_job.id).first()
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "IMPORT_JOB_NOT_FOUND"
|
||||
assert "99999" in exception.message
|
||||
response = self.service.convert_to_response_model(job)
|
||||
|
||||
def test_update_job_status_database_error(self, db_with_error):
|
||||
"""Test updating job status handles database errors"""
|
||||
with pytest.raises(ValidationException) as exc_info:
|
||||
self.service.update_job_status(db_with_error, 1, "completed")
|
||||
assert response.job_id == job.id
|
||||
assert response.status == job.status
|
||||
assert response.marketplace == job.marketplace
|
||||
assert response.vendor_id == job.vendor_id
|
||||
assert response.imported == (job.imported_count or 0)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "VALIDATION_ERROR"
|
||||
assert "Failed to update job status" in exception.message
|
||||
|
||||
def test_get_job_stats_user(self, db, test_marketplace_import_job, test_user):
|
||||
"""Test getting job statistics for user"""
|
||||
stats = self.service.get_job_stats(db, test_user)
|
||||
|
||||
assert stats["total_jobs"] >= 1
|
||||
assert "pending_jobs" in stats
|
||||
assert "running_jobs" in stats
|
||||
assert "completed_jobs" in stats
|
||||
assert "failed_jobs" in stats
|
||||
assert isinstance(stats["total_jobs"], int)
|
||||
|
||||
def test_get_job_stats_admin(self, db, test_marketplace_import_job, test_admin):
|
||||
"""Test getting job statistics for admin"""
|
||||
stats = self.service.get_job_stats(db, test_admin)
|
||||
|
||||
assert stats["total_jobs"] >= 1
|
||||
assert isinstance(stats["total_jobs"], int)
|
||||
|
||||
def test_get_job_stats_database_error(self, db_with_error, test_user):
|
||||
"""Test getting job stats handles database errors"""
|
||||
with pytest.raises(ValidationException) as exc_info:
|
||||
self.service.get_job_stats(db_with_error, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "VALIDATION_ERROR"
|
||||
assert "Failed to retrieve job statistics" in exception.message
|
||||
|
||||
def test_convert_to_response_model(self, test_marketplace_import_job):
|
||||
"""Test converting database model to response model"""
|
||||
response = self.service.convert_to_response_model(test_marketplace_import_job)
|
||||
|
||||
assert response.job_id == test_marketplace_import_job.id
|
||||
assert response.status == test_marketplace_import_job.status
|
||||
assert response.marketplace == test_marketplace_import_job.marketplace
|
||||
assert response.imported == (test_marketplace_import_job.imported_count or 0)
|
||||
|
||||
def test_cancel_import_job_success(self, db, test_user, test_vendor):
|
||||
"""Test cancelling a pending import job"""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
|
||||
# Create a pending job
|
||||
job = MarketplaceImportJob(
|
||||
status="pending",
|
||||
marketplace="Amazon",
|
||||
vendor_name=f"TEST_VENDOR_{unique_id}",
|
||||
user_id=test_user.id,
|
||||
vendor_id=test_vendor.id,
|
||||
source_url="https://test.example.com/import",
|
||||
imported_count=0,
|
||||
updated_count=0,
|
||||
total_processed=0,
|
||||
error_count=0,
|
||||
)
|
||||
db.add(job)
|
||||
db.commit()
|
||||
db.refresh(job)
|
||||
|
||||
result = self.service.cancel_import_job(db, job.id, test_user)
|
||||
|
||||
assert result.status == "cancelled"
|
||||
assert result.completed_at is not None
|
||||
|
||||
def test_cancel_import_job_not_found(self, db, test_user):
|
||||
"""Test cancelling non-existent import job"""
|
||||
with pytest.raises(ImportJobNotFoundException) as exc_info:
|
||||
self.service.cancel_import_job(db, 99999, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "IMPORT_JOB_NOT_FOUND"
|
||||
|
||||
def test_cancel_import_job_access_denied(
|
||||
self, db, test_marketplace_import_job, other_user
|
||||
def test_convert_to_response_model_with_all_fields(
|
||||
self, db, test_vendor, test_user
|
||||
):
|
||||
"""Test cancelling import job without access"""
|
||||
with pytest.raises(ImportJobNotOwnedException) as exc_info:
|
||||
self.service.cancel_import_job(
|
||||
db, test_marketplace_import_job.id, other_user
|
||||
)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "IMPORT_JOB_NOT_OWNED"
|
||||
|
||||
def test_cancel_import_job_invalid_status(
|
||||
self, db, test_marketplace_import_job, test_user
|
||||
):
|
||||
"""Test cancelling a job that can't be cancelled"""
|
||||
# Set job status to completed
|
||||
test_marketplace_import_job.status = "completed"
|
||||
db.commit()
|
||||
|
||||
with pytest.raises(ImportJobCannotBeCancelledException) as exc_info:
|
||||
self.service.cancel_import_job(
|
||||
db, test_marketplace_import_job.id, test_user
|
||||
)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "IMPORT_JOB_CANNOT_BE_CANCELLED"
|
||||
assert exception.status_code == 400
|
||||
assert "completed" in exception.message
|
||||
|
||||
def test_delete_import_job_success(self, db, test_user, test_vendor):
|
||||
"""Test deleting a completed import job"""
|
||||
"""Test converting model with all fields populated."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
from datetime import datetime
|
||||
|
||||
# Create a completed job
|
||||
job = MarketplaceImportJob(
|
||||
status="completed",
|
||||
marketplace="Amazon",
|
||||
vendor_name=f"TEST_VENDOR_{unique_id}",
|
||||
user_id=test_user.id,
|
||||
marketplace="TestMarket",
|
||||
vendor_id=test_vendor.id,
|
||||
source_url="https://test.example.com/import",
|
||||
imported_count=0,
|
||||
updated_count=0,
|
||||
total_processed=0,
|
||||
error_count=0,
|
||||
)
|
||||
db.add(job)
|
||||
db.commit()
|
||||
db.refresh(job)
|
||||
job_id = job.id
|
||||
|
||||
result = self.service.delete_import_job(db, job_id, test_user)
|
||||
|
||||
assert result is True
|
||||
|
||||
# Verify the job is actually deleted
|
||||
deleted_job = (
|
||||
db.query(MarketplaceImportJob)
|
||||
.filter(MarketplaceImportJob.id == job_id)
|
||||
.first()
|
||||
)
|
||||
assert deleted_job is None
|
||||
|
||||
def test_delete_import_job_not_found(self, db, test_user):
|
||||
"""Test deleting non-existent import job"""
|
||||
with pytest.raises(ImportJobNotFoundException) as exc_info:
|
||||
self.service.delete_import_job(db, 99999, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "IMPORT_JOB_NOT_FOUND"
|
||||
|
||||
def test_delete_import_job_access_denied(
|
||||
self, db, test_marketplace_import_job, other_user
|
||||
):
|
||||
"""Test deleting import job without access"""
|
||||
with pytest.raises(ImportJobNotOwnedException) as exc_info:
|
||||
self.service.delete_import_job(
|
||||
db, test_marketplace_import_job.id, other_user
|
||||
)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "IMPORT_JOB_NOT_OWNED"
|
||||
|
||||
def test_delete_import_job_invalid_status(self, db, test_user, test_vendor):
|
||||
"""Test deleting a job that can't be deleted"""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
|
||||
# Create a pending job
|
||||
job = MarketplaceImportJob(
|
||||
status="pending",
|
||||
marketplace="Amazon",
|
||||
vendor_name=f"TEST_VENDOR_{unique_id}",
|
||||
user_id=test_user.id,
|
||||
vendor_id=test_vendor.id,
|
||||
source_url="https://test.example.com/import",
|
||||
imported_count=0,
|
||||
updated_count=0,
|
||||
total_processed=0,
|
||||
error_count=0,
|
||||
imported_count=100,
|
||||
updated_count=50,
|
||||
total_processed=150,
|
||||
error_count=5,
|
||||
error_message="Some errors occurred",
|
||||
started_at=datetime.utcnow(),
|
||||
completed_at=datetime.utcnow(),
|
||||
)
|
||||
db.add(job)
|
||||
db.commit()
|
||||
db.refresh(job)
|
||||
|
||||
with pytest.raises(ImportJobCannotBeDeletedException) as exc_info:
|
||||
self.service.delete_import_job(db, job.id, test_user)
|
||||
response = self.service.convert_to_response_model(job)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "IMPORT_JOB_CANNOT_BE_DELETED"
|
||||
assert exception.status_code == 400
|
||||
assert "pending" in exception.message
|
||||
assert response.imported == 100
|
||||
assert response.updated == 50
|
||||
assert response.total_processed == 150
|
||||
assert response.error_count == 5
|
||||
assert response.error_message == "Some errors occurred"
|
||||
assert response.started_at is not None
|
||||
assert response.completed_at is not None
|
||||
|
||||
# Test edge cases and error scenarios
|
||||
def test_validate_vendor_access_case_insensitive(self, db, test_vendor, test_user):
|
||||
"""Test vendor access validation is case insensitive"""
|
||||
test_vendor.owner_user_id = test_user.id
|
||||
db.commit()
|
||||
|
||||
# Test with lowercase vendor code
|
||||
result = self.service.validate_vendor_access(
|
||||
db, test_vendor.vendor_code.lower(), test_user
|
||||
)
|
||||
assert result.vendor_code == test_vendor.vendor_code
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.marketplace
|
||||
class TestMarketplaceImportJobSchema:
|
||||
"""Test suite for MarketplaceImportJobRequest schema validation."""
|
||||
|
||||
# Test with uppercase vendor code
|
||||
result = self.service.validate_vendor_access(
|
||||
db, test_vendor.vendor_code.upper(), test_user
|
||||
)
|
||||
assert result.vendor_code == test_vendor.vendor_code
|
||||
|
||||
def test_create_import_job_database_error(self, db_with_error, test_user):
|
||||
"""Test import job creation handles database errors"""
|
||||
def test_valid_request(self):
|
||||
"""Test valid request schema."""
|
||||
request = MarketplaceImportJobRequest(
|
||||
url="https://example.com/products.csv",
|
||||
source_url="https://example.com/products.csv",
|
||||
marketplace="Amazon",
|
||||
vendor_code="TEST_VENDOR",
|
||||
batch_size=1000,
|
||||
)
|
||||
|
||||
with pytest.raises(ValidationException) as exc_info:
|
||||
self.service.create_import_job(db_with_error, request, test_user)
|
||||
assert request.source_url == "https://example.com/products.csv"
|
||||
assert request.marketplace == "Amazon"
|
||||
assert request.batch_size == 1000
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "VALIDATION_ERROR"
|
||||
def test_default_values(self):
|
||||
"""Test default values for optional fields."""
|
||||
request = MarketplaceImportJobRequest(
|
||||
source_url="https://example.com/products.csv",
|
||||
)
|
||||
|
||||
assert request.marketplace == "Letzshop"
|
||||
assert request.batch_size == 1000
|
||||
|
||||
def test_url_validation_http(self):
|
||||
"""Test URL validation accepts http."""
|
||||
request = MarketplaceImportJobRequest(
|
||||
source_url="http://example.com/products.csv",
|
||||
)
|
||||
|
||||
assert request.source_url == "http://example.com/products.csv"
|
||||
|
||||
def test_url_validation_invalid(self):
|
||||
"""Test URL validation rejects invalid URLs."""
|
||||
with pytest.raises(ValueError) as exc_info:
|
||||
MarketplaceImportJobRequest(
|
||||
source_url="ftp://example.com/products.csv",
|
||||
)
|
||||
|
||||
assert "URL must start with http://" in str(exc_info.value)
|
||||
|
||||
def test_url_with_trailing_slash(self):
|
||||
"""Test URL with trailing slash is accepted."""
|
||||
request = MarketplaceImportJobRequest(
|
||||
source_url="https://example.com/products.csv/",
|
||||
)
|
||||
|
||||
assert request.source_url == "https://example.com/products.csv/"
|
||||
|
||||
def test_batch_size_validation_min(self):
|
||||
"""Test batch_size validation minimum."""
|
||||
with pytest.raises(ValueError):
|
||||
MarketplaceImportJobRequest(
|
||||
source_url="https://example.com/products.csv",
|
||||
batch_size=50, # Below minimum of 100
|
||||
)
|
||||
|
||||
def test_batch_size_validation_max(self):
|
||||
"""Test batch_size validation maximum."""
|
||||
with pytest.raises(ValueError):
|
||||
MarketplaceImportJobRequest(
|
||||
source_url="https://example.com/products.csv",
|
||||
batch_size=20000, # Above maximum of 10000
|
||||
)
|
||||
|
||||
def test_marketplace_strips_whitespace(self):
|
||||
"""Test marketplace strips whitespace."""
|
||||
request = MarketplaceImportJobRequest(
|
||||
source_url="https://example.com/products.csv",
|
||||
marketplace=" Amazon ",
|
||||
)
|
||||
|
||||
assert request.marketplace == "Amazon"
|
||||
|
||||
@@ -265,18 +265,15 @@ class TestProductService:
|
||||
def test_get_inventory_info_success(
|
||||
self, db, test_marketplace_product_with_inventory
|
||||
):
|
||||
"""Test getting inventory info for product with inventory"""
|
||||
# Extract the product from the dictionary
|
||||
marketplace_product = test_marketplace_product_with_inventory[
|
||||
"marketplace_product"
|
||||
]
|
||||
"""Test getting inventory info for product with inventory."""
|
||||
marketplace_product = test_marketplace_product_with_inventory["marketplace_product"]
|
||||
inventory = test_marketplace_product_with_inventory["inventory"]
|
||||
|
||||
inventory_info = self.service.get_inventory_info(db, marketplace_product.gtin)
|
||||
|
||||
assert inventory_info is not None
|
||||
assert inventory_info.gtin == marketplace_product.gtin
|
||||
assert inventory_info.total_quantity > 0
|
||||
assert len(inventory_info.locations) > 0
|
||||
assert inventory_info.total_quantity == inventory.quantity
|
||||
assert len(inventory_info.locations) >= 1
|
||||
|
||||
def test_get_inventory_info_no_inventory(self, db, test_marketplace_product):
|
||||
"""Test getting inventory info for product without inventory"""
|
||||
|
||||
@@ -1,24 +1,33 @@
|
||||
# tests/test_stats_service.py
|
||||
import pytest
|
||||
# tests/unit/services/test_stats_service.py
|
||||
"""Unit tests for StatsService following the application's testing patterns."""
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
from app.exceptions import AdminOperationException, VendorNotFoundException
|
||||
from app.services.stats_service import StatsService
|
||||
from models.database.inventory import Inventory
|
||||
from models.database.marketplace_import_job import MarketplaceImportJob
|
||||
from models.database.marketplace_product import MarketplaceProduct
|
||||
from models.database.product import Product
|
||||
from models.database.vendor import Vendor
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.stats
|
||||
class TestStatsService:
|
||||
"""Test suite for StatsService following the application's testing patterns"""
|
||||
"""Test suite for StatsService following the application's testing patterns."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Setup method following the same pattern as other service tests"""
|
||||
"""Setup method following the same pattern as other service tests."""
|
||||
self.service = StatsService()
|
||||
|
||||
def test_get_comprehensive_stats_basic(
|
||||
self, db, test_marketplace_product, test_inventory
|
||||
):
|
||||
"""Test getting comprehensive stats with basic data"""
|
||||
# ==================== get_comprehensive_stats Tests ====================
|
||||
|
||||
def test_get_comprehensive_stats_basic(self, db, test_marketplace_product):
|
||||
"""Test getting comprehensive stats with basic data."""
|
||||
stats = self.service.get_comprehensive_stats(db)
|
||||
|
||||
assert "total_products" in stats
|
||||
@@ -29,18 +38,29 @@ class TestStatsService:
|
||||
assert "total_inventory_entries" in stats
|
||||
assert "total_inventory_quantity" in stats
|
||||
|
||||
assert stats["total_products"] >= 1
|
||||
assert stats["total_inventory_entries"] >= 1
|
||||
assert stats["total_inventory_quantity"] >= 10 # test_inventory has quantity 10
|
||||
# Verify types
|
||||
assert isinstance(stats["total_products"], int)
|
||||
assert isinstance(stats["unique_brands"], int)
|
||||
assert isinstance(stats["unique_categories"], int)
|
||||
assert isinstance(stats["unique_marketplaces"], int)
|
||||
assert isinstance(stats["unique_vendors"], int)
|
||||
assert isinstance(stats["total_inventory_entries"], int)
|
||||
assert isinstance(stats["total_inventory_quantity"], int)
|
||||
|
||||
def test_get_comprehensive_stats_multiple_products(
|
||||
def test_get_comprehensive_stats_with_products(self, db, test_product):
|
||||
"""Test comprehensive stats counts Product records correctly."""
|
||||
stats = self.service.get_comprehensive_stats(db)
|
||||
|
||||
assert stats["total_products"] >= 1
|
||||
|
||||
def test_get_comprehensive_stats_multiple_marketplaces(
|
||||
self, db, test_marketplace_product
|
||||
):
|
||||
"""Test comprehensive stats with multiple products across different dimensions"""
|
||||
# Create products with different brands, categories, marketplaces
|
||||
"""Test comprehensive stats with multiple marketplaces."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
additional_products = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="PROD002",
|
||||
marketplace_product_id=f"PROD002_{unique_id}",
|
||||
title="MarketplaceProduct 2",
|
||||
brand="DifferentBrand",
|
||||
google_product_category="Different Category",
|
||||
@@ -50,7 +70,7 @@ class TestStatsService:
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="PROD003",
|
||||
marketplace_product_id=f"PROD003_{unique_id}",
|
||||
title="MarketplaceProduct 3",
|
||||
brand="ThirdBrand",
|
||||
google_product_category="Third Category",
|
||||
@@ -59,49 +79,36 @@ class TestStatsService:
|
||||
price="25.99",
|
||||
currency="USD",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="PROD004",
|
||||
title="MarketplaceProduct 4",
|
||||
brand="TestBrand", # Same as test_marketplace_product
|
||||
google_product_category="Different Category",
|
||||
marketplace="Letzshop", # Same as test_marketplace_product
|
||||
vendor_name="DifferentVendor",
|
||||
price="35.99",
|
||||
currency="EUR",
|
||||
),
|
||||
]
|
||||
db.add_all(additional_products)
|
||||
db.commit()
|
||||
|
||||
stats = self.service.get_comprehensive_stats(db)
|
||||
|
||||
assert stats["total_products"] >= 4 # test_marketplace_product + 3 additional
|
||||
assert stats["unique_brands"] >= 3 # TestBrand, DifferentBrand, ThirdBrand
|
||||
assert stats["unique_categories"] >= 2 # At least 2 different categories
|
||||
# Should count unique marketplaces from MarketplaceProduct table
|
||||
assert stats["unique_marketplaces"] >= 3 # Letzshop, Amazon, eBay
|
||||
assert stats["unique_vendors"] >= 3 # At least 3 different vendors
|
||||
|
||||
def test_get_comprehensive_stats_handles_nulls(self, db):
|
||||
"""Test comprehensive stats handles null/empty values correctly"""
|
||||
# Create products with null/empty values
|
||||
"""Test comprehensive stats handles null/empty values correctly."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
products_with_nulls = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="NULL001",
|
||||
marketplace_product_id=f"NULL001_{unique_id}",
|
||||
title="MarketplaceProduct with Nulls",
|
||||
brand=None, # Null brand
|
||||
google_product_category=None, # Null category
|
||||
marketplace=None, # Null marketplace
|
||||
vendor_name=None, # Null vendor
|
||||
brand=None,
|
||||
google_product_category=None,
|
||||
marketplace=None,
|
||||
vendor_name=None,
|
||||
price="10.00",
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="EMPTY001",
|
||||
marketplace_product_id=f"EMPTY001_{unique_id}",
|
||||
title="MarketplaceProduct with Empty Values",
|
||||
brand="", # Empty brand
|
||||
google_product_category="", # Empty category
|
||||
marketplace="", # Empty marketplace
|
||||
vendor_name="", # Empty vendor
|
||||
brand="",
|
||||
google_product_category="",
|
||||
marketplace="",
|
||||
vendor_name="",
|
||||
price="15.00",
|
||||
currency="EUR",
|
||||
),
|
||||
@@ -111,16 +118,34 @@ class TestStatsService:
|
||||
|
||||
stats = self.service.get_comprehensive_stats(db)
|
||||
|
||||
# These products shouldn't contribute to unique counts due to null/empty values
|
||||
assert stats["total_products"] >= 2
|
||||
# Brands, categories, marketplaces, vendors should not count null/empty values
|
||||
# Should not throw error - null/empty values handled gracefully
|
||||
assert isinstance(stats["unique_brands"], int)
|
||||
assert isinstance(stats["unique_categories"], int)
|
||||
assert isinstance(stats["unique_marketplaces"], int)
|
||||
assert isinstance(stats["unique_vendors"], int)
|
||||
|
||||
def test_get_comprehensive_stats_empty_database(self, db):
|
||||
"""Test stats with empty database."""
|
||||
stats = self.service.get_comprehensive_stats(db)
|
||||
|
||||
assert stats["total_products"] == 0
|
||||
assert stats["unique_brands"] == 0
|
||||
assert stats["unique_categories"] == 0
|
||||
assert stats["unique_marketplaces"] == 0
|
||||
assert stats["total_inventory_entries"] == 0
|
||||
assert stats["total_inventory_quantity"] == 0
|
||||
|
||||
def test_get_comprehensive_stats_database_error(self, db):
|
||||
"""Test comprehensive stats handles database errors."""
|
||||
with patch.object(db, "query", side_effect=SQLAlchemyError("DB Error")):
|
||||
with pytest.raises(AdminOperationException) as exc_info:
|
||||
self.service.get_comprehensive_stats(db)
|
||||
|
||||
assert exc_info.value.details.get("operation") == "get_comprehensive_stats"
|
||||
|
||||
# ==================== get_marketplace_breakdown_stats Tests ====================
|
||||
|
||||
def test_get_marketplace_breakdown_stats_basic(self, db, test_marketplace_product):
|
||||
"""Test getting marketplace breakdown stats with basic data"""
|
||||
"""Test getting marketplace breakdown stats with basic data."""
|
||||
stats = self.service.get_marketplace_breakdown_stats(db)
|
||||
|
||||
assert isinstance(stats, list)
|
||||
@@ -137,17 +162,17 @@ class TestStatsService:
|
||||
)
|
||||
assert test_marketplace_stat is not None
|
||||
assert test_marketplace_stat["total_products"] >= 1
|
||||
assert test_marketplace_stat["unique_vendors"] >= 1
|
||||
assert test_marketplace_stat["unique_brands"] >= 1
|
||||
assert "unique_vendors" in test_marketplace_stat
|
||||
assert "unique_brands" in test_marketplace_stat
|
||||
|
||||
def test_get_marketplace_breakdown_stats_multiple_marketplaces(
|
||||
self, db, test_marketplace_product
|
||||
):
|
||||
"""Test marketplace breakdown with multiple marketplaces"""
|
||||
# Create products for different marketplaces
|
||||
"""Test marketplace breakdown with multiple marketplaces."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
marketplace_products = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="AMAZON001",
|
||||
marketplace_product_id=f"AMAZON001_{unique_id}",
|
||||
title="Amazon MarketplaceProduct 1",
|
||||
brand="AmazonBrand1",
|
||||
marketplace="Amazon",
|
||||
@@ -156,7 +181,7 @@ class TestStatsService:
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="AMAZON002",
|
||||
marketplace_product_id=f"AMAZON002_{unique_id}",
|
||||
title="Amazon MarketplaceProduct 2",
|
||||
brand="AmazonBrand2",
|
||||
marketplace="Amazon",
|
||||
@@ -165,7 +190,7 @@ class TestStatsService:
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="EBAY001",
|
||||
marketplace_product_id=f"EBAY001_{unique_id}",
|
||||
title="eBay MarketplaceProduct",
|
||||
brand="eBayBrand",
|
||||
marketplace="eBay",
|
||||
@@ -179,29 +204,27 @@ class TestStatsService:
|
||||
|
||||
stats = self.service.get_marketplace_breakdown_stats(db)
|
||||
|
||||
# Should have at least 3 marketplaces: test_marketplace_product.marketplace, Amazon, eBay
|
||||
marketplace_names = [stat["marketplace"] for stat in stats]
|
||||
assert "Amazon" in marketplace_names
|
||||
assert "eBay" in marketplace_names
|
||||
assert test_marketplace_product.marketplace in marketplace_names
|
||||
|
||||
# Check Amazon stats specifically
|
||||
# Check Amazon stats
|
||||
amazon_stat = next(stat for stat in stats if stat["marketplace"] == "Amazon")
|
||||
assert amazon_stat["total_products"] == 2
|
||||
assert amazon_stat["unique_vendors"] == 2
|
||||
assert amazon_stat["unique_brands"] == 2
|
||||
|
||||
# Check eBay stats specifically
|
||||
# Check eBay stats
|
||||
ebay_stat = next(stat for stat in stats if stat["marketplace"] == "eBay")
|
||||
assert ebay_stat["total_products"] == 1
|
||||
assert ebay_stat["unique_vendors"] == 1
|
||||
assert ebay_stat["unique_brands"] == 1
|
||||
|
||||
def test_get_marketplace_breakdown_stats_excludes_nulls(self, db):
|
||||
"""Test marketplace breakdown excludes products with null marketplaces"""
|
||||
# Create product with null marketplace
|
||||
"""Test marketplace breakdown excludes products with null marketplaces."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
null_marketplace_product = MarketplaceProduct(
|
||||
marketplace_product_id="NULLMARKET001",
|
||||
marketplace_product_id=f"NULLMARKET001_{unique_id}",
|
||||
title="MarketplaceProduct without marketplace",
|
||||
marketplace=None,
|
||||
vendor_name="SomeVendor",
|
||||
@@ -220,19 +243,219 @@ class TestStatsService:
|
||||
]
|
||||
assert None not in marketplace_names
|
||||
|
||||
def test_get_product_count(self, db, test_marketplace_product):
|
||||
"""Test getting total product count"""
|
||||
count = self.service._get_product_count(db)
|
||||
def test_get_marketplace_breakdown_empty_database(self, db):
|
||||
"""Test marketplace breakdown with empty database."""
|
||||
stats = self.service.get_marketplace_breakdown_stats(db)
|
||||
|
||||
assert count >= 1
|
||||
assert isinstance(count, int)
|
||||
assert isinstance(stats, list)
|
||||
assert len(stats) == 0
|
||||
|
||||
def test_get_marketplace_breakdown_stats_database_error(self, db):
|
||||
"""Test marketplace breakdown handles database errors."""
|
||||
with patch.object(db, "query", side_effect=SQLAlchemyError("DB Error")):
|
||||
with pytest.raises(AdminOperationException) as exc_info:
|
||||
self.service.get_marketplace_breakdown_stats(db)
|
||||
|
||||
assert exc_info.value.details.get("operation") == "get_marketplace_breakdown_stats"
|
||||
|
||||
# ==================== get_vendor_stats Tests ====================
|
||||
|
||||
def test_get_vendor_stats_success(self, db, test_vendor, test_product):
|
||||
"""Test getting vendor statistics successfully."""
|
||||
stats = self.service.get_vendor_stats(db, test_vendor.id)
|
||||
|
||||
assert "catalog" in stats
|
||||
assert "staging" in stats
|
||||
assert "inventory" in stats
|
||||
assert "imports" in stats
|
||||
assert "orders" in stats
|
||||
assert "customers" in stats
|
||||
|
||||
assert stats["catalog"]["total_products"] >= 0
|
||||
assert stats["inventory"]["total_quantity"] >= 0
|
||||
|
||||
def test_get_vendor_stats_vendor_not_found(self, db):
|
||||
"""Test vendor stats with non-existent vendor."""
|
||||
with pytest.raises(VendorNotFoundException):
|
||||
self.service.get_vendor_stats(db, 99999)
|
||||
|
||||
def test_get_vendor_stats_with_inventory(
|
||||
self, db, test_vendor, test_product, test_inventory
|
||||
):
|
||||
"""Test vendor stats includes inventory data."""
|
||||
stats = self.service.get_vendor_stats(db, test_vendor.id)
|
||||
|
||||
assert stats["inventory"]["total_quantity"] >= test_inventory.quantity
|
||||
assert stats["inventory"]["reserved_quantity"] >= 0
|
||||
|
||||
def test_get_vendor_stats_database_error(self, db, test_vendor):
|
||||
"""Test vendor stats handles database errors after vendor check."""
|
||||
# Mock query to fail after first successful call (vendor check)
|
||||
original_query = db.query
|
||||
call_count = [0]
|
||||
|
||||
def mock_query(*args, **kwargs):
|
||||
call_count[0] += 1
|
||||
if call_count[0] > 1:
|
||||
raise SQLAlchemyError("DB Error")
|
||||
return original_query(*args, **kwargs)
|
||||
|
||||
with patch.object(db, "query", side_effect=mock_query):
|
||||
with pytest.raises(AdminOperationException) as exc_info:
|
||||
self.service.get_vendor_stats(db, test_vendor.id)
|
||||
|
||||
assert exc_info.value.details.get("operation") == "get_vendor_stats"
|
||||
|
||||
# ==================== get_vendor_analytics Tests ====================
|
||||
|
||||
def test_get_vendor_analytics_success(self, db, test_vendor):
|
||||
"""Test getting vendor analytics successfully."""
|
||||
analytics = self.service.get_vendor_analytics(db, test_vendor.id)
|
||||
|
||||
assert "period" in analytics
|
||||
assert "start_date" in analytics
|
||||
assert "imports" in analytics
|
||||
assert "catalog" in analytics
|
||||
assert "inventory" in analytics
|
||||
|
||||
def test_get_vendor_analytics_different_periods(self, db, test_vendor):
|
||||
"""Test vendor analytics with different time periods."""
|
||||
for period in ["7d", "30d", "90d", "1y"]:
|
||||
analytics = self.service.get_vendor_analytics(
|
||||
db, test_vendor.id, period=period
|
||||
)
|
||||
assert analytics["period"] == period
|
||||
|
||||
def test_get_vendor_analytics_vendor_not_found(self, db):
|
||||
"""Test vendor analytics with non-existent vendor."""
|
||||
with pytest.raises(VendorNotFoundException):
|
||||
self.service.get_vendor_analytics(db, 99999)
|
||||
|
||||
# ==================== get_vendor_statistics Tests ====================
|
||||
|
||||
def test_get_vendor_statistics_success(self, db, test_vendor):
|
||||
"""Test getting vendor statistics for admin dashboard."""
|
||||
stats = self.service.get_vendor_statistics(db)
|
||||
|
||||
assert "total_vendors" in stats
|
||||
assert "active_vendors" in stats
|
||||
assert "inactive_vendors" in stats
|
||||
assert "verified_vendors" in stats
|
||||
assert "verification_rate" in stats
|
||||
|
||||
assert stats["total_vendors"] >= 1
|
||||
assert stats["active_vendors"] >= 1
|
||||
|
||||
def test_get_vendor_statistics_calculates_rates(self, db, test_vendor):
|
||||
"""Test vendor statistics calculates rates correctly."""
|
||||
stats = self.service.get_vendor_statistics(db)
|
||||
|
||||
if stats["total_vendors"] > 0:
|
||||
expected_rate = (
|
||||
stats["verified_vendors"] / stats["total_vendors"] * 100
|
||||
)
|
||||
assert abs(stats["verification_rate"] - expected_rate) < 0.01
|
||||
|
||||
def test_get_vendor_statistics_database_error(self, db):
|
||||
"""Test vendor statistics handles database errors."""
|
||||
with patch.object(db, "query", side_effect=SQLAlchemyError("DB Error")):
|
||||
with pytest.raises(AdminOperationException) as exc_info:
|
||||
self.service.get_vendor_statistics(db)
|
||||
|
||||
assert exc_info.value.details.get("operation") == "get_vendor_statistics"
|
||||
|
||||
# ==================== get_user_statistics Tests ====================
|
||||
|
||||
def test_get_user_statistics_success(self, db, test_user):
|
||||
"""Test getting user statistics."""
|
||||
stats = self.service.get_user_statistics(db)
|
||||
|
||||
assert "total_users" in stats
|
||||
assert "active_users" in stats
|
||||
assert "inactive_users" in stats
|
||||
assert "admin_users" in stats
|
||||
assert "activation_rate" in stats
|
||||
|
||||
assert stats["total_users"] >= 1
|
||||
|
||||
def test_get_user_statistics_calculates_correctly(self, db, test_user, test_admin):
|
||||
"""Test user statistics calculates values correctly."""
|
||||
stats = self.service.get_user_statistics(db)
|
||||
|
||||
assert stats["total_users"] == stats["active_users"] + stats["inactive_users"]
|
||||
assert stats["admin_users"] >= 1 # test_admin
|
||||
|
||||
def test_get_user_statistics_database_error(self, db):
|
||||
"""Test user statistics handles database errors."""
|
||||
with patch.object(db, "query", side_effect=SQLAlchemyError("DB Error")):
|
||||
with pytest.raises(AdminOperationException) as exc_info:
|
||||
self.service.get_user_statistics(db)
|
||||
|
||||
assert exc_info.value.details.get("operation") == "get_user_statistics"
|
||||
|
||||
# ==================== get_import_statistics Tests ====================
|
||||
|
||||
def test_get_import_statistics_success(self, db):
|
||||
"""Test getting import statistics."""
|
||||
stats = self.service.get_import_statistics(db)
|
||||
|
||||
assert "total_imports" in stats
|
||||
assert "completed_imports" in stats
|
||||
assert "failed_imports" in stats
|
||||
assert "success_rate" in stats
|
||||
|
||||
def test_get_import_statistics_with_jobs(
|
||||
self, db, test_vendor, test_marketplace_import_job
|
||||
):
|
||||
"""Test import statistics with existing jobs."""
|
||||
stats = self.service.get_import_statistics(db)
|
||||
|
||||
assert stats["total_imports"] >= 1
|
||||
assert stats["completed_imports"] >= 1 # test job has completed status
|
||||
|
||||
def test_get_import_statistics_calculates_rate(self, db):
|
||||
"""Test import statistics calculates success rate."""
|
||||
stats = self.service.get_import_statistics(db)
|
||||
|
||||
if stats["total_imports"] > 0:
|
||||
expected_rate = (
|
||||
stats["completed_imports"] / stats["total_imports"] * 100
|
||||
)
|
||||
assert abs(stats["success_rate"] - expected_rate) < 0.01
|
||||
else:
|
||||
assert stats["success_rate"] == 0
|
||||
|
||||
def test_get_import_statistics_handles_errors(self, db):
|
||||
"""Test import statistics returns zeros on error."""
|
||||
with patch.object(db, "query", side_effect=SQLAlchemyError("DB Error")):
|
||||
stats = self.service.get_import_statistics(db)
|
||||
|
||||
# Should return default values, not raise exception
|
||||
assert stats["total_imports"] == 0
|
||||
assert stats["completed_imports"] == 0
|
||||
assert stats["failed_imports"] == 0
|
||||
assert stats["success_rate"] == 0
|
||||
|
||||
# ==================== Private Helper Method Tests ====================
|
||||
|
||||
def test_parse_period_known_values(self):
|
||||
"""Test period parsing for known values."""
|
||||
assert self.service._parse_period("7d") == 7
|
||||
assert self.service._parse_period("30d") == 30
|
||||
assert self.service._parse_period("90d") == 90
|
||||
assert self.service._parse_period("1y") == 365
|
||||
|
||||
def test_parse_period_unknown_defaults(self):
|
||||
"""Test period parsing defaults to 30 for unknown values."""
|
||||
assert self.service._parse_period("unknown") == 30
|
||||
assert self.service._parse_period("") == 30
|
||||
|
||||
def test_get_unique_brands_count(self, db, test_marketplace_product):
|
||||
"""Test getting unique brands count"""
|
||||
# Add products with different brands
|
||||
"""Test getting unique brands count."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
brand_products = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="BRAND001",
|
||||
marketplace_product_id=f"BRAND001_{unique_id}",
|
||||
title="Brand MarketplaceProduct 1",
|
||||
brand="BrandA",
|
||||
marketplace="Test",
|
||||
@@ -241,7 +464,7 @@ class TestStatsService:
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="BRAND002",
|
||||
marketplace_product_id=f"BRAND002_{unique_id}",
|
||||
title="Brand MarketplaceProduct 2",
|
||||
brand="BrandB",
|
||||
marketplace="Test",
|
||||
@@ -255,17 +478,15 @@ class TestStatsService:
|
||||
|
||||
count = self.service._get_unique_brands_count(db)
|
||||
|
||||
assert (
|
||||
count >= 2
|
||||
) # At least BrandA and BrandB, plus possibly test_marketplace_product brand
|
||||
assert count >= 2 # At least BrandA and BrandB
|
||||
assert isinstance(count, int)
|
||||
|
||||
def test_get_unique_categories_count(self, db, test_marketplace_product):
|
||||
"""Test getting unique categories count"""
|
||||
# Add products with different categories
|
||||
"""Test getting unique categories count."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
category_products = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="CAT001",
|
||||
marketplace_product_id=f"CAT001_{unique_id}",
|
||||
title="Category MarketplaceProduct 1",
|
||||
google_product_category="Electronics",
|
||||
marketplace="Test",
|
||||
@@ -274,7 +495,7 @@ class TestStatsService:
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="CAT002",
|
||||
marketplace_product_id=f"CAT002_{unique_id}",
|
||||
title="Category MarketplaceProduct 2",
|
||||
google_product_category="Books",
|
||||
marketplace="Test",
|
||||
@@ -291,230 +512,35 @@ class TestStatsService:
|
||||
assert count >= 2 # At least Electronics and Books
|
||||
assert isinstance(count, int)
|
||||
|
||||
def test_get_unique_marketplaces_count(self, db, test_marketplace_product):
|
||||
"""Test getting unique marketplaces count"""
|
||||
# Add products with different marketplaces
|
||||
marketplace_products = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="MARKET001",
|
||||
title="Marketplace MarketplaceProduct 1",
|
||||
marketplace="Amazon",
|
||||
vendor_name="AmazonVendor",
|
||||
price="10.00",
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="MARKET002",
|
||||
title="Marketplace MarketplaceProduct 2",
|
||||
marketplace="eBay",
|
||||
vendor_name="eBayVendor",
|
||||
price="15.00",
|
||||
currency="EUR",
|
||||
),
|
||||
]
|
||||
db.add_all(marketplace_products)
|
||||
db.commit()
|
||||
|
||||
count = self.service._get_unique_marketplaces_count(db)
|
||||
|
||||
assert (
|
||||
count >= 2
|
||||
) # At least Amazon and eBay, plus test_marketplace_product marketplace
|
||||
assert isinstance(count, int)
|
||||
|
||||
def test_get_unique_vendors_count(self, db, test_marketplace_product):
|
||||
"""Test getting unique vendors count"""
|
||||
# Add products with different vendor names
|
||||
products = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="PRODUCT001",
|
||||
title="Vendor MarketplaceProduct 1",
|
||||
marketplace="Test",
|
||||
vendor_name="VendorA",
|
||||
price="10.00",
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="PRODUCT002",
|
||||
title="Vendor MarketplaceProduct 2",
|
||||
marketplace="Test",
|
||||
vendor_name="VendorB",
|
||||
price="15.00",
|
||||
currency="EUR",
|
||||
),
|
||||
]
|
||||
db.add_all(products)
|
||||
db.commit()
|
||||
|
||||
count = self.service._get_unique_vendors_count(db)
|
||||
|
||||
assert (
|
||||
count >= 2
|
||||
) # At least VendorA and VendorB, plus test_marketplace_product vendor
|
||||
assert isinstance(count, int)
|
||||
|
||||
def test_get_inventory_statistics(self, db, test_inventory):
|
||||
"""Test getting inventory statistics"""
|
||||
# Add additional inventory entries
|
||||
additional_inventory = [
|
||||
Inventory(
|
||||
gtin="1234567890124",
|
||||
location="LOCATION2",
|
||||
quantity=25,
|
||||
reserved_quantity=5,
|
||||
vendor_id=test_inventory.vendor_id,
|
||||
),
|
||||
Inventory(
|
||||
gtin="1234567890125",
|
||||
location="LOCATION3",
|
||||
quantity=0, # Out of inventory
|
||||
reserved_quantity=0,
|
||||
vendor_id=test_inventory.vendor_id,
|
||||
),
|
||||
]
|
||||
db.add_all(additional_inventory)
|
||||
db.commit()
|
||||
|
||||
stats = self.service.get_inventory_statistics(db)
|
||||
|
||||
assert "total_inventory_entries" in stats
|
||||
assert "total_inventory_quantity" in stats
|
||||
assert stats["total_inventory_entries"] >= 3 # test_inventory + 2 additional
|
||||
assert stats["total_inventory_quantity"] >= 35 # 10 + 25 + 0 = 35
|
||||
|
||||
def test_get_brands_by_marketplace(self, db):
|
||||
"""Test getting brands for a specific marketplace"""
|
||||
# Create products for specific marketplace
|
||||
marketplace_products = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="SPECIFIC001",
|
||||
title="Specific MarketplaceProduct 1",
|
||||
brand="SpecificBrand1",
|
||||
marketplace="SpecificMarket",
|
||||
vendor_name="SpecificVendor1",
|
||||
price="10.00",
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="SPECIFIC002",
|
||||
title="Specific MarketplaceProduct 2",
|
||||
brand="SpecificBrand2",
|
||||
marketplace="SpecificMarket",
|
||||
vendor_name="SpecificVendor2",
|
||||
price="15.00",
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="OTHER001",
|
||||
title="Other MarketplaceProduct",
|
||||
brand="OtherBrand",
|
||||
marketplace="OtherMarket",
|
||||
vendor_name="OtherVendor",
|
||||
price="20.00",
|
||||
currency="EUR",
|
||||
),
|
||||
]
|
||||
db.add_all(marketplace_products)
|
||||
db.commit()
|
||||
|
||||
brands = self.service._get_brands_by_marketplace(db, "SpecificMarket")
|
||||
|
||||
assert len(brands) == 2
|
||||
assert "SpecificBrand1" in brands
|
||||
assert "SpecificBrand2" in brands
|
||||
assert "OtherBrand" not in brands
|
||||
|
||||
def test_get_vendors_by_marketplace(self, db):
|
||||
"""Test getting vendors for a specific marketplace"""
|
||||
# Create products for specific marketplace
|
||||
marketplace_products = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="MARKETTEST001",
|
||||
title="Vendor Test MarketplaceProduct 1",
|
||||
brand="TestBrand",
|
||||
marketplace="TestMarketplace",
|
||||
vendor_name="TestVendor1",
|
||||
price="10.00",
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="MARKETTEST002",
|
||||
title="Vendor Test MarketplaceProduct 2",
|
||||
brand="TestBrand",
|
||||
marketplace="TestMarketplace",
|
||||
vendor_name="TestVendor2",
|
||||
price="15.00",
|
||||
currency="EUR",
|
||||
),
|
||||
]
|
||||
db.add_all(marketplace_products)
|
||||
db.commit()
|
||||
|
||||
vendors = self.service._get_vendors_by_marketplace(db, "TestMarketplace")
|
||||
|
||||
assert len(vendors) == 2
|
||||
assert "TestVendor1" in vendors
|
||||
assert "TestVendor2" in vendors
|
||||
|
||||
def test_get_products_by_marketplace(self, db):
|
||||
"""Test getting product count for a specific marketplace"""
|
||||
# Create products for specific marketplace
|
||||
marketplace_products = [
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="COUNT001",
|
||||
title="Count MarketplaceProduct 1",
|
||||
marketplace="CountMarketplace",
|
||||
vendor_name="CountVendor",
|
||||
price="10.00",
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="COUNT002",
|
||||
title="Count MarketplaceProduct 2",
|
||||
marketplace="CountMarketplace",
|
||||
vendor_name="CountVendor",
|
||||
price="15.00",
|
||||
currency="EUR",
|
||||
),
|
||||
MarketplaceProduct(
|
||||
marketplace_product_id="COUNT003",
|
||||
title="Count MarketplaceProduct 3",
|
||||
marketplace="CountMarketplace",
|
||||
vendor_name="CountVendor",
|
||||
price="20.00",
|
||||
currency="EUR",
|
||||
),
|
||||
]
|
||||
db.add_all(marketplace_products)
|
||||
db.commit()
|
||||
|
||||
count = self.service._get_products_by_marketplace_count(db, "CountMarketplace")
|
||||
|
||||
assert count == 3
|
||||
|
||||
def test_get_products_by_marketplace_not_found(self, db):
|
||||
"""Test getting product count for non-existent marketplace"""
|
||||
count = self.service._get_products_by_marketplace_count(
|
||||
db, "NonExistentMarketplace"
|
||||
"""Test getting inventory statistics."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
additional_inventory = Inventory(
|
||||
gtin=f"123456789{unique_id[:4]}",
|
||||
location=f"LOCATION2_{unique_id}",
|
||||
quantity=25,
|
||||
reserved_quantity=5,
|
||||
vendor_id=test_inventory.vendor_id,
|
||||
product_id=test_inventory.product_id,
|
||||
)
|
||||
db.add(additional_inventory)
|
||||
db.commit()
|
||||
|
||||
assert count == 0
|
||||
stats = self.service._get_inventory_statistics(db)
|
||||
|
||||
def test_empty_database_stats(self, db):
|
||||
"""Test stats with empty database"""
|
||||
stats = self.service.get_comprehensive_stats(db)
|
||||
assert "total_entries" in stats
|
||||
assert "total_quantity" in stats
|
||||
assert "total_reserved" in stats
|
||||
assert "total_available" in stats
|
||||
|
||||
assert stats["total_products"] == 0
|
||||
assert stats["unique_brands"] == 0
|
||||
assert stats["unique_categories"] == 0
|
||||
assert stats["unique_marketplaces"] == 0
|
||||
assert stats["unique_vendors"] == 0
|
||||
assert stats["total_inventory_entries"] == 0
|
||||
assert stats["total_inventory_quantity"] == 0
|
||||
assert stats["total_entries"] >= 2
|
||||
assert stats["total_quantity"] >= test_inventory.quantity + 25
|
||||
|
||||
def test_marketplace_breakdown_empty_database(self, db):
|
||||
"""Test marketplace breakdown with empty database"""
|
||||
stats = self.service.get_marketplace_breakdown_stats(db)
|
||||
def test_get_inventory_statistics_empty(self, db):
|
||||
"""Test inventory statistics with empty database."""
|
||||
stats = self.service._get_inventory_statistics(db)
|
||||
|
||||
assert isinstance(stats, list)
|
||||
assert len(stats) == 0
|
||||
assert stats["total_entries"] == 0
|
||||
assert stats["total_quantity"] == 0
|
||||
assert stats["total_reserved"] == 0
|
||||
assert stats["total_available"] == 0
|
||||
|
||||
@@ -1,10 +1,12 @@
|
||||
# tests/test_vendor_service.py (updated to use custom exceptions)
|
||||
# tests/unit/services/test_vendor_service.py
|
||||
"""Unit tests for VendorService following the application's exception patterns."""
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from app.exceptions import (
|
||||
InvalidVendorDataException,
|
||||
MarketplaceProductNotFoundException,
|
||||
MaxVendorsReachedException,
|
||||
ProductAlreadyExistsException,
|
||||
UnauthorizedVendorAccessException,
|
||||
ValidationException,
|
||||
@@ -12,48 +14,84 @@ from app.exceptions import (
|
||||
VendorNotFoundException,
|
||||
)
|
||||
from app.services.vendor_service import VendorService
|
||||
from models.database.company import Company
|
||||
from models.database.vendor import Vendor
|
||||
from models.schema.product import ProductCreate
|
||||
from models.schema.vendor import VendorCreate
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def admin_company(db, test_admin):
|
||||
"""Create a test company for admin."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
company = Company(
|
||||
name=f"Admin Company {unique_id}",
|
||||
owner_user_id=test_admin.id,
|
||||
contact_email=f"admin{unique_id}@company.com",
|
||||
is_active=True,
|
||||
is_verified=True,
|
||||
)
|
||||
db.add(company)
|
||||
db.commit()
|
||||
db.refresh(company)
|
||||
return company
|
||||
|
||||
|
||||
# Note: other_company fixture is defined in tests/fixtures/vendor_fixtures.py
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.vendors
|
||||
class TestVendorService:
|
||||
"""Test suite for VendorService following the application's exception patterns"""
|
||||
"""Test suite for VendorService following the application's exception patterns."""
|
||||
|
||||
def setup_method(self):
|
||||
"""Setup method following the same pattern as admin service tests"""
|
||||
"""Setup method following the same pattern as admin service tests."""
|
||||
self.service = VendorService()
|
||||
|
||||
def test_create_vendor_success(self, db, test_user, vendor_factory):
|
||||
"""Test successful vendor creation"""
|
||||
# ==================== create_vendor Tests ====================
|
||||
|
||||
def test_create_vendor_success(self, db, test_user, test_company):
|
||||
"""Test successful vendor creation."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
vendor_data = VendorCreate(
|
||||
vendor_code="NEWVENDOR",
|
||||
vendor_name="New Test Vendor",
|
||||
description="A new test vendor ",
|
||||
company_id=test_company.id,
|
||||
vendor_code=f"NEWVENDOR_{unique_id}",
|
||||
subdomain=f"newvendor{unique_id.lower()}",
|
||||
name=f"New Test Vendor {unique_id}",
|
||||
description="A new test vendor",
|
||||
)
|
||||
|
||||
vendor = self.service.create_vendor(db, vendor_data, test_user)
|
||||
db.commit()
|
||||
|
||||
assert vendor is not None
|
||||
assert vendor.vendor_code == "NEWVENDOR"
|
||||
assert vendor.owner_user_id == test_user.id
|
||||
assert vendor.vendor_code == f"NEWVENDOR_{unique_id}".upper()
|
||||
assert vendor.company_id == test_company.id
|
||||
assert vendor.is_verified is False # Regular user creates unverified vendor
|
||||
|
||||
def test_create_vendor_admin_auto_verify(self, db, test_admin, vendor_factory):
|
||||
"""Test admin creates verified vendor automatically"""
|
||||
def test_create_vendor_admin_auto_verify(self, db, test_admin, admin_company):
|
||||
"""Test admin creates verified vendor automatically."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
vendor_data = VendorCreate(
|
||||
vendor_code="ADMINVENDOR", vendor_name="Admin Test Vendor"
|
||||
company_id=admin_company.id,
|
||||
vendor_code=f"ADMINVENDOR_{unique_id}",
|
||||
subdomain=f"adminvendor{unique_id.lower()}",
|
||||
name=f"Admin Test Vendor {unique_id}",
|
||||
)
|
||||
|
||||
vendor = self.service.create_vendor(db, vendor_data, test_admin)
|
||||
db.commit()
|
||||
|
||||
assert vendor.is_verified is True # Admin creates verified vendor
|
||||
|
||||
def test_create_vendor_duplicate_code(self, db, test_user, test_vendor):
|
||||
"""Test vendor creation fails with duplicate vendor code"""
|
||||
def test_create_vendor_duplicate_code(self, db, test_user, test_company, test_vendor):
|
||||
"""Test vendor creation fails with duplicate vendor code."""
|
||||
vendor_data = VendorCreate(
|
||||
vendor_code=test_vendor.vendor_code, vendor_name=test_vendor.name
|
||||
company_id=test_company.id,
|
||||
vendor_code=test_vendor.vendor_code,
|
||||
subdomain="duplicatesub",
|
||||
name="Duplicate Name",
|
||||
)
|
||||
|
||||
with pytest.raises(VendorAlreadyExistsException) as exc_info:
|
||||
@@ -63,11 +101,44 @@ class TestVendorService:
|
||||
assert exception.status_code == 409
|
||||
assert exception.error_code == "VENDOR_ALREADY_EXISTS"
|
||||
assert test_vendor.vendor_code.upper() in exception.message
|
||||
assert "vendor_code" in exception.details
|
||||
|
||||
def test_create_vendor_invalid_data_empty_code(self, db, test_user):
|
||||
"""Test vendor creation fails with empty vendor code"""
|
||||
vendor_data = VendorCreate(vendor_code="", vendor_name="Test Vendor")
|
||||
def test_create_vendor_missing_company_id(self, db, test_user):
|
||||
"""Test vendor creation fails without company_id."""
|
||||
# VendorCreate requires company_id, so this should raise ValidationError
|
||||
# from Pydantic before reaching service
|
||||
with pytest.raises(Exception): # Pydantic ValidationError
|
||||
VendorCreate(
|
||||
vendor_code="NOCOMPANY",
|
||||
subdomain="nocompany",
|
||||
name="No Company Vendor",
|
||||
)
|
||||
|
||||
def test_create_vendor_unauthorized_user(self, db, test_user, other_company):
|
||||
"""Test vendor creation fails when user doesn't own company."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
vendor_data = VendorCreate(
|
||||
company_id=other_company.id, # Not owned by test_user
|
||||
vendor_code=f"UNAUTH_{unique_id}",
|
||||
subdomain=f"unauth{unique_id.lower()}",
|
||||
name=f"Unauthorized Vendor {unique_id}",
|
||||
)
|
||||
|
||||
with pytest.raises(UnauthorizedVendorAccessException) as exc_info:
|
||||
self.service.create_vendor(db, vendor_data, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 403
|
||||
assert exception.error_code == "UNAUTHORIZED_VENDOR_ACCESS"
|
||||
|
||||
def test_create_vendor_invalid_company_id(self, db, test_user):
|
||||
"""Test vendor creation fails with non-existent company."""
|
||||
unique_id = str(uuid.uuid4())[:8]
|
||||
vendor_data = VendorCreate(
|
||||
company_id=99999, # Non-existent company
|
||||
vendor_code=f"BADCOMPANY_{unique_id}",
|
||||
subdomain=f"badcompany{unique_id.lower()}",
|
||||
name=f"Bad Company Vendor {unique_id}",
|
||||
)
|
||||
|
||||
with pytest.raises(InvalidVendorDataException) as exc_info:
|
||||
self.service.create_vendor(db, vendor_data, test_user)
|
||||
@@ -75,69 +146,25 @@ class TestVendorService:
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 422
|
||||
assert exception.error_code == "INVALID_VENDOR_DATA"
|
||||
assert exception.details["field"] == "vendor_code"
|
||||
assert "company_id" in exception.details.get("field", "")
|
||||
|
||||
def test_create_vendor_invalid_data_empty_name(self, db, test_user):
|
||||
"""Test vendor creation fails with empty vendor name"""
|
||||
vendor_data = VendorCreate(vendor_code="VALIDCODE", vendor_name="")
|
||||
|
||||
with pytest.raises(InvalidVendorDataException) as exc_info:
|
||||
self.service.create_vendor(db, vendor_data, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "INVALID_VENDOR_DATA"
|
||||
assert exception.details["field"] == "name"
|
||||
|
||||
def test_create_vendor_invalid_code_format(self, db, test_user):
|
||||
"""Test vendor creation fails with invalid vendor code format"""
|
||||
vendor_data = VendorCreate(
|
||||
vendor_code="INVALID@CODE!", vendor_name="Test Vendor"
|
||||
)
|
||||
|
||||
with pytest.raises(InvalidVendorDataException) as exc_info:
|
||||
self.service.create_vendor(db, vendor_data, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "INVALID_VENDOR_DATA"
|
||||
assert exception.details["field"] == "vendor_code"
|
||||
assert "letters, numbers, underscores, and hyphens" in exception.message
|
||||
|
||||
def test_create_vendor_max_vendors_reached(self, db, test_user, monkeypatch):
|
||||
"""Test vendor creation fails when user reaches maximum vendors"""
|
||||
|
||||
# Mock the vendor count check to simulate user at limit
|
||||
def mock_check_vendor_limit(self, db, user):
|
||||
raise MaxVendorsReachedException(max_vendors=5, user_id=user.id)
|
||||
|
||||
monkeypatch.setattr(
|
||||
VendorService, "_check_vendor_limit", mock_check_vendor_limit
|
||||
)
|
||||
|
||||
vendor_data = VendorCreate(vendor_code="NEWVENDOR", vendor_name="New Vendor")
|
||||
|
||||
with pytest.raises(MaxVendorsReachedException) as exc_info:
|
||||
self.service.create_vendor(db, vendor_data, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 400
|
||||
assert exception.error_code == "MAX_VENDORS_REACHED"
|
||||
assert exception.details["max_vendors"] == 5
|
||||
assert exception.details["user_id"] == test_user.id
|
||||
# ==================== get_vendors Tests ====================
|
||||
|
||||
def test_get_vendors_regular_user(
|
||||
self, db, test_user, test_vendor, inactive_vendor
|
||||
):
|
||||
"""Test regular user can only see active verified vendors and own vendors"""
|
||||
vendors, total = self.service.get_vendors(db, test_user, skip=0, limit=10)
|
||||
"""Test regular user can only see active verified vendors and own vendors."""
|
||||
vendors, total = self.service.get_vendors(db, test_user, skip=0, limit=100)
|
||||
|
||||
vendor_codes = [vendor.vendor_code for vendor in vendors]
|
||||
assert test_vendor.vendor_code in vendor_codes
|
||||
# Inactive vendor should not be visible to regular user
|
||||
assert inactive_vendor.vendor_code not in vendor_codes
|
||||
|
||||
def test_get_vendors_admin_user(
|
||||
self, db, test_admin, test_vendor, inactive_vendor, verified_vendor
|
||||
):
|
||||
"""Test admin user can see all vendors with filters"""
|
||||
"""Test admin user can see all vendors with filters."""
|
||||
vendors, total = self.service.get_vendors(
|
||||
db, test_admin, active_only=False, verified_only=False
|
||||
)
|
||||
@@ -147,149 +174,16 @@ class TestVendorService:
|
||||
assert inactive_vendor.vendor_code in vendor_codes
|
||||
assert verified_vendor.vendor_code in vendor_codes
|
||||
|
||||
def test_get_vendor_by_code_owner_access(self, db, test_user, test_vendor):
|
||||
"""Test vendor owner can access their own vendor"""
|
||||
vendor = self.service.get_vendor_by_code(
|
||||
db, test_vendor.vendor_code.lower(), test_user
|
||||
def test_get_vendors_pagination(self, db, test_admin):
|
||||
"""Test vendor pagination."""
|
||||
vendors, total = self.service.get_vendors(
|
||||
db, test_admin, skip=0, limit=5, active_only=False
|
||||
)
|
||||
|
||||
assert vendor is not None
|
||||
assert vendor.id == test_vendor.id
|
||||
|
||||
def test_get_vendor_by_code_admin_access(self, db, test_admin, test_vendor):
|
||||
"""Test admin can access any vendor"""
|
||||
vendor = self.service.get_vendor_by_code(
|
||||
db, test_vendor.vendor_code.lower(), test_admin
|
||||
)
|
||||
|
||||
assert vendor is not None
|
||||
assert vendor.id == test_vendor.id
|
||||
|
||||
def test_get_vendor_by_code_not_found(self, db, test_user):
|
||||
"""Test vendor not found raises proper exception"""
|
||||
with pytest.raises(VendorNotFoundException) as exc_info:
|
||||
self.service.get_vendor_by_code(db, "NONEXISTENT", test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 404
|
||||
assert exception.error_code == "VENDOR_NOT_FOUND"
|
||||
assert exception.details["resource_type"] == "Vendor"
|
||||
assert exception.details["identifier"] == "NONEXISTENT"
|
||||
|
||||
def test_get_vendor_by_code_access_denied(self, db, test_user, inactive_vendor):
|
||||
"""Test regular user cannot access unverified vendor they don't own"""
|
||||
with pytest.raises(UnauthorizedVendorAccessException) as exc_info:
|
||||
self.service.get_vendor_by_code(db, inactive_vendor.vendor_code, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 403
|
||||
assert exception.error_code == "UNAUTHORIZED_VENDOR_ACCESS"
|
||||
assert exception.details["vendor_code"] == inactive_vendor.vendor_code
|
||||
assert exception.details["user_id"] == test_user.id
|
||||
|
||||
def test_add_product_to_vendor_success(self, db, test_vendor, unique_product):
|
||||
"""Test successfully adding product to vendor"""
|
||||
product_data = ProductCreate(
|
||||
marketplace_product_id=unique_product.marketplace_product_id,
|
||||
price="15.99",
|
||||
is_featured=True,
|
||||
)
|
||||
|
||||
product = self.service.add_product_to_catalog(db, test_vendor, product_data)
|
||||
|
||||
assert product is not None
|
||||
assert product.vendor_id == test_vendor.id
|
||||
assert product.marketplace_product_id == unique_product.id
|
||||
|
||||
def test_add_product_to_vendor_product_not_found(self, db, test_vendor):
|
||||
"""Test adding non-existent product to vendor fails"""
|
||||
product_data = ProductCreate(
|
||||
marketplace_product_id="NONEXISTENT", price="15.99"
|
||||
)
|
||||
|
||||
with pytest.raises(MarketplaceProductNotFoundException) as exc_info:
|
||||
self.service.add_product_to_catalog(db, test_vendor, product_data)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 404
|
||||
assert exception.error_code == "PRODUCT_NOT_FOUND"
|
||||
assert exception.details["resource_type"] == "MarketplaceProduct"
|
||||
assert exception.details["identifier"] == "NONEXISTENT"
|
||||
|
||||
def test_add_product_to_vendor_already_exists(self, db, test_vendor, test_product):
|
||||
"""Test adding product that's already in vendor fails"""
|
||||
product_data = ProductCreate(
|
||||
marketplace_product_id=test_product.marketplace_product.marketplace_product_id,
|
||||
price="15.99",
|
||||
)
|
||||
|
||||
with pytest.raises(ProductAlreadyExistsException) as exc_info:
|
||||
self.service.add_product_to_catalog(db, test_vendor, product_data)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 409
|
||||
assert exception.error_code == "PRODUCT_ALREADY_EXISTS"
|
||||
assert exception.details["vendor_code"] == test_vendor.vendor_code
|
||||
assert (
|
||||
exception.details["marketplace_product_id"]
|
||||
== test_product.marketplace_product.marketplace_product_id
|
||||
)
|
||||
|
||||
def test_get_products_owner_access(self, db, test_user, test_vendor, test_product):
|
||||
"""Test vendor owner can get vendor products"""
|
||||
products, total = self.service.get_products(db, test_vendor, test_user)
|
||||
|
||||
assert total >= 1
|
||||
assert len(products) >= 1
|
||||
product_ids = [p.marketplace_product_id for p in products]
|
||||
assert test_product.marketplace_product_id in product_ids
|
||||
|
||||
def test_get_products_access_denied(self, db, test_user, inactive_vendor):
|
||||
"""Test non-owner cannot access unverified vendor products"""
|
||||
with pytest.raises(UnauthorizedVendorAccessException) as exc_info:
|
||||
self.service.get_products(db, inactive_vendor, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 403
|
||||
assert exception.error_code == "UNAUTHORIZED_VENDOR_ACCESS"
|
||||
assert exception.details["vendor_code"] == inactive_vendor.vendor_code
|
||||
assert exception.details["user_id"] == test_user.id
|
||||
|
||||
def test_get_products_with_filters(self, db, test_user, test_vendor, test_product):
|
||||
"""Test getting vendor products with various filters"""
|
||||
# Test active only filter
|
||||
products, total = self.service.get_products(
|
||||
db, test_vendor, test_user, active_only=True
|
||||
)
|
||||
assert all(p.is_active for p in products)
|
||||
|
||||
# Test featured only filter
|
||||
products, total = self.service.get_products(
|
||||
db, test_vendor, test_user, featured_only=True
|
||||
)
|
||||
assert all(p.is_featured for p in products)
|
||||
|
||||
# Test exception handling for generic errors
|
||||
def test_create_vendor_database_error(self, db, test_user, monkeypatch):
|
||||
"""Test vendor creation handles database errors gracefully"""
|
||||
|
||||
def mock_commit():
|
||||
raise Exception("Database connection failed")
|
||||
|
||||
monkeypatch.setattr(db, "commit", mock_commit)
|
||||
|
||||
vendor_data = VendorCreate(vendor_code="NEWVENDOR", vendor_name="Test Vendor")
|
||||
|
||||
with pytest.raises(ValidationException) as exc_info:
|
||||
self.service.create_vendor(db, vendor_data, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 422
|
||||
assert exception.error_code == "VALIDATION_ERROR"
|
||||
assert "Failed to create vendor " in exception.message
|
||||
assert len(vendors) <= 5
|
||||
|
||||
def test_get_vendors_database_error(self, db, test_user, monkeypatch):
|
||||
"""Test get vendors handles database errors gracefully"""
|
||||
"""Test get vendors handles database errors gracefully."""
|
||||
|
||||
def mock_query(*args):
|
||||
raise Exception("Database query failed")
|
||||
@@ -303,40 +197,285 @@ class TestVendorService:
|
||||
assert exception.error_code == "VALIDATION_ERROR"
|
||||
assert "Failed to retrieve vendors" in exception.message
|
||||
|
||||
def test_add_product_database_error(
|
||||
self, db, test_vendor, unique_product, monkeypatch
|
||||
):
|
||||
"""Test add product handles database errors gracefully"""
|
||||
# ==================== get_vendor_by_code Tests ====================
|
||||
|
||||
def mock_commit():
|
||||
raise Exception("Database commit failed")
|
||||
|
||||
monkeypatch.setattr(db, "commit", mock_commit)
|
||||
|
||||
product_data = ProductCreate(
|
||||
marketplace_product_id=unique_product.marketplace_product_id, price="15.99"
|
||||
def test_get_vendor_by_code_owner_access(self, db, test_user, test_vendor):
|
||||
"""Test vendor owner can access their own vendor."""
|
||||
vendor = self.service.get_vendor_by_code(
|
||||
db, test_vendor.vendor_code.lower(), test_user
|
||||
)
|
||||
|
||||
with pytest.raises(ValidationException) as exc_info:
|
||||
assert vendor is not None
|
||||
assert vendor.id == test_vendor.id
|
||||
|
||||
def test_get_vendor_by_code_admin_access(self, db, test_admin, test_vendor):
|
||||
"""Test admin can access any vendor."""
|
||||
vendor = self.service.get_vendor_by_code(
|
||||
db, test_vendor.vendor_code.lower(), test_admin
|
||||
)
|
||||
|
||||
assert vendor is not None
|
||||
assert vendor.id == test_vendor.id
|
||||
|
||||
def test_get_vendor_by_code_not_found(self, db, test_user):
|
||||
"""Test vendor not found raises proper exception."""
|
||||
with pytest.raises(VendorNotFoundException) as exc_info:
|
||||
self.service.get_vendor_by_code(db, "NONEXISTENT", test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 404
|
||||
assert exception.error_code == "VENDOR_NOT_FOUND"
|
||||
|
||||
def test_get_vendor_by_code_access_denied(self, db, test_user, inactive_vendor):
|
||||
"""Test regular user cannot access unverified vendor they don't own."""
|
||||
with pytest.raises(UnauthorizedVendorAccessException) as exc_info:
|
||||
self.service.get_vendor_by_code(db, inactive_vendor.vendor_code, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 403
|
||||
assert exception.error_code == "UNAUTHORIZED_VENDOR_ACCESS"
|
||||
|
||||
# ==================== get_vendor_by_id Tests ====================
|
||||
|
||||
def test_get_vendor_by_id_success(self, db, test_vendor):
|
||||
"""Test getting vendor by ID."""
|
||||
vendor = self.service.get_vendor_by_id(db, test_vendor.id)
|
||||
|
||||
assert vendor is not None
|
||||
assert vendor.id == test_vendor.id
|
||||
assert vendor.vendor_code == test_vendor.vendor_code
|
||||
|
||||
def test_get_vendor_by_id_not_found(self, db):
|
||||
"""Test getting non-existent vendor by ID."""
|
||||
with pytest.raises(VendorNotFoundException) as exc_info:
|
||||
self.service.get_vendor_by_id(db, 99999)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 404
|
||||
assert exception.error_code == "VENDOR_NOT_FOUND"
|
||||
|
||||
# ==================== get_active_vendor_by_code Tests ====================
|
||||
|
||||
def test_get_active_vendor_by_code_success(self, db, test_vendor):
|
||||
"""Test getting active vendor by code (public access)."""
|
||||
vendor = self.service.get_active_vendor_by_code(db, test_vendor.vendor_code)
|
||||
|
||||
assert vendor is not None
|
||||
assert vendor.id == test_vendor.id
|
||||
assert vendor.is_active is True
|
||||
|
||||
def test_get_active_vendor_by_code_inactive(self, db, inactive_vendor):
|
||||
"""Test getting inactive vendor fails."""
|
||||
with pytest.raises(VendorNotFoundException):
|
||||
self.service.get_active_vendor_by_code(db, inactive_vendor.vendor_code)
|
||||
|
||||
def test_get_active_vendor_by_code_not_found(self, db):
|
||||
"""Test getting non-existent vendor fails."""
|
||||
with pytest.raises(VendorNotFoundException):
|
||||
self.service.get_active_vendor_by_code(db, "NONEXISTENT")
|
||||
|
||||
# ==================== toggle_verification Tests ====================
|
||||
|
||||
def test_toggle_verification_verify(self, db, inactive_vendor):
|
||||
"""Test toggling verification on."""
|
||||
original_verified = inactive_vendor.is_verified
|
||||
vendor, message = self.service.toggle_verification(db, inactive_vendor.id)
|
||||
db.commit()
|
||||
|
||||
assert vendor.is_verified != original_verified
|
||||
assert "verified" in message.lower()
|
||||
|
||||
def test_toggle_verification_unverify(self, db, verified_vendor):
|
||||
"""Test toggling verification off."""
|
||||
vendor, message = self.service.toggle_verification(db, verified_vendor.id)
|
||||
db.commit()
|
||||
|
||||
assert vendor.is_verified is False
|
||||
assert "unverified" in message.lower()
|
||||
|
||||
def test_toggle_verification_not_found(self, db):
|
||||
"""Test toggle verification on non-existent vendor."""
|
||||
with pytest.raises(VendorNotFoundException):
|
||||
self.service.toggle_verification(db, 99999)
|
||||
|
||||
# ==================== toggle_status Tests ====================
|
||||
|
||||
def test_toggle_status_deactivate(self, db, test_vendor):
|
||||
"""Test toggling active status off."""
|
||||
vendor, message = self.service.toggle_status(db, test_vendor.id)
|
||||
db.commit()
|
||||
|
||||
assert vendor.is_active is False
|
||||
assert "inactive" in message.lower()
|
||||
|
||||
def test_toggle_status_activate(self, db, inactive_vendor):
|
||||
"""Test toggling active status on."""
|
||||
vendor, message = self.service.toggle_status(db, inactive_vendor.id)
|
||||
db.commit()
|
||||
|
||||
assert vendor.is_active is True
|
||||
assert "active" in message.lower()
|
||||
|
||||
def test_toggle_status_not_found(self, db):
|
||||
"""Test toggle status on non-existent vendor."""
|
||||
with pytest.raises(VendorNotFoundException):
|
||||
self.service.toggle_status(db, 99999)
|
||||
|
||||
# ==================== set_verification / set_status Tests ====================
|
||||
|
||||
def test_set_verification_to_true(self, db, inactive_vendor):
|
||||
"""Test setting verification to true."""
|
||||
vendor, message = self.service.set_verification(db, inactive_vendor.id, True)
|
||||
db.commit()
|
||||
|
||||
assert vendor.is_verified is True
|
||||
|
||||
def test_set_verification_to_false(self, db, verified_vendor):
|
||||
"""Test setting verification to false."""
|
||||
vendor, message = self.service.set_verification(db, verified_vendor.id, False)
|
||||
db.commit()
|
||||
|
||||
assert vendor.is_verified is False
|
||||
|
||||
def test_set_status_to_active(self, db, inactive_vendor):
|
||||
"""Test setting status to active."""
|
||||
vendor, message = self.service.set_status(db, inactive_vendor.id, True)
|
||||
db.commit()
|
||||
|
||||
assert vendor.is_active is True
|
||||
|
||||
def test_set_status_to_inactive(self, db, test_vendor):
|
||||
"""Test setting status to inactive."""
|
||||
vendor, message = self.service.set_status(db, test_vendor.id, False)
|
||||
db.commit()
|
||||
|
||||
assert vendor.is_active is False
|
||||
|
||||
# ==================== add_product_to_catalog Tests ====================
|
||||
|
||||
def test_add_product_to_vendor_success(self, db, test_vendor, unique_product):
|
||||
"""Test successfully adding product to vendor."""
|
||||
from models.database.marketplace_product import MarketplaceProduct
|
||||
|
||||
# Re-query objects to avoid session issues
|
||||
vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
|
||||
mp = db.query(MarketplaceProduct).filter(
|
||||
MarketplaceProduct.id == unique_product.id
|
||||
).first()
|
||||
|
||||
product_data = ProductCreate(
|
||||
marketplace_product_id=mp.id,
|
||||
price=15.99,
|
||||
is_featured=True,
|
||||
)
|
||||
|
||||
product = self.service.add_product_to_catalog(db, vendor, product_data)
|
||||
db.commit()
|
||||
|
||||
assert product is not None
|
||||
assert product.vendor_id == vendor.id
|
||||
assert product.marketplace_product_id == mp.id
|
||||
|
||||
def test_add_product_to_vendor_product_not_found(self, db, test_vendor):
|
||||
"""Test adding non-existent product to vendor fails."""
|
||||
product_data = ProductCreate(
|
||||
marketplace_product_id=99999, # Non-existent ID
|
||||
price=15.99,
|
||||
)
|
||||
|
||||
with pytest.raises(MarketplaceProductNotFoundException) as exc_info:
|
||||
self.service.add_product_to_catalog(db, test_vendor, product_data)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.error_code == "VALIDATION_ERROR"
|
||||
assert "Failed to add product to vendor " in exception.message
|
||||
assert exception.status_code == 404
|
||||
assert exception.error_code == "PRODUCT_NOT_FOUND"
|
||||
|
||||
def test_add_product_to_vendor_already_exists(self, db, test_vendor, test_product):
|
||||
"""Test adding product that's already in vendor fails."""
|
||||
# Re-query to get fresh instances
|
||||
from models.database.product import Product
|
||||
vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
|
||||
product = db.query(Product).filter(Product.id == test_product.id).first()
|
||||
|
||||
product_data = ProductCreate(
|
||||
marketplace_product_id=product.marketplace_product_id,
|
||||
price=15.99,
|
||||
)
|
||||
|
||||
with pytest.raises(ProductAlreadyExistsException) as exc_info:
|
||||
self.service.add_product_to_catalog(db, vendor, product_data)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 409
|
||||
assert exception.error_code == "PRODUCT_ALREADY_EXISTS"
|
||||
|
||||
# ==================== get_products Tests ====================
|
||||
|
||||
def test_get_products_owner_access(self, db, test_user, test_vendor, test_product):
|
||||
"""Test vendor owner can get vendor products."""
|
||||
# Re-query vendor to get fresh instance
|
||||
vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
|
||||
products, total = self.service.get_products(db, vendor, test_user)
|
||||
|
||||
assert total >= 1
|
||||
assert len(products) >= 1
|
||||
|
||||
def test_get_products_access_denied(self, db, test_user, inactive_vendor):
|
||||
"""Test non-owner cannot access unverified vendor products."""
|
||||
# Re-query vendor to get fresh instance
|
||||
vendor = db.query(Vendor).filter(Vendor.id == inactive_vendor.id).first()
|
||||
with pytest.raises(UnauthorizedVendorAccessException) as exc_info:
|
||||
self.service.get_products(db, vendor, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 403
|
||||
assert exception.error_code == "UNAUTHORIZED_VENDOR_ACCESS"
|
||||
|
||||
def test_get_products_with_filters(self, db, test_user, test_vendor, test_product):
|
||||
"""Test getting vendor products with various filters."""
|
||||
# Re-query vendor to get fresh instance
|
||||
vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
|
||||
# Test active only filter
|
||||
products, total = self.service.get_products(
|
||||
db, vendor, test_user, active_only=True
|
||||
)
|
||||
assert all(p.is_active for p in products)
|
||||
|
||||
# ==================== Helper Method Tests ====================
|
||||
|
||||
def test_vendor_code_exists(self, db, test_vendor):
|
||||
"""Test _vendor_code_exists helper method."""
|
||||
assert self.service._vendor_code_exists(db, test_vendor.vendor_code) is True
|
||||
assert self.service._vendor_code_exists(db, "NONEXISTENT") is False
|
||||
|
||||
def test_can_access_vendor_admin(self, db, test_admin, test_vendor):
|
||||
"""Test admin can always access vendor."""
|
||||
# Re-query vendor to get fresh instance
|
||||
vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
|
||||
assert self.service._can_access_vendor(vendor, test_admin) is True
|
||||
|
||||
def test_can_access_vendor_active_verified(self, db, test_user, verified_vendor):
|
||||
"""Test any user can access active verified vendor."""
|
||||
# Re-query vendor to get fresh instance
|
||||
vendor = db.query(Vendor).filter(Vendor.id == verified_vendor.id).first()
|
||||
assert self.service._can_access_vendor(vendor, test_user) is True
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.vendors
|
||||
class TestVendorServiceExceptionDetails:
|
||||
"""Additional tests focusing specifically on exception structure and details"""
|
||||
"""Additional tests focusing specifically on exception structure and details."""
|
||||
|
||||
def setup_method(self):
|
||||
self.service = VendorService()
|
||||
|
||||
def test_exception_to_dict_structure(self, db, test_user, test_vendor):
|
||||
"""Test that exceptions can be properly serialized to dict for API responses"""
|
||||
def test_exception_to_dict_structure(self, db, test_user, test_vendor, test_company):
|
||||
"""Test that exceptions can be properly serialized to dict for API responses."""
|
||||
vendor_data = VendorCreate(
|
||||
vendor_code=test_vendor.vendor_code, vendor_name="Duplicate"
|
||||
company_id=test_company.id,
|
||||
vendor_code=test_vendor.vendor_code,
|
||||
subdomain="duplicate",
|
||||
name="Duplicate",
|
||||
)
|
||||
|
||||
with pytest.raises(VendorAlreadyExistsException) as exc_info:
|
||||
@@ -356,20 +495,8 @@ class TestVendorServiceExceptionDetails:
|
||||
assert exception_dict["status_code"] == 409
|
||||
assert isinstance(exception_dict["details"], dict)
|
||||
|
||||
def test_validation_exception_field_details(self, db, test_user):
|
||||
"""Test validation exceptions include field-specific details"""
|
||||
vendor_data = VendorCreate(vendor_code="", vendor_name="Test")
|
||||
|
||||
with pytest.raises(InvalidVendorDataException) as exc_info:
|
||||
self.service.create_vendor(db, vendor_data, test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.details["field"] == "vendor_code"
|
||||
assert exception.status_code == 422
|
||||
assert "required" in exception.message.lower()
|
||||
|
||||
def test_authorization_exception_user_details(self, db, test_user, inactive_vendor):
|
||||
"""Test authorization exceptions include user context"""
|
||||
"""Test authorization exceptions include user context."""
|
||||
with pytest.raises(UnauthorizedVendorAccessException) as exc_info:
|
||||
self.service.get_vendor_by_code(db, inactive_vendor.vendor_code, test_user)
|
||||
|
||||
@@ -377,3 +504,12 @@ class TestVendorServiceExceptionDetails:
|
||||
assert exception.details["vendor_code"] == inactive_vendor.vendor_code
|
||||
assert exception.details["user_id"] == test_user.id
|
||||
assert "Unauthorized access" in exception.message
|
||||
|
||||
def test_not_found_exception_details(self, db, test_user):
|
||||
"""Test not found exceptions include identifier details."""
|
||||
with pytest.raises(VendorNotFoundException) as exc_info:
|
||||
self.service.get_vendor_by_code(db, "NOTEXIST", test_user)
|
||||
|
||||
exception = exc_info.value
|
||||
assert exception.status_code == 404
|
||||
assert exception.error_code == "VENDOR_NOT_FOUND"
|
||||
|
||||
Reference in New Issue
Block a user