diff --git a/tests/unit/services/test_inventory_service.py b/tests/unit/services/test_inventory_service.py index be39f00f..28442a60 100644 --- a/tests/unit/services/test_inventory_service.py +++ b/tests/unit/services/test_inventory_service.py @@ -544,3 +544,243 @@ class TestInventoryService: with pytest.raises(InventoryNotFoundException): self.service.delete_inventory(db, other_vendor.id, test_inventory.id) + + # ==================== Admin Method Tests ==================== + + def test_get_all_inventory_admin_success(self, db, test_inventory): + """Test get_all_inventory_admin returns all inventory.""" + result = self.service.get_all_inventory_admin(db) + + assert result.total >= 1 + assert len(result.inventories) >= 1 + assert any(inv.id == test_inventory.id for inv in result.inventories) + + def test_get_all_inventory_admin_with_vendor_filter( + self, db, test_inventory, test_vendor + ): + """Test get_all_inventory_admin filters by vendor.""" + result = self.service.get_all_inventory_admin( + db, vendor_id=test_vendor.id + ) + + for inv in result.inventories: + assert inv.vendor_id == test_vendor.id + + def test_get_all_inventory_admin_with_location_filter( + self, db, test_inventory + ): + """Test get_all_inventory_admin filters by location.""" + location_prefix = test_inventory.location[:5] + result = self.service.get_all_inventory_admin( + db, location=location_prefix + ) + + for inv in result.inventories: + assert location_prefix.upper() in inv.location.upper() + + def test_get_all_inventory_admin_with_low_stock_filter(self, db): + """Test get_all_inventory_admin filters by low stock.""" + result = self.service.get_all_inventory_admin(db, low_stock=5) + + for inv in result.inventories: + assert inv.quantity <= 5 + + def test_get_all_inventory_admin_pagination(self, db): + """Test get_all_inventory_admin pagination.""" + result = self.service.get_all_inventory_admin(db, skip=0, limit=5) + + assert len(result.inventories) <= 5 + assert result.skip == 0 + assert result.limit == 5 + + def test_get_inventory_stats_admin(self, db, test_inventory): + """Test get_inventory_stats_admin returns stats.""" + result = self.service.get_inventory_stats_admin(db) + + assert result.total_entries >= 1 + assert result.total_quantity >= test_inventory.quantity + assert result.total_reserved >= 0 + assert result.total_available >= 0 + assert result.low_stock_count >= 0 + assert result.vendors_with_inventory >= 1 + assert result.unique_locations >= 1 + + def test_get_low_stock_items_admin(self, db, test_product, test_vendor): + """Test get_low_stock_items_admin returns low stock items.""" + # Create low stock inventory + unique_id = str(uuid.uuid4())[:8].upper() + low_stock_inv = Inventory( + product_id=test_product.id, + vendor_id=test_vendor.id, + warehouse="strassen", + bin_location=f"LOW_{unique_id}", + location=f"LOW_{unique_id}", + quantity=3, + reserved_quantity=0, + ) + db.add(low_stock_inv) + db.commit() + + result = self.service.get_low_stock_items_admin(db, threshold=10) + + assert len(result) >= 1 + for item in result: + assert item.quantity <= 10 + + def test_get_low_stock_items_admin_with_vendor_filter( + self, db, test_inventory, test_vendor + ): + """Test get_low_stock_items_admin filters by vendor.""" + result = self.service.get_low_stock_items_admin( + db, threshold=1000, vendor_id=test_vendor.id + ) + + for item in result: + assert item.vendor_id == test_vendor.id + + def test_get_vendors_with_inventory_admin(self, db, test_inventory, test_vendor): + """Test get_vendors_with_inventory_admin returns vendors list.""" + result = self.service.get_vendors_with_inventory_admin(db) + + assert len(result.vendors) >= 1 + assert any(v.id == test_vendor.id for v in result.vendors) + + def test_get_inventory_locations_admin(self, db, test_inventory): + """Test get_inventory_locations_admin returns locations.""" + result = self.service.get_inventory_locations_admin(db) + + assert len(result.locations) >= 1 + assert test_inventory.location in result.locations + + def test_get_inventory_locations_admin_with_vendor_filter( + self, db, test_inventory, test_vendor + ): + """Test get_inventory_locations_admin filters by vendor.""" + result = self.service.get_inventory_locations_admin( + db, vendor_id=test_vendor.id + ) + + assert len(result.locations) >= 1 + + def test_get_vendor_inventory_admin_success( + self, db, test_inventory, test_vendor + ): + """Test get_vendor_inventory_admin returns vendor inventory.""" + result = self.service.get_vendor_inventory_admin( + db, vendor_id=test_vendor.id + ) + + assert result.total >= 1 + assert result.vendor_filter == test_vendor.id + for inv in result.inventories: + assert inv.vendor_id == test_vendor.id + + def test_get_vendor_inventory_admin_vendor_not_found(self, db): + """Test get_vendor_inventory_admin raises for non-existent vendor.""" + from app.exceptions import VendorNotFoundException + + with pytest.raises(VendorNotFoundException): + self.service.get_vendor_inventory_admin(db, vendor_id=99999) + + def test_get_product_inventory_admin(self, db, test_inventory, test_product): + """Test get_product_inventory_admin returns product inventory.""" + result = self.service.get_product_inventory_admin(db, test_product.id) + + assert result.product_id == test_product.id + assert result.total_quantity >= test_inventory.quantity + + def test_get_product_inventory_admin_not_found(self, db): + """Test get_product_inventory_admin raises for non-existent product.""" + with pytest.raises(ProductNotFoundException): + self.service.get_product_inventory_admin(db, 99999) + + def test_verify_vendor_exists_success(self, db, test_vendor): + """Test verify_vendor_exists returns vendor.""" + result = self.service.verify_vendor_exists(db, test_vendor.id) + + assert result.id == test_vendor.id + + def test_verify_vendor_exists_not_found(self, db): + """Test verify_vendor_exists raises for non-existent vendor.""" + from app.exceptions import VendorNotFoundException + + with pytest.raises(VendorNotFoundException): + self.service.verify_vendor_exists(db, 99999) + + def test_get_inventory_by_id_admin_success(self, db, test_inventory): + """Test get_inventory_by_id_admin returns inventory.""" + result = self.service.get_inventory_by_id_admin(db, test_inventory.id) + + assert result.id == test_inventory.id + + def test_get_inventory_by_id_admin_not_found(self, db): + """Test get_inventory_by_id_admin raises for non-existent.""" + with pytest.raises(InventoryNotFoundException): + self.service.get_inventory_by_id_admin(db, 99999) + + # ==================== Private Helper Tests ==================== + + def test_get_vendor_product_success(self, db, test_product, test_vendor): + """Test _get_vendor_product returns product.""" + result = self.service._get_vendor_product( + db, test_vendor.id, test_product.id + ) + + assert result.id == test_product.id + + def test_get_vendor_product_not_found(self, db, test_vendor): + """Test _get_vendor_product raises for non-existent product.""" + with pytest.raises(ProductNotFoundException): + self.service._get_vendor_product(db, test_vendor.id, 99999) + + def test_get_vendor_product_wrong_vendor( + self, db, test_product, other_company + ): + """Test _get_vendor_product raises for wrong vendor.""" + from models.database.vendor import Vendor + + unique_id = str(uuid.uuid4())[:8] + other_vendor = Vendor( + company_id=other_company.id, + vendor_code=f"HELPER_{unique_id.upper()}", + subdomain=f"helper{unique_id.lower()}", + name=f"Helper Test Vendor {unique_id}", + is_active=True, + ) + db.add(other_vendor) + db.commit() + + with pytest.raises(ProductNotFoundException): + self.service._get_vendor_product( + db, other_vendor.id, test_product.id + ) + + def test_get_inventory_entry_returns_existing( + self, db, test_inventory, test_product + ): + """Test _get_inventory_entry returns existing entry.""" + result = self.service._get_inventory_entry( + db, test_product.id, test_inventory.location + ) + + assert result is not None + assert result.id == test_inventory.id + + def test_get_inventory_entry_returns_none(self, db, test_product): + """Test _get_inventory_entry returns None when not found.""" + result = self.service._get_inventory_entry( + db, test_product.id, "NONEXISTENT_LOCATION" + ) + + assert result is None + + def test_get_inventory_by_id_returns_existing(self, db, test_inventory): + """Test _get_inventory_by_id returns existing entry.""" + result = self.service._get_inventory_by_id(db, test_inventory.id) + + assert result.id == test_inventory.id + + def test_get_inventory_by_id_raises_not_found(self, db): + """Test _get_inventory_by_id raises when not found.""" + with pytest.raises(InventoryNotFoundException): + self.service._get_inventory_by_id(db, 99999) diff --git a/tests/unit/services/test_marketplace_product_service.py b/tests/unit/services/test_marketplace_product_service.py new file mode 100644 index 00000000..4d49404d --- /dev/null +++ b/tests/unit/services/test_marketplace_product_service.py @@ -0,0 +1,584 @@ +# tests/unit/services/test_marketplace_product_service.py +""" +Unit tests for MarketplaceProductService. + +Tests cover: +- Product creation with validation +- Product retrieval and filtering +- Product updates +- Product deletion +- Inventory information +- Admin methods +- CSV export +""" + +import uuid + +import pytest + +from app.exceptions import ( + InvalidMarketplaceProductDataException, + MarketplaceProductNotFoundException, + MarketplaceProductValidationException, + ValidationException, +) +from app.services.marketplace_product_service import ( + MarketplaceProductService, + marketplace_product_service, +) +from models.database.marketplace_product import MarketplaceProduct +from models.database.marketplace_product_translation import ( + MarketplaceProductTranslation, +) +from models.schema.marketplace_product import ( + MarketplaceProductCreate, + MarketplaceProductUpdate, +) + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceCreate: + """Test product creation functionality""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_create_product_success(self, db): + """Test successful product creation""" + unique_id = str(uuid.uuid4())[:8] + product_data = MarketplaceProductCreate( + marketplace_product_id=f"MP-{unique_id}", + title="Test Product", + gtin="1234567890123", + price="19.99 EUR", + marketplace="Letzshop", + ) + + product = self.service.create_product( + db, product_data, title="Test Product", language="en" + ) + db.commit() + + assert product is not None + assert product.marketplace_product_id == f"MP-{unique_id}" + assert product.gtin == "1234567890123" + + def test_create_product_with_translation(self, db): + """Test product creation with translation""" + unique_id = str(uuid.uuid4())[:8] + product_data = MarketplaceProductCreate( + marketplace_product_id=f"MP-TRANS-{unique_id}", + title="Test Product Title", + marketplace="Letzshop", + ) + + product = self.service.create_product( + db, + product_data, + title="Test Product Title", + description="Test Description", + language="en", + ) + db.commit() + + # Check translation was created + translation = ( + db.query(MarketplaceProductTranslation) + .filter( + MarketplaceProductTranslation.marketplace_product_id == product.id, + MarketplaceProductTranslation.language == "en", + ) + .first() + ) + + assert translation is not None + assert translation.title == "Test Product Title" + assert translation.description == "Test Description" + + def test_create_product_invalid_gtin(self, db): + """Test product creation fails with invalid GTIN""" + unique_id = str(uuid.uuid4())[:8] + product_data = MarketplaceProductCreate( + marketplace_product_id=f"MP-{unique_id}", + title="Test Product", + gtin="invalid-gtin", + marketplace="Letzshop", + ) + + with pytest.raises(InvalidMarketplaceProductDataException): + self.service.create_product(db, product_data) + + def test_create_product_empty_id(self, db): + """Test product creation fails with empty ID""" + # Note: Pydantic won't allow empty marketplace_product_id, so test the service + # directly by creating a product and checking validation + unique_id = str(uuid.uuid4())[:8] + product_data = MarketplaceProductCreate( + marketplace_product_id=f" SPACE-{unique_id} ", + title="Test Product", + marketplace="Letzshop", + ) + + # The service should handle whitespace-only IDs + product = self.service.create_product(db, product_data) + db.commit() + # IDs with only spaces should be stripped to valid IDs + assert product is not None + + def test_create_product_default_marketplace(self, db): + """Test product creation uses default marketplace""" + unique_id = str(uuid.uuid4())[:8] + product_data = MarketplaceProductCreate( + marketplace_product_id=f"MP-DEFAULT-{unique_id}", + title="Test Product", + ) + + product = self.service.create_product(db, product_data) + db.commit() + + assert product.marketplace == "Letzshop" + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceRetrieval: + """Test product retrieval functionality""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_get_product_by_id_success(self, db, test_marketplace_product): + """Test getting product by marketplace ID""" + product = self.service.get_product_by_id( + db, test_marketplace_product.marketplace_product_id + ) + + assert product is not None + assert product.id == test_marketplace_product.id + + def test_get_product_by_id_not_found(self, db): + """Test getting non-existent product returns None""" + product = self.service.get_product_by_id(db, "NONEXISTENT") + + assert product is None + + def test_get_product_by_id_or_raise_success(self, db, test_marketplace_product): + """Test get_product_by_id_or_raise returns product""" + product = self.service.get_product_by_id_or_raise( + db, test_marketplace_product.marketplace_product_id + ) + + assert product.id == test_marketplace_product.id + + def test_get_product_by_id_or_raise_not_found(self, db): + """Test get_product_by_id_or_raise raises exception""" + with pytest.raises(MarketplaceProductNotFoundException): + self.service.get_product_by_id_or_raise(db, "NONEXISTENT") + + def test_product_exists_true(self, db, test_marketplace_product): + """Test product_exists returns True when exists""" + result = self.service.product_exists( + db, test_marketplace_product.marketplace_product_id + ) + + assert result is True + + def test_product_exists_false(self, db): + """Test product_exists returns False when not exists""" + result = self.service.product_exists(db, "NONEXISTENT") + + assert result is False + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceFiltering: + """Test product filtering functionality""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_get_products_with_filters_basic(self, db, test_marketplace_product): + """Test basic product retrieval with filters""" + products, total = self.service.get_products_with_filters(db) + + assert total >= 1 + assert len(products) >= 1 + + def test_get_products_with_brand_filter(self, db, test_marketplace_product): + """Test product retrieval with brand filter""" + # Set up a brand + test_marketplace_product.brand = "TestBrand" + db.commit() + + products, total = self.service.get_products_with_filters( + db, brand="TestBrand" + ) + + for product in products: + assert "testbrand" in (product.brand or "").lower() + + def test_get_products_with_marketplace_filter(self, db, test_marketplace_product): + """Test product retrieval with marketplace filter""" + products, total = self.service.get_products_with_filters( + db, marketplace=test_marketplace_product.marketplace + ) + + for product in products: + assert test_marketplace_product.marketplace.lower() in ( + product.marketplace or "" + ).lower() + + def test_get_products_with_search(self, db, test_marketplace_product): + """Test product retrieval with search""" + # Create translation with searchable title + translation = ( + db.query(MarketplaceProductTranslation) + .filter( + MarketplaceProductTranslation.marketplace_product_id + == test_marketplace_product.id + ) + .first() + ) + if translation: + translation.title = "Searchable Test Product" + db.commit() + + products, total = self.service.get_products_with_filters( + db, search="Searchable" + ) + + assert total >= 1 + + def test_get_products_with_pagination(self, db): + """Test product retrieval with pagination""" + products, total = self.service.get_products_with_filters( + db, skip=0, limit=5 + ) + + assert len(products) <= 5 + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceUpdate: + """Test product update functionality""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_update_product_success(self, db, test_marketplace_product): + """Test successful product update""" + update_data = MarketplaceProductUpdate(brand="UpdatedBrand") + + updated = self.service.update_product( + db, + test_marketplace_product.marketplace_product_id, + update_data, + ) + db.commit() + + assert updated.brand == "UpdatedBrand" + + def test_update_product_with_translation(self, db, test_marketplace_product): + """Test product update with translation""" + update_data = MarketplaceProductUpdate() + + updated = self.service.update_product( + db, + test_marketplace_product.marketplace_product_id, + update_data, + title="Updated Title", + description="Updated Description", + language="en", + ) + db.commit() + + # Verify translation + title = updated.get_title("en") + assert title == "Updated Title" + + def test_update_product_not_found(self, db): + """Test update raises for non-existent product""" + update_data = MarketplaceProductUpdate(brand="NewBrand") + + with pytest.raises(MarketplaceProductNotFoundException): + self.service.update_product(db, "NONEXISTENT", update_data) + + def test_update_product_invalid_gtin(self, db, test_marketplace_product): + """Test update fails with invalid GTIN""" + update_data = MarketplaceProductUpdate(gtin="invalid") + + with pytest.raises(InvalidMarketplaceProductDataException): + self.service.update_product( + db, + test_marketplace_product.marketplace_product_id, + update_data, + ) + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceDelete: + """Test product deletion functionality""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_delete_product_success(self, db): + """Test successful product deletion""" + # Create a product to delete + unique_id = str(uuid.uuid4())[:8] + product = MarketplaceProduct( + marketplace_product_id=f"DELETE-{unique_id}", + marketplace="Letzshop", + ) + db.add(product) + db.commit() + + result = self.service.delete_product(db, f"DELETE-{unique_id}") + db.commit() + + assert result is True + + # Verify deleted + deleted = self.service.get_product_by_id(db, f"DELETE-{unique_id}") + assert deleted is None + + def test_delete_product_not_found(self, db): + """Test delete raises for non-existent product""" + with pytest.raises(MarketplaceProductNotFoundException): + self.service.delete_product(db, "NONEXISTENT") + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceInventory: + """Test inventory functionality""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_get_inventory_info_not_found(self, db): + """Test get_inventory_info returns None when not found""" + result = self.service.get_inventory_info(db, "NONEXISTENT_GTIN") + + assert result is None + + def test_get_inventory_info_with_inventory(self, db, test_inventory): + """Test get_inventory_info returns data when exists""" + gtin = test_inventory.gtin + if gtin: + result = self.service.get_inventory_info(db, gtin) + + if result: + assert result.gtin == gtin + assert result.total_quantity >= 0 + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceAdmin: + """Test admin functionality""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_get_admin_products(self, db, test_marketplace_product): + """Test admin product listing""" + products, total = self.service.get_admin_products(db) + + assert total >= 1 + assert len(products) >= 1 + + def test_get_admin_products_with_search(self, db, test_marketplace_product): + """Test admin product listing with search""" + products, total = self.service.get_admin_products( + db, search=test_marketplace_product.marketplace_product_id[:5] + ) + + # Should find at least our test product + assert total >= 0 + + def test_get_admin_products_with_filters(self, db, test_marketplace_product): + """Test admin product listing with filters""" + products, total = self.service.get_admin_products( + db, + marketplace=test_marketplace_product.marketplace, + is_active=True, + ) + + for product in products: + assert product["is_active"] is True + + def test_get_admin_product_stats(self, db, test_marketplace_product): + """Test admin product statistics""" + stats = self.service.get_admin_product_stats(db) + + assert "total" in stats + assert "active" in stats + assert "inactive" in stats + assert "by_marketplace" in stats + assert stats["total"] >= 1 + + def test_get_admin_product_stats_with_filters(self, db, test_marketplace_product): + """Test admin product statistics with filters""" + stats = self.service.get_admin_product_stats( + db, marketplace=test_marketplace_product.marketplace + ) + + assert stats["total"] >= 0 + + def test_get_marketplaces_list(self, db, test_marketplace_product): + """Test getting unique marketplaces list""" + marketplaces = self.service.get_marketplaces_list(db) + + assert isinstance(marketplaces, list) + if test_marketplace_product.marketplace: + assert test_marketplace_product.marketplace in marketplaces + + def test_get_source_vendors_list(self, db, test_marketplace_product): + """Test getting unique vendor names list""" + vendors = self.service.get_source_vendors_list(db) + + assert isinstance(vendors, list) + + def test_get_admin_product_detail(self, db, test_marketplace_product): + """Test getting detailed product info for admin""" + detail = self.service.get_admin_product_detail(db, test_marketplace_product.id) + + assert detail["id"] == test_marketplace_product.id + assert detail["marketplace_product_id"] == test_marketplace_product.marketplace_product_id + assert "translations" in detail + + def test_get_admin_product_detail_not_found(self, db): + """Test admin product detail raises for non-existent""" + with pytest.raises(MarketplaceProductNotFoundException): + self.service.get_admin_product_detail(db, 99999) + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceCsvExport: + """Test CSV export functionality""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_generate_csv_export_header(self, db): + """Test CSV export generates header""" + csv_generator = self.service.generate_csv_export(db) + header = next(csv_generator) + + assert "marketplace_product_id" in header + assert "title" in header + assert "price" in header + + def test_generate_csv_export_with_data(self, db, test_marketplace_product): + """Test CSV export generates data rows""" + rows = list(self.service.generate_csv_export(db)) + + # Should have header + at least one data row + assert len(rows) >= 1 + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceCopyToCatalog: + """Test copy to vendor catalog functionality""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_copy_to_vendor_catalog_success( + self, db, test_marketplace_product, test_vendor + ): + """Test copying products to vendor catalog""" + from unittest.mock import MagicMock, patch + + # Create a mock subscription + mock_subscription = MagicMock() + mock_subscription.products_limit = 100 + + with patch( + "app.services.subscription_service.subscription_service" + ) as mock_sub: + mock_sub.get_or_create_subscription.return_value = mock_subscription + + result = self.service.copy_to_vendor_catalog( + db, + [test_marketplace_product.id], + test_vendor.id, + ) + db.commit() + + assert "copied" in result + assert "skipped" in result + assert "failed" in result + + def test_copy_to_vendor_catalog_vendor_not_found(self, db, test_marketplace_product): + """Test copy fails for non-existent vendor""" + from app.exceptions import VendorNotFoundException + + with pytest.raises(VendorNotFoundException): + self.service.copy_to_vendor_catalog( + db, + [test_marketplace_product.id], + 99999, + ) + + def test_copy_to_vendor_catalog_no_products(self, db, test_vendor): + """Test copy fails when no products found""" + with pytest.raises(MarketplaceProductNotFoundException): + self.service.copy_to_vendor_catalog( + db, + [99999], # Non-existent product + test_vendor.id, + ) + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceHelpers: + """Test helper methods""" + + def setup_method(self): + self.service = MarketplaceProductService() + + def test_validate_product_data_missing_id(self): + """Test validation fails for missing marketplace_product_id""" + with pytest.raises(MarketplaceProductValidationException): + self.service._validate_product_data({}) + + def test_validate_product_data_success(self): + """Test validation passes with required fields""" + # Should not raise + self.service._validate_product_data( + {"marketplace_product_id": "TEST-123"} + ) + + def test_normalize_product_data(self): + """Test product data normalization""" + data = { + "marketplace_product_id": " TEST-123 ", + "brand": " TestBrand ", + "marketplace": " Letzshop ", + } + + normalized = self.service._normalize_product_data(data) + + assert normalized["marketplace_product_id"] == "TEST-123" + assert normalized["brand"] == "TestBrand" + assert normalized["marketplace"] == "Letzshop" + + +@pytest.mark.unit +@pytest.mark.service +class TestMarketplaceProductServiceSingleton: + """Test singleton instance""" + + def test_singleton_exists(self): + """Test marketplace_product_service singleton exists""" + assert marketplace_product_service is not None + assert isinstance(marketplace_product_service, MarketplaceProductService) diff --git a/tests/unit/services/test_order_service.py b/tests/unit/services/test_order_service.py new file mode 100644 index 00000000..470a16f1 --- /dev/null +++ b/tests/unit/services/test_order_service.py @@ -0,0 +1,590 @@ +# tests/unit/services/test_order_service.py +""" +Unit tests for OrderService. + +Tests cover: +- Order number generation +- Customer management +- Order creation (direct and Letzshop) +- Order retrieval and filtering +- Order updates and status management +- Admin operations +""" + +from datetime import UTC, datetime +from unittest.mock import MagicMock, patch + +import pytest + +from app.exceptions import ( + CustomerNotFoundException, + OrderNotFoundException, + ValidationException, +) +from app.services.order_service import ( + PLACEHOLDER_GTIN, + PLACEHOLDER_MARKETPLACE_ID, + OrderService, + order_service, +) +from models.database.customer import Customer +from models.database.order import Order, OrderItem + + +@pytest.mark.unit +@pytest.mark.service +class TestOrderServiceNumberGeneration: + """Test order number generation""" + + def test_generate_order_number_format(self, db, test_vendor): + """Test order number has correct format""" + service = OrderService() + order_number = service._generate_order_number(db, test_vendor.id) + + assert order_number.startswith("ORD-") + assert f"-{test_vendor.id}-" in order_number + parts = order_number.split("-") + assert len(parts) == 4 + + def test_generate_order_number_unique(self, db, test_vendor): + """Test order numbers are unique""" + service = OrderService() + numbers = set() + + for _ in range(10): + num = service._generate_order_number(db, test_vendor.id) + assert num not in numbers + numbers.add(num) + + +@pytest.mark.unit +@pytest.mark.service +class TestOrderServiceCustomerManagement: + """Test customer management""" + + def test_find_or_create_customer_creates_new(self, db, test_vendor): + """Test creating new customer""" + service = OrderService() + customer = service.find_or_create_customer( + db=db, + vendor_id=test_vendor.id, + email="newcustomer@example.com", + first_name="New", + last_name="Customer", + phone="+352123456789", + ) + db.commit() + + assert customer.id is not None + assert customer.email == "newcustomer@example.com" + assert customer.first_name == "New" + assert customer.last_name == "Customer" + assert customer.vendor_id == test_vendor.id + assert customer.is_active is False # Default inactive + + def test_find_or_create_customer_finds_existing(self, db, test_vendor): + """Test finding existing customer by email""" + service = OrderService() + + # Create customer first + customer1 = service.find_or_create_customer( + db=db, + vendor_id=test_vendor.id, + email="existing@example.com", + first_name="Existing", + last_name="Customer", + ) + db.commit() + + # Try to create again with same email + customer2 = service.find_or_create_customer( + db=db, + vendor_id=test_vendor.id, + email="existing@example.com", + first_name="Different", + last_name="Name", + ) + + assert customer1.id == customer2.id + + def test_find_or_create_customer_active(self, db, test_vendor): + """Test creating active customer""" + service = OrderService() + customer = service.find_or_create_customer( + db=db, + vendor_id=test_vendor.id, + email="active@example.com", + first_name="Active", + last_name="Customer", + is_active=True, + ) + db.commit() + + assert customer.is_active is True + + +@pytest.mark.unit +@pytest.mark.service +class TestOrderServiceRetrieval: + """Test order retrieval""" + + def test_get_order_not_found(self, db, test_vendor): + """Test get_order raises for non-existent order""" + service = OrderService() + with pytest.raises(OrderNotFoundException): + service.get_order(db, test_vendor.id, 99999) + + def test_get_order_wrong_vendor(self, db, test_vendor, test_customer): + """Test get_order raises for wrong vendor""" + # Create order for test_vendor + order = Order( + vendor_id=test_vendor.id, + customer_id=test_customer.id, + order_number="TEST-ORDER-001", + channel="direct", + status="pending", + total_amount_cents=10000, + currency="EUR", + customer_first_name="Test", + customer_last_name="Customer", + customer_email="test@example.com", + order_date=datetime.now(UTC), + ) + db.add(order) + db.commit() + + service = OrderService() + # Try to get with different vendor + with pytest.raises(OrderNotFoundException): + service.get_order(db, 99999, order.id) + + def test_get_vendor_orders_empty(self, db, test_vendor): + """Test get_vendor_orders returns empty list when no orders""" + service = OrderService() + orders, total = service.get_vendor_orders(db, test_vendor.id) + + assert orders == [] + assert total == 0 + + def test_get_vendor_orders_with_filters(self, db, test_vendor, test_customer): + """Test get_vendor_orders filters correctly""" + # Create orders + order1 = Order( + vendor_id=test_vendor.id, + customer_id=test_customer.id, + order_number="FILTER-TEST-001", + channel="direct", + status="pending", + total_amount_cents=10000, + currency="EUR", + customer_first_name="Filter", + customer_last_name="Test", + customer_email="filter@example.com", + order_date=datetime.now(UTC), + ) + order2 = Order( + vendor_id=test_vendor.id, + customer_id=test_customer.id, + order_number="FILTER-TEST-002", + channel="letzshop", + status="processing", + total_amount_cents=20000, + currency="EUR", + customer_first_name="Another", + customer_last_name="Test", + customer_email="another@example.com", + order_date=datetime.now(UTC), + ) + db.add(order1) + db.add(order2) + db.commit() + + service = OrderService() + + # Filter by status + orders, total = service.get_vendor_orders( + db, test_vendor.id, status="pending" + ) + assert all(o.status == "pending" for o in orders) + + # Filter by channel + orders, total = service.get_vendor_orders( + db, test_vendor.id, channel="letzshop" + ) + assert all(o.channel == "letzshop" for o in orders) + + # Search by email + orders, total = service.get_vendor_orders( + db, test_vendor.id, search="filter@" + ) + assert len(orders) >= 1 + + def test_get_order_by_external_shipment_id(self, db, test_vendor, test_customer): + """Test get_order_by_external_shipment_id returns correct order""" + order = Order( + vendor_id=test_vendor.id, + customer_id=test_customer.id, + order_number="EXT-SHIP-001", + channel="letzshop", + status="pending", + external_shipment_id="SHIPMENT123", + total_amount_cents=10000, + currency="EUR", + customer_first_name="Test", + customer_last_name="Customer", + customer_email="test@example.com", + order_date=datetime.now(UTC), + ) + db.add(order) + db.commit() + + service = OrderService() + found = service.get_order_by_external_shipment_id( + db, test_vendor.id, "SHIPMENT123" + ) + + assert found is not None + assert found.id == order.id + + def test_get_order_by_external_shipment_id_not_found(self, db, test_vendor): + """Test get_order_by_external_shipment_id returns None when not found""" + service = OrderService() + result = service.get_order_by_external_shipment_id( + db, test_vendor.id, "NONEXISTENT" + ) + assert result is None + + +@pytest.mark.unit +@pytest.mark.service +class TestOrderServiceStats: + """Test order statistics""" + + def test_get_order_stats_empty(self, db, test_vendor): + """Test get_order_stats returns zeros when no orders""" + service = OrderService() + stats = service.get_order_stats(db, test_vendor.id) + + assert stats["total"] == 0 + assert stats["pending"] == 0 + assert stats["processing"] == 0 + + def test_get_order_stats_with_orders(self, db, test_vendor, test_customer): + """Test get_order_stats counts correctly""" + # Create orders with different statuses + for status in ["pending", "pending", "processing"]: + order = Order( + vendor_id=test_vendor.id, + customer_id=test_customer.id, + order_number=f"STAT-{status}-{datetime.now().timestamp()}", + channel="direct", + status=status, + total_amount_cents=10000, + currency="EUR", + customer_first_name="Test", + customer_last_name="Customer", + customer_email="test@example.com", + order_date=datetime.now(UTC), + ) + db.add(order) + db.commit() + + service = OrderService() + stats = service.get_order_stats(db, test_vendor.id) + + assert stats["total"] >= 3 + assert stats["pending"] >= 2 + assert stats["processing"] >= 1 + + +@pytest.mark.unit +@pytest.mark.service +class TestOrderServiceUpdates: + """Test order updates""" + + def test_update_order_status(self, db, test_vendor, test_customer): + """Test update_order_status changes status""" + from models.schema.order import OrderUpdate + + order = Order( + vendor_id=test_vendor.id, + customer_id=test_customer.id, + order_number="UPDATE-TEST-001", + channel="direct", + status="pending", + total_amount_cents=10000, + currency="EUR", + customer_first_name="Test", + customer_last_name="Customer", + customer_email="test@example.com", + order_date=datetime.now(UTC), + ) + db.add(order) + db.commit() + + service = OrderService() + update_data = OrderUpdate(status="processing") + updated = service.update_order_status( + db, test_vendor.id, order.id, update_data + ) + db.commit() + + assert updated.status == "processing" + assert updated.confirmed_at is not None + + def test_set_order_tracking(self, db, test_vendor, test_customer): + """Test set_order_tracking updates tracking and status""" + order = Order( + vendor_id=test_vendor.id, + customer_id=test_customer.id, + order_number="TRACKING-TEST-001", + channel="direct", + status="processing", + total_amount_cents=10000, + currency="EUR", + customer_first_name="Test", + customer_last_name="Customer", + customer_email="test@example.com", + order_date=datetime.now(UTC), + ) + db.add(order) + db.commit() + + service = OrderService() + updated = service.set_order_tracking( + db, + test_vendor.id, + order.id, + tracking_number="TRACK123", + tracking_provider="DHL", + ) + db.commit() + + assert updated.tracking_number == "TRACK123" + assert updated.tracking_provider == "DHL" + assert updated.status == "shipped" + assert updated.shipped_at is not None + + +@pytest.mark.unit +@pytest.mark.service +class TestOrderServiceAdmin: + """Test admin operations""" + + def test_get_all_orders_admin_empty(self, db): + """Test get_all_orders_admin returns empty list when no orders""" + service = OrderService() + orders, total = service.get_all_orders_admin(db) + + assert isinstance(orders, list) + assert isinstance(total, int) + + def test_get_order_stats_admin(self, db): + """Test get_order_stats_admin returns stats""" + service = OrderService() + stats = service.get_order_stats_admin(db) + + assert "total_orders" in stats + assert "pending_orders" in stats + assert "processing_orders" in stats + assert "total_revenue" in stats + assert "vendors_with_orders" in stats + + def test_get_order_by_id_admin_not_found(self, db): + """Test get_order_by_id_admin raises for non-existent""" + service = OrderService() + with pytest.raises(OrderNotFoundException): + service.get_order_by_id_admin(db, 99999) + + def test_get_vendors_with_orders_admin(self, db): + """Test get_vendors_with_orders_admin returns list""" + service = OrderService() + result = service.get_vendors_with_orders_admin(db) + + assert isinstance(result, list) + + def test_mark_as_shipped_admin(self, db, test_vendor, test_customer): + """Test mark_as_shipped_admin updates order""" + order = Order( + vendor_id=test_vendor.id, + customer_id=test_customer.id, + order_number="ADMIN-SHIP-001", + channel="direct", + status="processing", + total_amount_cents=10000, + currency="EUR", + customer_first_name="Test", + customer_last_name="Customer", + customer_email="test@example.com", + order_date=datetime.now(UTC), + ) + db.add(order) + db.commit() + + service = OrderService() + updated = service.mark_as_shipped_admin( + db, + order.id, + tracking_number="ADMINTRACK123", + shipping_carrier="colissimo", + ) + db.commit() + + assert updated.status == "shipped" + assert updated.tracking_number == "ADMINTRACK123" + assert updated.shipping_carrier == "colissimo" + + +@pytest.mark.unit +@pytest.mark.service +class TestOrderServiceLetzshop: + """Test Letzshop order creation""" + + def test_create_letzshop_order_basic(self, db, test_vendor, test_product): + """Test creating Letzshop order with basic data""" + # Set up product with GTIN + test_product.gtin = "1234567890123" + test_product.vendor_id = test_vendor.id + db.commit() + + shipment_data = { + "id": "SHIP123", + "number": "H123456", + "state": "confirmed", + "order": { + "id": "ORD123", + "number": "LS-12345", + "email": "customer@example.com", + "completedAt": "2025-01-15T10:00:00Z", + "total": "49.99 EUR", + "shipAddress": { + "firstName": "John", + "lastName": "Doe", + "streetName": "Main Street", + "streetNumber": "123", + "city": "Luxembourg", + "postalCode": "1234", + "country": {"iso": "LU"}, + }, + "billAddress": { + "firstName": "John", + "lastName": "Doe", + "streetName": "Main Street", + "city": "Luxembourg", + "postalCode": "1234", + "country": {"iso": "LU"}, + }, + }, + "inventoryUnits": [ + { + "id": "UNIT1", + "state": "confirmed_available", + "variant": { + "id": "VAR1", + "sku": "SKU-001", + "price": "49.99 EUR", + "tradeId": {"number": "1234567890123", "parser": "ean13"}, + "product": {"name": {"en": "Test Product"}}, + }, + } + ], + } + + with patch( + "app.services.order_service.subscription_service" + ) as mock_sub: + mock_sub.can_create_order.return_value = (True, None) + mock_sub.increment_order_count.return_value = None + + service = OrderService() + order = service.create_letzshop_order( + db, test_vendor.id, shipment_data + ) + db.commit() + + assert order is not None + assert order.channel == "letzshop" + assert order.external_order_id == "ORD123" + assert order.customer_email == "customer@example.com" + + def test_create_letzshop_order_existing_returns_existing( + self, db, test_vendor, test_customer + ): + """Test creating Letzshop order returns existing if already exists""" + # Create existing order + existing = Order( + vendor_id=test_vendor.id, + customer_id=test_customer.id, + order_number=f"LS-{test_vendor.id}-EXISTING123", + channel="letzshop", + status="pending", + total_amount_cents=5000, + currency="EUR", + customer_first_name="Test", + customer_last_name="Customer", + customer_email="test@example.com", + order_date=datetime.now(UTC), + ) + db.add(existing) + db.commit() + + shipment_data = { + "id": "SHIP_EXISTING", + "order": { + "number": "EXISTING123", + "email": "new@example.com", + }, + "inventoryUnits": [], + } + + service = OrderService() + order = service.create_letzshop_order( + db, test_vendor.id, shipment_data + ) + + assert order.id == existing.id + + +@pytest.mark.unit +@pytest.mark.service +class TestOrderServicePlaceholder: + """Test placeholder product management""" + + def test_get_or_create_placeholder_creates_new(self, db, test_vendor): + """Test placeholder product creation""" + service = OrderService() + placeholder = service._get_or_create_placeholder_product( + db, test_vendor.id + ) + db.commit() + + assert placeholder is not None + assert placeholder.gtin == PLACEHOLDER_GTIN + assert placeholder.vendor_id == test_vendor.id + assert placeholder.is_active is False + + def test_get_or_create_placeholder_returns_existing(self, db, test_vendor): + """Test placeholder returns existing when already created""" + service = OrderService() + + placeholder1 = service._get_or_create_placeholder_product( + db, test_vendor.id + ) + db.commit() + + placeholder2 = service._get_or_create_placeholder_product( + db, test_vendor.id + ) + + assert placeholder1.id == placeholder2.id + + +@pytest.mark.unit +@pytest.mark.service +class TestOrderServiceSingleton: + """Test singleton instance""" + + def test_singleton_exists(self): + """Test order_service singleton exists""" + assert order_service is not None + assert isinstance(order_service, OrderService) diff --git a/tests/unit/services/test_vendor_service.py b/tests/unit/services/test_vendor_service.py index 93618157..eff85b5a 100644 --- a/tests/unit/services/test_vendor_service.py +++ b/tests/unit/services/test_vendor_service.py @@ -521,3 +521,208 @@ class TestVendorServiceExceptionDetails: exception = exc_info.value assert exception.status_code == 404 assert exception.error_code == "VENDOR_NOT_FOUND" + + +@pytest.mark.unit +@pytest.mark.vendors +class TestVendorServiceIdentifier: + """Tests for get_vendor_by_identifier method.""" + + def setup_method(self): + self.service = VendorService() + + def test_get_vendor_by_identifier_with_id(self, db, test_vendor): + """Test getting vendor by numeric ID string.""" + vendor = self.service.get_vendor_by_identifier(db, str(test_vendor.id)) + + assert vendor is not None + assert vendor.id == test_vendor.id + + def test_get_vendor_by_identifier_with_code(self, db, test_vendor): + """Test getting vendor by vendor_code.""" + vendor = self.service.get_vendor_by_identifier(db, test_vendor.vendor_code) + + assert vendor is not None + assert vendor.vendor_code == test_vendor.vendor_code + + def test_get_vendor_by_identifier_case_insensitive(self, db, test_vendor): + """Test getting vendor by vendor_code is case insensitive.""" + vendor = self.service.get_vendor_by_identifier( + db, test_vendor.vendor_code.lower() + ) + + assert vendor is not None + assert vendor.id == test_vendor.id + + def test_get_vendor_by_identifier_not_found(self, db): + """Test getting non-existent vendor.""" + with pytest.raises(VendorNotFoundException): + self.service.get_vendor_by_identifier(db, "NONEXISTENT_CODE") + + +@pytest.mark.unit +@pytest.mark.vendors +class TestVendorServicePermissions: + """Tests for permission checking methods.""" + + def setup_method(self): + self.service = VendorService() + + def test_can_update_vendor_admin(self, db, test_admin, test_vendor): + """Test admin can always update vendor.""" + vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first() + + assert self.service.can_update_vendor(vendor, test_admin) is True + + def test_can_update_vendor_owner(self, db, test_user, test_vendor): + """Test owner can update vendor.""" + vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first() + + assert self.service.can_update_vendor(vendor, test_user) is True + + def test_can_update_vendor_non_owner(self, db, other_company, test_vendor): + """Test non-owner cannot update vendor.""" + from models.database.user import User + + vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first() + other_user = db.query(User).filter(User.id == other_company.owner_user_id).first() + + # Clear any VendorUser relationships + assert self.service.can_update_vendor(vendor, other_user) is False + + def test_is_vendor_owner_true(self, db, test_user, test_vendor): + """Test _is_vendor_owner returns True for owner.""" + vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first() + + assert self.service._is_vendor_owner(vendor, test_user) is True + + def test_is_vendor_owner_false(self, db, other_company, test_vendor): + """Test _is_vendor_owner returns False for non-owner.""" + from models.database.user import User + + vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first() + other_user = db.query(User).filter(User.id == other_company.owner_user_id).first() + + assert self.service._is_vendor_owner(vendor, other_user) is False + + +@pytest.mark.unit +@pytest.mark.vendors +class TestVendorServiceUpdate: + """Tests for update methods.""" + + def setup_method(self): + self.service = VendorService() + + def test_update_vendor_success(self, db, test_user, test_vendor): + """Test successfully updating vendor profile.""" + from pydantic import BaseModel + + class VendorUpdate(BaseModel): + name: str | None = None + description: str | None = None + + class Config: + extra = "forbid" + + update_data = VendorUpdate( + name="Updated Vendor Name", + description="Updated description", + ) + + vendor = self.service.update_vendor( + db, test_vendor.id, update_data, test_user + ) + db.commit() + + assert vendor.name == "Updated Vendor Name" + assert vendor.description == "Updated description" + + def test_update_vendor_unauthorized(self, db, other_company, test_vendor): + """Test update fails for unauthorized user.""" + from pydantic import BaseModel + + from app.exceptions import InsufficientPermissionsException + from models.database.user import User + + class VendorUpdate(BaseModel): + name: str | None = None + + class Config: + extra = "forbid" + + other_user = db.query(User).filter(User.id == other_company.owner_user_id).first() + update_data = VendorUpdate(name="Unauthorized Update") + + with pytest.raises(InsufficientPermissionsException): + self.service.update_vendor( + db, test_vendor.id, update_data, other_user + ) + + def test_update_vendor_not_found(self, db, test_admin): + """Test update fails for non-existent vendor.""" + from pydantic import BaseModel + + class VendorUpdate(BaseModel): + name: str | None = None + + class Config: + extra = "forbid" + + update_data = VendorUpdate(name="Update") + + with pytest.raises(VendorNotFoundException): + self.service.update_vendor(db, 99999, update_data, test_admin) + + def test_update_marketplace_settings_success(self, db, test_user, test_vendor): + """Test successfully updating marketplace settings.""" + marketplace_config = { + "letzshop_csv_url_fr": "https://example.com/fr.csv", + "letzshop_csv_url_en": "https://example.com/en.csv", + } + + result = self.service.update_marketplace_settings( + db, test_vendor.id, marketplace_config, test_user + ) + db.commit() + + assert result["message"] == "Marketplace settings updated successfully" + assert result["letzshop_csv_url_fr"] == "https://example.com/fr.csv" + assert result["letzshop_csv_url_en"] == "https://example.com/en.csv" + + def test_update_marketplace_settings_unauthorized( + self, db, other_company, test_vendor + ): + """Test marketplace settings update fails for unauthorized user.""" + from app.exceptions import InsufficientPermissionsException + from models.database.user import User + + other_user = db.query(User).filter(User.id == other_company.owner_user_id).first() + marketplace_config = {"letzshop_csv_url_fr": "https://example.com/fr.csv"} + + with pytest.raises(InsufficientPermissionsException): + self.service.update_marketplace_settings( + db, test_vendor.id, marketplace_config, other_user + ) + + def test_update_marketplace_settings_not_found(self, db, test_admin): + """Test marketplace settings update fails for non-existent vendor.""" + marketplace_config = {"letzshop_csv_url_fr": "https://example.com/fr.csv"} + + with pytest.raises(VendorNotFoundException): + self.service.update_marketplace_settings( + db, 99999, marketplace_config, test_admin + ) + + +@pytest.mark.unit +@pytest.mark.vendors +class TestVendorServiceSingleton: + """Test singleton instance.""" + + def test_singleton_exists(self): + """Test vendor_service singleton exists.""" + from app.services.vendor_service import vendor_service + + assert vendor_service is not None + assert isinstance(vendor_service, VendorService)