feat: integer cents money handling, order page fixes, and vendor filter persistence

Money Handling Architecture:
- Store all monetary values as integer cents (€105.91 = 10591)
- Add app/utils/money.py with Money class and conversion helpers
- Add static/shared/js/money.js for frontend formatting
- Update all database models to use _cents columns (Product, Order, etc.)
- Update CSV processor to convert prices to cents on import
- Add Alembic migration for Float to Integer conversion
- Create .architecture-rules/money.yaml with 7 validation rules
- Add docs/architecture/money-handling.md documentation

Order Details Page Fixes:
- Fix customer name showing 'undefined undefined' - use flat field names
- Fix vendor info empty - add vendor_name/vendor_code to OrderDetailResponse
- Fix shipping address using wrong nested object structure
- Enrich order detail API response with vendor info

Vendor Filter Persistence Fixes:
- Fix orders.js: restoreSavedVendor now sets selectedVendor and filters
- Fix orders.js: init() only loads orders if no saved vendor to restore
- Fix marketplace-letzshop.js: restoreSavedVendor calls selectVendor()
- Fix marketplace-letzshop.js: clearVendorSelection clears TomSelect dropdown
- Align vendor selector placeholder text between pages

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-20 20:33:48 +01:00
parent 7f0d32c18d
commit a19c84ea4e
56 changed files with 6155 additions and 447 deletions

View File

@@ -53,19 +53,60 @@ def test_customer_address(db, test_vendor, test_customer):
@pytest.fixture
def test_order(db, test_vendor, test_customer, test_customer_address):
"""Create a test order."""
"""Create a test order with customer/address snapshots."""
from datetime import datetime, timezone
order = Order(
vendor_id=test_vendor.id,
customer_id=test_customer.id,
order_number="TEST-ORD-001",
status="pending",
channel="direct",
subtotal=99.99,
total_amount=99.99,
currency="EUR",
shipping_address_id=test_customer_address.id,
billing_address_id=test_customer_address.id,
order_date=datetime.now(timezone.utc),
# Customer snapshot
customer_first_name=test_customer.first_name,
customer_last_name=test_customer.last_name,
customer_email=test_customer.email,
customer_phone=None,
# Shipping address snapshot
ship_first_name=test_customer_address.first_name,
ship_last_name=test_customer_address.last_name,
ship_address_line_1=test_customer_address.address_line_1,
ship_city=test_customer_address.city,
ship_postal_code=test_customer_address.postal_code,
ship_country_iso="LU",
# Billing address snapshot
bill_first_name=test_customer_address.first_name,
bill_last_name=test_customer_address.last_name,
bill_address_line_1=test_customer_address.address_line_1,
bill_city=test_customer_address.city,
bill_postal_code=test_customer_address.postal_code,
bill_country_iso="LU",
)
db.add(order)
db.commit()
db.refresh(order)
return order
@pytest.fixture
def test_order_item(db, test_order, test_product):
"""Create a test order item."""
from models.database.order import OrderItem
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name="Test Product",
product_sku="TEST-SKU-001",
quantity=1,
unit_price=49.99,
total_price=49.99,
)
db.add(order_item)
db.commit()
db.refresh(order_item)
return order_item

View File

@@ -246,16 +246,33 @@ class TestAdminLetzshopOrdersAPI:
def test_list_vendor_orders_with_data(self, client, db, admin_headers, test_vendor):
"""Test listing vendor orders with data."""
from models.database.letzshop import LetzshopOrder
from models.database.order import Order
# Create test orders
order = LetzshopOrder(
# Create test order using unified Order model with all required fields
order = Order(
vendor_id=test_vendor.id,
letzshop_order_id="admin_order_1",
letzshop_state="unconfirmed",
customer_id=1,
order_number=f"LS-{test_vendor.id}-admin_order_1",
channel="letzshop",
external_order_id="admin_order_1",
status="pending",
customer_first_name="Admin",
customer_last_name="Test",
customer_email="admin-test@example.com",
total_amount="150.00",
sync_status="pending",
ship_first_name="Admin",
ship_last_name="Test",
ship_address_line_1="123 Test Street",
ship_city="Luxembourg",
ship_postal_code="1234",
ship_country_iso="LU",
bill_first_name="Admin",
bill_last_name="Test",
bill_address_line_1="123 Test Street",
bill_city="Luxembourg",
bill_postal_code="1234",
bill_country_iso="LU",
total_amount_cents=15000, # €150.00
currency="EUR",
)
db.add(order)
db.commit()

View File

@@ -0,0 +1,249 @@
# tests/integration/api/v1/admin/test_order_item_exceptions.py
"""
Integration tests for admin order item exception endpoints.
Tests the /api/v1/admin/order-exceptions/* endpoints.
All endpoints require admin JWT authentication.
"""
import pytest
from datetime import datetime, timezone
from models.database.order import OrderItem
from models.database.order_item_exception import OrderItemException
@pytest.fixture
def test_exception(db, test_order, test_product, test_vendor):
"""Create a test order item exception."""
# Create an order item
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name="Unmatched Product",
product_sku="UNMATCHED-001",
quantity=1,
unit_price=25.00,
total_price=25.00,
needs_product_match=True,
)
db.add(order_item)
db.commit()
db.refresh(order_item)
# Create exception
exception = OrderItemException(
order_item_id=order_item.id,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Missing Product",
original_sku="MISSING-SKU-001",
exception_type="product_not_found",
status="pending",
)
db.add(exception)
db.commit()
db.refresh(exception)
return exception
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.admin
class TestAdminOrderItemExceptionAPI:
"""Tests for admin order item exception endpoints."""
# ========================================================================
# List & Statistics Tests
# ========================================================================
def test_list_exceptions(self, client, admin_headers, test_exception):
"""Test listing order item exceptions."""
response = client.get(
"/api/v1/admin/order-exceptions",
headers=admin_headers,
)
assert response.status_code == 200
data = response.json()
assert "exceptions" in data
assert "total" in data
assert data["total"] >= 1
def test_list_exceptions_non_admin(self, client, auth_headers):
"""Test non-admin cannot access exceptions endpoint."""
response = client.get(
"/api/v1/admin/order-exceptions",
headers=auth_headers,
)
assert response.status_code == 403
def test_list_exceptions_with_status_filter(
self, client, admin_headers, test_exception
):
"""Test filtering exceptions by status."""
response = client.get(
"/api/v1/admin/order-exceptions",
params={"status": "pending"},
headers=admin_headers,
)
assert response.status_code == 200
data = response.json()
for exc in data["exceptions"]:
assert exc["status"] == "pending"
def test_get_exception_stats(self, client, admin_headers, test_exception):
"""Test getting exception statistics."""
response = client.get(
"/api/v1/admin/order-exceptions/stats",
headers=admin_headers,
)
assert response.status_code == 200
data = response.json()
assert "pending" in data
assert "resolved" in data
assert "ignored" in data
assert "total" in data
assert data["pending"] >= 1
# ========================================================================
# Get Single Exception
# ========================================================================
def test_get_exception_by_id(self, client, admin_headers, test_exception):
"""Test getting a single exception by ID."""
response = client.get(
f"/api/v1/admin/order-exceptions/{test_exception.id}",
headers=admin_headers,
)
assert response.status_code == 200
data = response.json()
assert data["id"] == test_exception.id
assert data["original_gtin"] == test_exception.original_gtin
assert data["status"] == "pending"
def test_get_exception_not_found(self, client, admin_headers):
"""Test getting non-existent exception."""
response = client.get(
"/api/v1/admin/order-exceptions/99999",
headers=admin_headers,
)
assert response.status_code == 404
# ========================================================================
# Resolution Tests
# ========================================================================
def test_resolve_exception(
self, client, admin_headers, test_exception, test_product
):
"""Test resolving an exception by assigning a product."""
response = client.post(
f"/api/v1/admin/order-exceptions/{test_exception.id}/resolve",
headers=admin_headers,
json={
"product_id": test_product.id,
"notes": "Matched to existing product",
},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "resolved"
assert data["resolved_product_id"] == test_product.id
assert data["resolution_notes"] == "Matched to existing product"
def test_ignore_exception(self, client, admin_headers, test_exception):
"""Test ignoring an exception."""
response = client.post(
f"/api/v1/admin/order-exceptions/{test_exception.id}/ignore",
headers=admin_headers,
json={
"notes": "Product discontinued, will never be matched",
},
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "ignored"
assert "discontinued" in data["resolution_notes"]
def test_resolve_already_resolved(
self, client, admin_headers, db, test_exception, test_product
):
"""Test that resolving an already resolved exception fails."""
# First resolve it
test_exception.status = "resolved"
test_exception.resolved_product_id = test_product.id
test_exception.resolved_at = datetime.now(timezone.utc)
db.commit()
# Try to resolve again
response = client.post(
f"/api/v1/admin/order-exceptions/{test_exception.id}/resolve",
headers=admin_headers,
json={
"product_id": test_product.id,
},
)
assert response.status_code == 400
# ========================================================================
# Bulk Resolution Tests
# ========================================================================
def test_bulk_resolve_by_gtin(
self, client, admin_headers, db, test_order, test_product, test_vendor
):
"""Test bulk resolving exceptions by GTIN."""
gtin = "9876543210123"
# Create multiple exceptions for the same GTIN
for i in range(3):
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name=f"Product {i}",
product_sku=f"SKU-{i}",
quantity=1,
unit_price=10.00,
total_price=10.00,
needs_product_match=True,
)
db.add(order_item)
db.commit()
exception = OrderItemException(
order_item_id=order_item.id,
vendor_id=test_vendor.id,
original_gtin=gtin,
original_product_name=f"Product {i}",
exception_type="product_not_found",
)
db.add(exception)
db.commit()
# Bulk resolve
response = client.post(
"/api/v1/admin/order-exceptions/bulk-resolve",
params={"vendor_id": test_vendor.id},
headers=admin_headers,
json={
"gtin": gtin,
"product_id": test_product.id,
"notes": "Bulk resolved during import",
},
)
assert response.status_code == 200
data = response.json()
assert data["resolved_count"] == 3
assert data["gtin"] == gtin
assert data["product_id"] == test_product.id

View File

@@ -238,50 +238,105 @@ class TestVendorLetzshopOrdersAPI:
self, client, db, vendor_user_headers, test_vendor_with_vendor_user
):
"""Test listing orders with status filter."""
from models.database.letzshop import LetzshopOrder
from models.database.order import Order
# Create test orders
order1 = LetzshopOrder(
# Create test orders using unified Order model with all required fields
order1 = Order(
vendor_id=test_vendor_with_vendor_user.id,
letzshop_order_id="order_1",
letzshop_state="unconfirmed",
sync_status="pending",
customer_id=1,
order_number=f"LS-{test_vendor_with_vendor_user.id}-order_1",
channel="letzshop",
external_order_id="order_1",
status="pending",
customer_first_name="Test",
customer_last_name="User",
customer_email="test1@example.com",
ship_first_name="Test",
ship_last_name="User",
ship_address_line_1="123 Test Street",
ship_city="Luxembourg",
ship_postal_code="1234",
ship_country_iso="LU",
bill_first_name="Test",
bill_last_name="User",
bill_address_line_1="123 Test Street",
bill_city="Luxembourg",
bill_postal_code="1234",
bill_country_iso="LU",
total_amount_cents=10000,
currency="EUR",
)
order2 = LetzshopOrder(
order2 = Order(
vendor_id=test_vendor_with_vendor_user.id,
letzshop_order_id="order_2",
letzshop_state="confirmed",
sync_status="confirmed",
customer_id=1,
order_number=f"LS-{test_vendor_with_vendor_user.id}-order_2",
channel="letzshop",
external_order_id="order_2",
status="processing",
customer_first_name="Test",
customer_last_name="User",
customer_email="test2@example.com",
ship_first_name="Test",
ship_last_name="User",
ship_address_line_1="456 Test Avenue",
ship_city="Luxembourg",
ship_postal_code="5678",
ship_country_iso="LU",
bill_first_name="Test",
bill_last_name="User",
bill_address_line_1="456 Test Avenue",
bill_city="Luxembourg",
bill_postal_code="5678",
bill_country_iso="LU",
total_amount_cents=20000,
currency="EUR",
)
db.add_all([order1, order2])
db.commit()
# List pending only
response = client.get(
"/api/v1/vendor/letzshop/orders?sync_status=pending",
"/api/v1/vendor/letzshop/orders?status=pending",
headers=vendor_user_headers,
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
assert data["orders"][0]["sync_status"] == "pending"
assert data["orders"][0]["status"] == "pending"
def test_get_order_detail(
self, client, db, vendor_user_headers, test_vendor_with_vendor_user
):
"""Test getting order detail."""
from models.database.letzshop import LetzshopOrder
from models.database.order import Order
order = LetzshopOrder(
order = Order(
vendor_id=test_vendor_with_vendor_user.id,
letzshop_order_id="order_detail_test",
letzshop_shipment_id="shipment_1",
letzshop_state="unconfirmed",
customer_id=1,
order_number=f"LS-{test_vendor_with_vendor_user.id}-order_detail_test",
channel="letzshop",
external_order_id="order_detail_test",
external_shipment_id="shipment_1",
status="pending",
customer_first_name="Test",
customer_last_name="User",
customer_email="test@example.com",
total_amount="99.99",
sync_status="pending",
raw_order_data={"test": "data"},
ship_first_name="Test",
ship_last_name="User",
ship_address_line_1="123 Test Street",
ship_city="Luxembourg",
ship_postal_code="1234",
ship_country_iso="LU",
bill_first_name="Test",
bill_last_name="User",
bill_address_line_1="123 Test Street",
bill_city="Luxembourg",
bill_postal_code="1234",
bill_country_iso="LU",
total_amount_cents=9999, # €99.99
currency="EUR",
external_data={"test": "data"},
)
db.add(order)
db.commit()
@@ -293,9 +348,9 @@ class TestVendorLetzshopOrdersAPI:
assert response.status_code == 200
data = response.json()
assert data["letzshop_order_id"] == "order_detail_test"
assert data["external_order_id"] == "order_detail_test"
assert data["customer_email"] == "test@example.com"
assert data["raw_order_data"] == {"test": "data"}
assert data["external_data"] == {"test": "data"}
def test_get_order_not_found(
self, client, vendor_user_headers, test_vendor_with_vendor_user
@@ -393,18 +448,50 @@ class TestVendorLetzshopFulfillmentAPI:
test_vendor_with_vendor_user,
):
"""Test confirming an order."""
from models.database.letzshop import LetzshopOrder
from models.database.order import Order, OrderItem
# Create test order
order = LetzshopOrder(
# Create test order using unified Order model with all required fields
order = Order(
vendor_id=test_vendor_with_vendor_user.id,
letzshop_order_id="order_confirm",
letzshop_shipment_id="shipment_1",
letzshop_state="unconfirmed",
sync_status="pending",
inventory_units=[{"id": "unit_1", "state": "unconfirmed"}],
customer_id=1,
order_number=f"LS-{test_vendor_with_vendor_user.id}-order_confirm",
channel="letzshop",
external_order_id="order_confirm",
external_shipment_id="shipment_1",
status="pending",
customer_first_name="Test",
customer_last_name="User",
customer_email="test@example.com",
ship_first_name="Test",
ship_last_name="User",
ship_address_line_1="123 Test Street",
ship_city="Luxembourg",
ship_postal_code="1234",
ship_country_iso="LU",
bill_first_name="Test",
bill_last_name="User",
bill_address_line_1="123 Test Street",
bill_city="Luxembourg",
bill_postal_code="1234",
bill_country_iso="LU",
total_amount_cents=10000,
currency="EUR",
)
db.add(order)
db.flush()
# Add order item
item = OrderItem(
order_id=order.id,
product_id=1,
product_name="Test Product",
quantity=1,
unit_price_cents=10000,
total_price_cents=10000,
external_item_id="unit_1",
item_state="unconfirmed",
)
db.add(item)
db.commit()
# Save credentials
@@ -447,14 +534,33 @@ class TestVendorLetzshopFulfillmentAPI:
test_vendor_with_vendor_user,
):
"""Test setting tracking information."""
from models.database.letzshop import LetzshopOrder
from models.database.order import Order
order = LetzshopOrder(
order = Order(
vendor_id=test_vendor_with_vendor_user.id,
letzshop_order_id="order_tracking",
letzshop_shipment_id="shipment_track",
letzshop_state="confirmed",
sync_status="confirmed",
customer_id=1,
order_number=f"LS-{test_vendor_with_vendor_user.id}-order_tracking",
channel="letzshop",
external_order_id="order_tracking",
external_shipment_id="shipment_track",
status="processing", # confirmed state
customer_first_name="Test",
customer_last_name="User",
customer_email="test@example.com",
ship_first_name="Test",
ship_last_name="User",
ship_address_line_1="123 Test Street",
ship_city="Luxembourg",
ship_postal_code="1234",
ship_country_iso="LU",
bill_first_name="Test",
bill_last_name="User",
bill_address_line_1="123 Test Street",
bill_city="Luxembourg",
bill_postal_code="1234",
bill_country_iso="LU",
total_amount_cents=10000,
currency="EUR",
)
db.add(order)
db.commit()

View File

@@ -1,12 +1,65 @@
# tests/unit/models/database/test_order.py
"""Unit tests for Order and OrderItem database models."""
from datetime import datetime, timezone
import pytest
from sqlalchemy.exc import IntegrityError
from models.database.order import Order, OrderItem
def create_order_with_snapshots(
db,
vendor,
customer,
customer_address,
order_number,
status="pending",
subtotal=99.99,
total_amount=99.99,
**kwargs
):
"""Helper to create Order with required address snapshots."""
# Remove channel from kwargs if present (we set it explicitly)
channel = kwargs.pop("channel", "direct")
order = Order(
vendor_id=vendor.id,
customer_id=customer.id,
order_number=order_number,
status=status,
channel=channel,
subtotal=subtotal,
total_amount=total_amount,
currency="EUR",
order_date=datetime.now(timezone.utc),
# Customer snapshot
customer_first_name=customer.first_name,
customer_last_name=customer.last_name,
customer_email=customer.email,
# Shipping address snapshot
ship_first_name=customer_address.first_name,
ship_last_name=customer_address.last_name,
ship_address_line_1=customer_address.address_line_1,
ship_city=customer_address.city,
ship_postal_code=customer_address.postal_code,
ship_country_iso="LU",
# Billing address snapshot
bill_first_name=customer_address.first_name,
bill_last_name=customer_address.last_name,
bill_address_line_1=customer_address.address_line_1,
bill_city=customer_address.city,
bill_postal_code=customer_address.postal_code,
bill_country_iso="LU",
**kwargs
)
db.add(order)
db.commit()
db.refresh(order)
return order
@pytest.mark.unit
@pytest.mark.database
class TestOrderModel:
@@ -16,60 +69,37 @@ class TestOrderModel:
self, db, test_vendor, test_customer, test_customer_address
):
"""Test Order model with customer relationship."""
order = Order(
vendor_id=test_vendor.id,
customer_id=test_customer.id,
order = create_order_with_snapshots(
db, test_vendor, test_customer, test_customer_address,
order_number="ORD-001",
status="pending",
subtotal=99.99,
total_amount=99.99,
currency="EUR",
shipping_address_id=test_customer_address.id,
billing_address_id=test_customer_address.id,
)
db.add(order)
db.commit()
db.refresh(order)
assert order.id is not None
assert order.vendor_id == test_vendor.id
assert order.customer_id == test_customer.id
assert order.order_number == "ORD-001"
assert order.status == "pending"
assert float(order.total_amount) == 99.99
# Verify snapshots
assert order.customer_first_name == test_customer.first_name
assert order.ship_city == test_customer_address.city
assert order.ship_country_iso == "LU"
def test_order_number_uniqueness(
self, db, test_vendor, test_customer, test_customer_address
):
"""Test order_number unique constraint."""
order1 = Order(
vendor_id=test_vendor.id,
customer_id=test_customer.id,
create_order_with_snapshots(
db, test_vendor, test_customer, test_customer_address,
order_number="UNIQUE-ORD-001",
status="pending",
subtotal=50.00,
total_amount=50.00,
shipping_address_id=test_customer_address.id,
billing_address_id=test_customer_address.id,
)
db.add(order1)
db.commit()
# Duplicate order number should fail
with pytest.raises(IntegrityError):
order2 = Order(
vendor_id=test_vendor.id,
customer_id=test_customer.id,
create_order_with_snapshots(
db, test_vendor, test_customer, test_customer_address,
order_number="UNIQUE-ORD-001",
status="pending",
subtotal=75.00,
total_amount=75.00,
shipping_address_id=test_customer_address.id,
billing_address_id=test_customer_address.id,
)
db.add(order2)
db.commit()
def test_order_status_values(
self, db, test_vendor, test_customer, test_customer_address
@@ -77,49 +107,32 @@ class TestOrderModel:
"""Test Order with different status values."""
statuses = [
"pending",
"confirmed",
"processing",
"shipped",
"delivered",
"cancelled",
"refunded",
]
for i, status in enumerate(statuses):
order = Order(
vendor_id=test_vendor.id,
customer_id=test_customer.id,
order = create_order_with_snapshots(
db, test_vendor, test_customer, test_customer_address,
order_number=f"STATUS-ORD-{i:03d}",
status=status,
subtotal=50.00,
total_amount=50.00,
shipping_address_id=test_customer_address.id,
billing_address_id=test_customer_address.id,
)
db.add(order)
db.commit()
db.refresh(order)
assert order.status == status
def test_order_amounts(self, db, test_vendor, test_customer, test_customer_address):
"""Test Order amount fields."""
order = Order(
vendor_id=test_vendor.id,
customer_id=test_customer.id,
order = create_order_with_snapshots(
db, test_vendor, test_customer, test_customer_address,
order_number="AMOUNTS-ORD-001",
status="pending",
subtotal=100.00,
tax_amount=20.00,
shipping_amount=10.00,
discount_amount=5.00,
total_amount=125.00,
currency="EUR",
shipping_address_id=test_customer_address.id,
billing_address_id=test_customer_address.id,
)
db.add(order)
db.commit()
db.refresh(order)
assert float(order.subtotal) == 100.00
assert float(order.tax_amount) == 20.00
@@ -131,25 +144,32 @@ class TestOrderModel:
self, db, test_vendor, test_customer, test_customer_address
):
"""Test Order relationships."""
order = Order(
vendor_id=test_vendor.id,
customer_id=test_customer.id,
order = create_order_with_snapshots(
db, test_vendor, test_customer, test_customer_address,
order_number="REL-ORD-001",
status="pending",
subtotal=50.00,
total_amount=50.00,
shipping_address_id=test_customer_address.id,
billing_address_id=test_customer_address.id,
)
db.add(order)
db.commit()
db.refresh(order)
assert order.vendor is not None
assert order.customer is not None
assert order.vendor.id == test_vendor.id
assert order.customer.id == test_customer.id
def test_order_channel_field(
self, db, test_vendor, test_customer, test_customer_address
):
"""Test Order channel field for marketplace support."""
order = create_order_with_snapshots(
db, test_vendor, test_customer, test_customer_address,
order_number="CHANNEL-ORD-001",
channel="letzshop",
external_order_id="LS-12345",
external_shipment_id="SHIP-67890",
)
assert order.channel == "letzshop"
assert order.external_order_id == "LS-12345"
assert order.external_shipment_id == "SHIP-67890"
@pytest.mark.unit
@pytest.mark.database
@@ -249,3 +269,29 @@ class TestOrderItemModel:
assert item1.order_id == item2.order_id
assert item1.id != item2.id
assert item1.product_id == item2.product_id # Same product, different items
def test_order_item_needs_product_match(self, db, test_order, test_product):
"""Test OrderItem needs_product_match flag for exceptions."""
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name="Unmatched Product",
product_sku="UNMATCHED-001",
quantity=1,
unit_price=50.00,
total_price=50.00,
needs_product_match=True,
)
db.add(order_item)
db.commit()
db.refresh(order_item)
assert order_item.needs_product_match is True
# Resolve the match
order_item.needs_product_match = False
db.commit()
db.refresh(order_item)
assert order_item.needs_product_match is False

View File

@@ -0,0 +1,242 @@
# tests/unit/models/database/test_order_item_exception.py
"""Unit tests for OrderItemException database model."""
import pytest
from sqlalchemy.exc import IntegrityError
from models.database.order_item_exception import OrderItemException
@pytest.mark.unit
@pytest.mark.database
class TestOrderItemExceptionModel:
"""Test OrderItemException model."""
def test_exception_creation(self, db, test_order_item, test_vendor):
"""Test OrderItemException model creation."""
exception = OrderItemException(
order_item_id=test_order_item.id,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Missing Product",
original_sku="MISSING-SKU-001",
exception_type="product_not_found",
status="pending",
)
db.add(exception)
db.commit()
db.refresh(exception)
assert exception.id is not None
assert exception.order_item_id == test_order_item.id
assert exception.vendor_id == test_vendor.id
assert exception.original_gtin == "4006381333931"
assert exception.original_product_name == "Test Missing Product"
assert exception.original_sku == "MISSING-SKU-001"
assert exception.exception_type == "product_not_found"
assert exception.status == "pending"
assert exception.created_at is not None
def test_exception_unique_order_item(self, db, test_order_item, test_vendor):
"""Test that only one exception can exist per order item."""
exception1 = OrderItemException(
order_item_id=test_order_item.id,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
exception_type="product_not_found",
)
db.add(exception1)
db.commit()
# Second exception for the same order item should fail
with pytest.raises(IntegrityError):
exception2 = OrderItemException(
order_item_id=test_order_item.id,
vendor_id=test_vendor.id,
original_gtin="4006381333932",
exception_type="product_not_found",
)
db.add(exception2)
db.commit()
def test_exception_types(self, db, test_order_item, test_vendor):
"""Test different exception types."""
exception_types = ["product_not_found", "gtin_mismatch", "duplicate_gtin"]
# Create new order items for each test to avoid unique constraint
from models.database.order import OrderItem
for i, exc_type in enumerate(exception_types):
order_item = OrderItem(
order_id=test_order_item.order_id,
product_id=test_order_item.product_id,
product_name=f"Product {i}",
product_sku=f"SKU-{i}",
quantity=1,
unit_price=10.00,
total_price=10.00,
)
db.add(order_item)
db.commit()
db.refresh(order_item)
exception = OrderItemException(
order_item_id=order_item.id,
vendor_id=test_vendor.id,
original_gtin=f"400638133393{i}",
exception_type=exc_type,
)
db.add(exception)
db.commit()
db.refresh(exception)
assert exception.exception_type == exc_type
def test_exception_status_values(self, db, test_order_item, test_vendor):
"""Test different status values."""
exception = OrderItemException(
order_item_id=test_order_item.id,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
exception_type="product_not_found",
status="pending",
)
db.add(exception)
db.commit()
# Test pending status
assert exception.status == "pending"
assert exception.is_pending is True
assert exception.is_resolved is False
assert exception.is_ignored is False
assert exception.blocks_confirmation is True
# Test resolved status
exception.status = "resolved"
db.commit()
db.refresh(exception)
assert exception.is_pending is False
assert exception.is_resolved is True
assert exception.is_ignored is False
assert exception.blocks_confirmation is False
# Test ignored status
exception.status = "ignored"
db.commit()
db.refresh(exception)
assert exception.is_pending is False
assert exception.is_resolved is False
assert exception.is_ignored is True
assert exception.blocks_confirmation is True # Ignored still blocks
def test_exception_nullable_fields(self, db, test_order_item, test_vendor):
"""Test that GTIN and other fields can be null."""
exception = OrderItemException(
order_item_id=test_order_item.id,
vendor_id=test_vendor.id,
original_gtin=None, # Can be null for vouchers etc.
original_product_name="Gift Voucher",
original_sku=None,
exception_type="product_not_found",
)
db.add(exception)
db.commit()
db.refresh(exception)
assert exception.id is not None
assert exception.original_gtin is None
assert exception.original_sku is None
assert exception.original_product_name == "Gift Voucher"
def test_exception_resolution(self, db, test_order_item, test_vendor, test_product, test_user):
"""Test resolving an exception with a product."""
from datetime import datetime, timezone
exception = OrderItemException(
order_item_id=test_order_item.id,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
exception_type="product_not_found",
status="pending",
)
db.add(exception)
db.commit()
# Resolve the exception
now = datetime.now(timezone.utc)
exception.status = "resolved"
exception.resolved_product_id = test_product.id
exception.resolved_at = now
exception.resolved_by = test_user.id
exception.resolution_notes = "Matched to existing product"
db.commit()
db.refresh(exception)
assert exception.status == "resolved"
assert exception.resolved_product_id == test_product.id
assert exception.resolved_at is not None
assert exception.resolved_by == test_user.id
assert exception.resolution_notes == "Matched to existing product"
def test_exception_relationships(self, db, test_order_item, test_vendor):
"""Test OrderItemException relationships."""
exception = OrderItemException(
order_item_id=test_order_item.id,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
exception_type="product_not_found",
)
db.add(exception)
db.commit()
db.refresh(exception)
assert exception.order_item is not None
assert exception.order_item.id == test_order_item.id
assert exception.vendor is not None
assert exception.vendor.id == test_vendor.id
def test_exception_repr(self, db, test_order_item, test_vendor):
"""Test OrderItemException __repr__ method."""
exception = OrderItemException(
order_item_id=test_order_item.id,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
exception_type="product_not_found",
status="pending",
)
db.add(exception)
db.commit()
db.refresh(exception)
repr_str = repr(exception)
assert "OrderItemException" in repr_str
assert str(exception.id) in repr_str
assert "4006381333931" in repr_str
assert "pending" in repr_str
def test_exception_cascade_delete(self, db, test_order_item, test_vendor):
"""Test that exception is deleted when order item is deleted."""
exception = OrderItemException(
order_item_id=test_order_item.id,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
exception_type="product_not_found",
)
db.add(exception)
db.commit()
exception_id = exception.id
# Delete the order item
db.delete(test_order_item)
db.commit()
# Exception should be cascade deleted
deleted_exception = (
db.query(OrderItemException)
.filter(OrderItemException.id == exception_id)
.first()
)
assert deleted_exception is None

View File

@@ -165,16 +165,16 @@ class TestProductModel:
def test_product_reset_to_source(self, db, test_vendor, test_marketplace_product):
"""Test reset_to_source methods."""
# Set up marketplace product values
test_marketplace_product.price_numeric = 100.00
# Set up marketplace product values (use cents internally)
test_marketplace_product.price_cents = 10000 # €100.00
test_marketplace_product.brand = "SourceBrand"
db.commit()
# Create product with overrides
# Create product with overrides (price property converts euros to cents)
product = Product(
vendor_id=test_vendor.id,
marketplace_product_id=test_marketplace_product.id,
price=89.99,
price_cents=8999, # €89.99
brand="OverrideBrand",
)
db.add(product)
@@ -184,13 +184,14 @@ class TestProductModel:
assert product.effective_price == 89.99
assert product.effective_brand == "OverrideBrand"
# Reset price to source
product.reset_field_to_source("price")
# Reset price_cents to source (OVERRIDABLE_FIELDS now uses _cents names)
product.reset_field_to_source("price_cents")
db.commit()
db.refresh(product)
assert product.price is None
assert product.effective_price == 100.00 # Now inherits
assert product.price_cents is None
assert product.price is None # Property returns None when cents is None
assert product.effective_price == 100.00 # Now inherits from marketplace
# Reset all fields
product.reset_all_to_source()

View File

@@ -1,11 +1,16 @@
# tests/unit/models/schema/test_order.py
"""Unit tests for order Pydantic schemas."""
from datetime import datetime, timezone
import pytest
from pydantic import ValidationError
from models.schema.order import (
OrderAddressCreate,
AddressSnapshot,
AddressSnapshotResponse,
CustomerSnapshot,
CustomerSnapshotResponse,
OrderCreate,
OrderItemCreate,
OrderItemResponse,
@@ -61,81 +66,155 @@ class TestOrderItemCreateSchema:
@pytest.mark.unit
@pytest.mark.schema
class TestOrderAddressCreateSchema:
"""Test OrderAddressCreate schema validation."""
class TestAddressSnapshotSchema:
"""Test AddressSnapshot schema validation."""
def test_valid_address(self):
"""Test valid order address creation."""
address = OrderAddressCreate(
"""Test valid address creation."""
address = AddressSnapshot(
first_name="John",
last_name="Doe",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country="Luxembourg",
country_iso="LU",
)
assert address.first_name == "John"
assert address.city == "Luxembourg"
assert address.country_iso == "LU"
def test_required_fields(self):
"""Test required fields validation."""
with pytest.raises(ValidationError):
OrderAddressCreate(
AddressSnapshot(
first_name="John",
# missing required fields
)
def test_optional_company(self):
"""Test optional company field."""
address = OrderAddressCreate(
address = AddressSnapshot(
first_name="John",
last_name="Doe",
company="Tech Corp",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country="Luxembourg",
country_iso="LU",
)
assert address.company == "Tech Corp"
def test_optional_address_line_2(self):
"""Test optional address_line_2 field."""
address = OrderAddressCreate(
address = AddressSnapshot(
first_name="John",
last_name="Doe",
address_line_1="123 Main St",
address_line_2="Suite 500",
city="Luxembourg",
postal_code="L-1234",
country="Luxembourg",
country_iso="LU",
)
assert address.address_line_2 == "Suite 500"
def test_first_name_min_length(self):
"""Test first_name minimum length."""
with pytest.raises(ValidationError):
OrderAddressCreate(
AddressSnapshot(
first_name="",
last_name="Doe",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country="Luxembourg",
country_iso="LU",
)
def test_country_min_length(self):
"""Test country minimum length (2)."""
def test_country_iso_min_length(self):
"""Test country_iso minimum length (2)."""
with pytest.raises(ValidationError):
OrderAddressCreate(
AddressSnapshot(
first_name="John",
last_name="Doe",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country="L",
country_iso="L", # Too short
)
@pytest.mark.unit
@pytest.mark.schema
class TestAddressSnapshotResponseSchema:
"""Test AddressSnapshotResponse schema."""
def test_full_name_property(self):
"""Test full_name property."""
response = AddressSnapshotResponse(
first_name="John",
last_name="Doe",
company=None,
address_line_1="123 Main St",
address_line_2=None,
city="Luxembourg",
postal_code="L-1234",
country_iso="LU",
)
assert response.full_name == "John Doe"
@pytest.mark.unit
@pytest.mark.schema
class TestCustomerSnapshotSchema:
"""Test CustomerSnapshot schema validation."""
def test_valid_customer(self):
"""Test valid customer snapshot."""
customer = CustomerSnapshot(
first_name="John",
last_name="Doe",
email="john@example.com",
phone="+352123456",
locale="en",
)
assert customer.first_name == "John"
assert customer.email == "john@example.com"
def test_optional_phone(self):
"""Test phone is optional."""
customer = CustomerSnapshot(
first_name="John",
last_name="Doe",
email="john@example.com",
)
assert customer.phone is None
def test_optional_locale(self):
"""Test locale is optional."""
customer = CustomerSnapshot(
first_name="John",
last_name="Doe",
email="john@example.com",
)
assert customer.locale is None
@pytest.mark.unit
@pytest.mark.schema
class TestCustomerSnapshotResponseSchema:
"""Test CustomerSnapshotResponse schema."""
def test_full_name_property(self):
"""Test full_name property."""
response = CustomerSnapshotResponse(
first_name="John",
last_name="Doe",
email="john@example.com",
phone=None,
locale=None,
)
assert response.full_name == "John Doe"
@pytest.mark.unit
@pytest.mark.schema
class TestOrderCreateSchema:
@@ -148,13 +227,18 @@ class TestOrderCreateSchema:
OrderItemCreate(product_id=1, quantity=2),
OrderItemCreate(product_id=2, quantity=1),
],
shipping_address=OrderAddressCreate(
customer=CustomerSnapshot(
first_name="John",
last_name="Doe",
email="john@example.com",
),
shipping_address=AddressSnapshot(
first_name="John",
last_name="Doe",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country="Luxembourg",
country_iso="LU",
),
)
assert len(order.items) == 2
@@ -164,13 +248,18 @@ class TestOrderCreateSchema:
"""Test items are required."""
with pytest.raises(ValidationError) as exc_info:
OrderCreate(
shipping_address=OrderAddressCreate(
customer=CustomerSnapshot(
first_name="John",
last_name="Doe",
email="john@example.com",
),
shipping_address=AddressSnapshot(
first_name="John",
last_name="Doe",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country="Luxembourg",
country_iso="LU",
),
)
assert "items" in str(exc_info.value).lower()
@@ -180,22 +269,48 @@ class TestOrderCreateSchema:
with pytest.raises(ValidationError) as exc_info:
OrderCreate(
items=[],
shipping_address=OrderAddressCreate(
customer=CustomerSnapshot(
first_name="John",
last_name="Doe",
email="john@example.com",
),
shipping_address=AddressSnapshot(
first_name="John",
last_name="Doe",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country="Luxembourg",
country_iso="LU",
),
)
assert "items" in str(exc_info.value).lower()
def test_customer_required(self):
"""Test customer is required."""
with pytest.raises(ValidationError) as exc_info:
OrderCreate(
items=[OrderItemCreate(product_id=1, quantity=1)],
shipping_address=AddressSnapshot(
first_name="John",
last_name="Doe",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country_iso="LU",
),
)
assert "customer" in str(exc_info.value).lower()
def test_shipping_address_required(self):
"""Test shipping_address is required."""
with pytest.raises(ValidationError) as exc_info:
OrderCreate(
items=[OrderItemCreate(product_id=1, quantity=1)],
customer=CustomerSnapshot(
first_name="John",
last_name="Doe",
email="john@example.com",
),
)
assert "shipping_address" in str(exc_info.value).lower()
@@ -203,21 +318,26 @@ class TestOrderCreateSchema:
"""Test billing_address is optional."""
order = OrderCreate(
items=[OrderItemCreate(product_id=1, quantity=1)],
shipping_address=OrderAddressCreate(
customer=CustomerSnapshot(
first_name="John",
last_name="Doe",
email="john@example.com",
),
shipping_address=AddressSnapshot(
first_name="John",
last_name="Doe",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country="Luxembourg",
country_iso="LU",
),
billing_address=OrderAddressCreate(
billing_address=AddressSnapshot(
first_name="Jane",
last_name="Doe",
address_line_1="456 Other St",
city="Esch",
postal_code="L-4321",
country="Luxembourg",
country_iso="LU",
),
)
assert order.billing_address is not None
@@ -227,13 +347,18 @@ class TestOrderCreateSchema:
"""Test optional customer_notes."""
order = OrderCreate(
items=[OrderItemCreate(product_id=1, quantity=1)],
shipping_address=OrderAddressCreate(
customer=CustomerSnapshot(
first_name="John",
last_name="Doe",
email="john@example.com",
),
shipping_address=AddressSnapshot(
first_name="John",
last_name="Doe",
address_line_1="123 Main St",
city="Luxembourg",
postal_code="L-1234",
country="Luxembourg",
country_iso="LU",
),
customer_notes="Please leave at door",
)
@@ -293,13 +418,13 @@ class TestOrderResponseSchema:
def test_from_dict(self):
"""Test creating response from dict."""
from datetime import datetime
now = datetime.now(timezone.utc)
data = {
"id": 1,
"vendor_id": 1,
"customer_id": 1,
"order_number": "ORD-001",
"channel": "direct",
"status": "pending",
"subtotal": 100.00,
"tax_amount": 20.00,
@@ -307,21 +432,97 @@ class TestOrderResponseSchema:
"discount_amount": 5.00,
"total_amount": 125.00,
"currency": "EUR",
# Customer snapshot
"customer_first_name": "John",
"customer_last_name": "Doe",
"customer_email": "john@example.com",
"customer_phone": None,
"customer_locale": "en",
# Ship address snapshot
"ship_first_name": "John",
"ship_last_name": "Doe",
"ship_company": None,
"ship_address_line_1": "123 Main St",
"ship_address_line_2": None,
"ship_city": "Luxembourg",
"ship_postal_code": "L-1234",
"ship_country_iso": "LU",
# Bill address snapshot
"bill_first_name": "John",
"bill_last_name": "Doe",
"bill_company": None,
"bill_address_line_1": "123 Main St",
"bill_address_line_2": None,
"bill_city": "Luxembourg",
"bill_postal_code": "L-1234",
"bill_country_iso": "LU",
# Tracking
"shipping_method": "standard",
"tracking_number": None,
"tracking_provider": None,
# Notes
"customer_notes": None,
"internal_notes": None,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"paid_at": None,
# Timestamps
"order_date": now,
"confirmed_at": None,
"shipped_at": None,
"delivered_at": None,
"cancelled_at": None,
"created_at": now,
"updated_at": now,
}
response = OrderResponse(**data)
assert response.id == 1
assert response.order_number == "ORD-001"
assert response.total_amount == 125.00
assert response.channel == "direct"
assert response.customer_full_name == "John Doe"
def test_is_marketplace_order(self):
"""Test is_marketplace_order property."""
now = datetime.now(timezone.utc)
# Direct order
direct_order = OrderResponse(
id=1, vendor_id=1, customer_id=1, order_number="ORD-001",
channel="direct", status="pending",
subtotal=100.0, tax_amount=0.0, shipping_amount=0.0, discount_amount=0.0,
total_amount=100.0, currency="EUR",
customer_first_name="John", customer_last_name="Doe",
customer_email="john@example.com", customer_phone=None, customer_locale=None,
ship_first_name="John", ship_last_name="Doe", ship_company=None,
ship_address_line_1="123 Main", ship_address_line_2=None,
ship_city="Luxembourg", ship_postal_code="L-1234", ship_country_iso="LU",
bill_first_name="John", bill_last_name="Doe", bill_company=None,
bill_address_line_1="123 Main", bill_address_line_2=None,
bill_city="Luxembourg", bill_postal_code="L-1234", bill_country_iso="LU",
shipping_method=None, tracking_number=None, tracking_provider=None,
customer_notes=None, internal_notes=None,
order_date=now, confirmed_at=None, shipped_at=None,
delivered_at=None, cancelled_at=None, created_at=now, updated_at=now,
)
assert direct_order.is_marketplace_order is False
# Marketplace order
marketplace_order = OrderResponse(
id=2, vendor_id=1, customer_id=1, order_number="LS-001",
channel="letzshop", status="pending",
subtotal=100.0, tax_amount=0.0, shipping_amount=0.0, discount_amount=0.0,
total_amount=100.0, currency="EUR",
customer_first_name="John", customer_last_name="Doe",
customer_email="john@example.com", customer_phone=None, customer_locale=None,
ship_first_name="John", ship_last_name="Doe", ship_company=None,
ship_address_line_1="123 Main", ship_address_line_2=None,
ship_city="Luxembourg", ship_postal_code="L-1234", ship_country_iso="LU",
bill_first_name="John", bill_last_name="Doe", bill_company=None,
bill_address_line_1="123 Main", bill_address_line_2=None,
bill_city="Luxembourg", bill_postal_code="L-1234", bill_country_iso="LU",
shipping_method=None, tracking_number=None, tracking_provider=None,
customer_notes=None, internal_notes=None,
order_date=now, confirmed_at=None, shipped_at=None,
delivered_at=None, cancelled_at=None, created_at=now, updated_at=now,
)
assert marketplace_order.is_marketplace_order is True
@pytest.mark.unit
@@ -331,26 +532,64 @@ class TestOrderItemResponseSchema:
def test_from_dict(self):
"""Test creating response from dict."""
from datetime import datetime
now = datetime.now(timezone.utc)
data = {
"id": 1,
"order_id": 1,
"product_id": 1,
"product_name": "Test Product",
"product_sku": "SKU-001",
"gtin": "4006381333931",
"gtin_type": "EAN13",
"quantity": 2,
"unit_price": 50.00,
"total_price": 100.00,
"inventory_reserved": True,
"inventory_fulfilled": False,
"created_at": datetime.now(),
"updated_at": datetime.now(),
"needs_product_match": False,
"created_at": now,
"updated_at": now,
}
response = OrderItemResponse(**data)
assert response.id == 1
assert response.quantity == 2
assert response.total_price == 100.00
assert response.gtin == "4006381333931"
def test_has_unresolved_exception(self):
"""Test has_unresolved_exception property."""
now = datetime.now(timezone.utc)
base_data = {
"id": 1, "order_id": 1, "product_id": 1,
"product_name": "Test", "product_sku": "SKU-001",
"gtin": None, "gtin_type": None,
"quantity": 1, "unit_price": 10.0, "total_price": 10.0,
"inventory_reserved": False, "inventory_fulfilled": False,
"created_at": now, "updated_at": now,
}
# No exception
response = OrderItemResponse(**base_data, needs_product_match=False, exception=None)
assert response.has_unresolved_exception is False
# Pending exception
from models.schema.order import OrderItemExceptionBrief
pending_exc = OrderItemExceptionBrief(
id=1, original_gtin="123", original_product_name="Test",
exception_type="product_not_found", status="pending",
resolved_product_id=None,
)
response = OrderItemResponse(**base_data, needs_product_match=True, exception=pending_exc)
assert response.has_unresolved_exception is True
# Resolved exception
resolved_exc = OrderItemExceptionBrief(
id=1, original_gtin="123", original_product_name="Test",
exception_type="product_not_found", status="resolved",
resolved_product_id=5,
)
response = OrderItemResponse(**base_data, needs_product_match=False, exception=resolved_exc)
assert response.has_unresolved_exception is False
@pytest.mark.unit

View File

@@ -581,7 +581,7 @@ class TestLetzshopOrderService:
"id": "order_123",
"number": "R123456",
"email": "test@example.com",
"total": 29.99,
"total": "29.99 EUR",
"locale": "fr",
"shipAddress": {
"firstName": "Jean",
@@ -598,8 +598,8 @@ class TestLetzshopOrderService:
order = service.create_order(test_vendor.id, shipment_data)
assert order.customer_locale == "fr"
assert order.shipping_country_iso == "LU"
assert order.billing_country_iso == "FR"
assert order.ship_country_iso == "LU" # Correct attribute name
assert order.bill_country_iso == "FR" # Correct attribute name
def test_create_order_extracts_ean(self, db, test_vendor):
"""Test that create_order extracts EAN from tradeId."""
@@ -640,14 +640,15 @@ class TestLetzshopOrderService:
order = service.create_order(test_vendor.id, shipment_data)
assert len(order.inventory_units) == 1
unit = order.inventory_units[0]
assert unit["ean"] == "0889698273022"
assert unit["ean_type"] == "gtin13"
assert unit["sku"] == "SKU123"
assert unit["mpn"] == "MPN456"
assert unit["product_name"] == "Test Product"
assert unit["price"] == 19.99
# Check order items (unified model uses items relationship)
assert len(order.items) == 1
item = order.items[0]
assert item.gtin == "0889698273022"
assert item.gtin_type == "gtin13"
assert item.product_sku == "SKU123"
assert item.product_name == "Test Product"
# Price is stored in cents (19.99 EUR = 1999 cents)
assert item.unit_price_cents == 1999
def test_import_historical_shipments_deduplication(self, db, test_vendor):
"""Test that historical import deduplicates existing orders."""

View File

@@ -0,0 +1,570 @@
# tests/unit/services/test_order_item_exception_service.py
"""Unit tests for OrderItemExceptionService."""
import pytest
from app.exceptions import (
ExceptionAlreadyResolvedException,
InvalidProductForExceptionException,
OrderItemExceptionNotFoundException,
ProductNotFoundException,
)
from app.services.order_item_exception_service import OrderItemExceptionService
from models.database.order import OrderItem
from models.database.order_item_exception import OrderItemException
@pytest.fixture
def exception_service():
"""Create an OrderItemExceptionService instance."""
return OrderItemExceptionService()
@pytest.mark.unit
class TestOrderItemExceptionServiceCreate:
"""Test exception creation."""
def test_create_exception(
self, db, exception_service, test_order_item, test_vendor
):
"""Test creating an exception."""
exception = exception_service.create_exception(
db=db,
order_item=test_order_item,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Product",
original_sku="SKU-001",
exception_type="product_not_found",
)
db.commit()
assert exception.id is not None
assert exception.order_item_id == test_order_item.id
assert exception.vendor_id == test_vendor.id
assert exception.original_gtin == "4006381333931"
assert exception.status == "pending"
def test_create_exception_no_gtin(
self, db, exception_service, test_order_item, test_vendor
):
"""Test creating an exception without GTIN (e.g., for vouchers)."""
exception = exception_service.create_exception(
db=db,
order_item=test_order_item,
vendor_id=test_vendor.id,
original_gtin=None,
original_product_name="Gift Voucher",
original_sku=None,
exception_type="product_not_found",
)
db.commit()
assert exception.id is not None
assert exception.original_gtin is None
@pytest.mark.unit
class TestOrderItemExceptionServiceGet:
"""Test exception retrieval."""
def test_get_exception_by_id(
self, db, exception_service, test_order_item, test_vendor
):
"""Test getting an exception by ID."""
created = exception_service.create_exception(
db=db,
order_item=test_order_item,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Product",
original_sku="SKU-001",
)
db.commit()
fetched = exception_service.get_exception_by_id(db, created.id)
assert fetched.id == created.id
assert fetched.original_gtin == "4006381333931"
def test_get_exception_by_id_with_vendor_filter(
self, db, exception_service, test_order_item, test_vendor
):
"""Test getting an exception with vendor filter."""
created = exception_service.create_exception(
db=db,
order_item=test_order_item,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Product",
original_sku="SKU-001",
)
db.commit()
# Should find with correct vendor
fetched = exception_service.get_exception_by_id(
db, created.id, vendor_id=test_vendor.id
)
assert fetched.id == created.id
# Should not find with wrong vendor
with pytest.raises(OrderItemExceptionNotFoundException):
exception_service.get_exception_by_id(db, created.id, vendor_id=99999)
def test_get_exception_not_found(self, db, exception_service):
"""Test getting a non-existent exception."""
with pytest.raises(OrderItemExceptionNotFoundException):
exception_service.get_exception_by_id(db, 99999)
def test_get_pending_exceptions(
self, db, exception_service, test_order, test_product, test_vendor
):
"""Test getting pending exceptions with pagination."""
# Create multiple order items and exceptions
for i in range(5):
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name=f"Product {i}",
product_sku=f"SKU-{i}",
quantity=1,
unit_price=10.00,
total_price=10.00,
)
db.add(order_item)
db.commit()
exception_service.create_exception(
db=db,
order_item=order_item,
vendor_id=test_vendor.id,
original_gtin=f"400638133393{i}",
original_product_name=f"Product {i}",
original_sku=f"SKU-{i}",
)
db.commit()
# Get all
exceptions, total = exception_service.get_pending_exceptions(
db, vendor_id=test_vendor.id
)
assert total == 5
assert len(exceptions) == 5
# Test pagination
exceptions, total = exception_service.get_pending_exceptions(
db, vendor_id=test_vendor.id, skip=0, limit=2
)
assert total == 5
assert len(exceptions) == 2
def test_get_pending_exceptions_with_status_filter(
self, db, exception_service, test_order, test_product, test_vendor
):
"""Test filtering exceptions by status."""
# Create order items
order_items = []
for i in range(3):
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name=f"Product {i}",
product_sku=f"SKU-{i}",
quantity=1,
unit_price=10.00,
total_price=10.00,
)
db.add(order_item)
order_items.append(order_item)
db.commit()
# Create exceptions with different statuses
statuses = ["pending", "resolved", "ignored"]
for i, status in enumerate(statuses):
exc = exception_service.create_exception(
db=db,
order_item=order_items[i],
vendor_id=test_vendor.id,
original_gtin=f"400638133393{i}",
original_product_name=f"Product {i}",
original_sku=f"SKU-{i}",
)
exc.status = status
db.commit()
# Filter by pending
exceptions, total = exception_service.get_pending_exceptions(
db, vendor_id=test_vendor.id, status="pending"
)
assert total == 1
assert exceptions[0].status == "pending"
def test_get_pending_exceptions_with_search(
self, db, exception_service, test_order, test_product, test_vendor
):
"""Test searching exceptions."""
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name="Unique Product",
product_sku="UNIQUE-SKU",
quantity=1,
unit_price=10.00,
total_price=10.00,
)
db.add(order_item)
db.commit()
exception_service.create_exception(
db=db,
order_item=order_item,
vendor_id=test_vendor.id,
original_gtin="9876543210123",
original_product_name="Searchable Product Name",
original_sku="SEARCH-SKU",
)
db.commit()
# Search by GTIN
exceptions, total = exception_service.get_pending_exceptions(
db, vendor_id=test_vendor.id, search="9876543210123"
)
assert total == 1
# Search by product name
exceptions, total = exception_service.get_pending_exceptions(
db, vendor_id=test_vendor.id, search="Searchable"
)
assert total == 1
@pytest.mark.unit
class TestOrderItemExceptionServiceStats:
"""Test exception statistics."""
def test_get_exception_stats(
self, db, exception_service, test_order, test_product, test_vendor
):
"""Test getting exception statistics."""
# Create order items and exceptions with different statuses
statuses_to_create = ["pending", "pending", "resolved", "ignored"]
for i, status in enumerate(statuses_to_create):
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name=f"Product {i}",
product_sku=f"SKU-{i}",
quantity=1,
unit_price=10.00,
total_price=10.00,
)
db.add(order_item)
db.commit()
exc = exception_service.create_exception(
db=db,
order_item=order_item,
vendor_id=test_vendor.id,
original_gtin=f"400638133393{i}",
original_product_name=f"Product {i}",
original_sku=f"SKU-{i}",
)
exc.status = status
db.commit()
stats = exception_service.get_exception_stats(db, vendor_id=test_vendor.id)
assert stats["pending"] == 2
assert stats["resolved"] == 1
assert stats["ignored"] == 1
assert stats["total"] == 4
assert stats["orders_with_exceptions"] == 1 # All same order
@pytest.mark.unit
class TestOrderItemExceptionServiceResolve:
"""Test exception resolution."""
def test_resolve_exception(
self, db, exception_service, test_order_item, test_vendor, test_product, test_user
):
"""Test resolving an exception."""
exception = exception_service.create_exception(
db=db,
order_item=test_order_item,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Product",
original_sku="SKU-001",
)
db.commit()
resolved = exception_service.resolve_exception(
db=db,
exception_id=exception.id,
product_id=test_product.id,
resolved_by=test_user.id,
notes="Matched to existing product",
)
db.commit()
assert resolved.status == "resolved"
assert resolved.resolved_product_id == test_product.id
assert resolved.resolved_by == test_user.id
assert resolved.resolved_at is not None
assert resolved.resolution_notes == "Matched to existing product"
# Order item should be updated
db.refresh(test_order_item)
assert test_order_item.product_id == test_product.id
assert test_order_item.needs_product_match is False
def test_resolve_already_resolved_exception(
self, db, exception_service, test_order_item, test_vendor, test_product, test_user
):
"""Test that resolving an already resolved exception raises error."""
exception = exception_service.create_exception(
db=db,
order_item=test_order_item,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Product",
original_sku="SKU-001",
)
db.commit()
# Resolve first time
exception_service.resolve_exception(
db=db,
exception_id=exception.id,
product_id=test_product.id,
resolved_by=test_user.id,
)
db.commit()
# Try to resolve again
with pytest.raises(ExceptionAlreadyResolvedException):
exception_service.resolve_exception(
db=db,
exception_id=exception.id,
product_id=test_product.id,
resolved_by=test_user.id,
)
def test_resolve_with_invalid_product(
self, db, exception_service, test_order_item, test_vendor, test_user
):
"""Test resolving with non-existent product."""
exception = exception_service.create_exception(
db=db,
order_item=test_order_item,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Product",
original_sku="SKU-001",
)
db.commit()
with pytest.raises(ProductNotFoundException):
exception_service.resolve_exception(
db=db,
exception_id=exception.id,
product_id=99999,
resolved_by=test_user.id,
)
def test_ignore_exception(
self, db, exception_service, test_order_item, test_vendor, test_user
):
"""Test ignoring an exception."""
exception = exception_service.create_exception(
db=db,
order_item=test_order_item,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Product",
original_sku="SKU-001",
)
db.commit()
ignored = exception_service.ignore_exception(
db=db,
exception_id=exception.id,
resolved_by=test_user.id,
notes="Product discontinued",
)
db.commit()
assert ignored.status == "ignored"
assert ignored.resolved_by == test_user.id
assert ignored.resolution_notes == "Product discontinued"
@pytest.mark.unit
class TestOrderItemExceptionServiceAutoMatch:
"""Test auto-matching."""
def test_auto_match_by_gtin(
self, db, exception_service, test_order, test_product, test_vendor
):
"""Test auto-matching exceptions by GTIN."""
# Set the product GTIN
test_product.gtin = "4006381333931"
db.commit()
# Create order items with exceptions for this GTIN
for i in range(3):
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name=f"Product {i}",
product_sku=f"SKU-{i}",
quantity=1,
unit_price=10.00,
total_price=10.00,
needs_product_match=True,
)
db.add(order_item)
db.commit()
exception_service.create_exception(
db=db,
order_item=order_item,
vendor_id=test_vendor.id,
original_gtin="4006381333931", # Same GTIN
original_product_name=f"Product {i}",
original_sku=f"SKU-{i}",
)
db.commit()
# Auto-match should resolve all 3
resolved = exception_service.auto_match_by_gtin(
db=db,
vendor_id=test_vendor.id,
gtin="4006381333931",
product_id=test_product.id,
)
db.commit()
assert len(resolved) == 3
for exc in resolved:
assert exc.status == "resolved"
assert exc.resolved_product_id == test_product.id
assert "Auto-matched" in exc.resolution_notes
def test_auto_match_empty_gtin(self, db, exception_service, test_vendor):
"""Test that empty GTIN returns empty list."""
resolved = exception_service.auto_match_by_gtin(
db=db, vendor_id=test_vendor.id, gtin="", product_id=1
)
assert resolved == []
@pytest.mark.unit
class TestOrderItemExceptionServiceConfirmation:
"""Test confirmation checks."""
def test_order_has_unresolved_exceptions(
self, db, exception_service, test_order_item, test_vendor
):
"""Test checking for unresolved exceptions."""
order_id = test_order_item.order_id
# Initially no exceptions
assert exception_service.order_has_unresolved_exceptions(db, order_id) is False
# Add pending exception
exception_service.create_exception(
db=db,
order_item=test_order_item,
vendor_id=test_vendor.id,
original_gtin="4006381333931",
original_product_name="Test Product",
original_sku="SKU-001",
)
db.commit()
assert exception_service.order_has_unresolved_exceptions(db, order_id) is True
def test_get_unresolved_exception_count(
self, db, exception_service, test_order, test_product, test_vendor
):
"""Test getting unresolved exception count."""
# Create multiple exceptions
for i in range(3):
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name=f"Product {i}",
product_sku=f"SKU-{i}",
quantity=1,
unit_price=10.00,
total_price=10.00,
)
db.add(order_item)
db.commit()
exception_service.create_exception(
db=db,
order_item=order_item,
vendor_id=test_vendor.id,
original_gtin=f"400638133393{i}",
original_product_name=f"Product {i}",
original_sku=f"SKU-{i}",
)
db.commit()
count = exception_service.get_unresolved_exception_count(db, test_order.id)
assert count == 3
@pytest.mark.unit
class TestOrderItemExceptionServiceBulkResolve:
"""Test bulk operations."""
def test_bulk_resolve_by_gtin(
self, db, exception_service, test_order, test_product, test_vendor, test_user
):
"""Test bulk resolving exceptions by GTIN."""
gtin = "4006381333931"
# Create multiple exceptions for same GTIN
for i in range(3):
order_item = OrderItem(
order_id=test_order.id,
product_id=test_product.id,
product_name=f"Product {i}",
product_sku=f"SKU-{i}",
quantity=1,
unit_price=10.00,
total_price=10.00,
)
db.add(order_item)
db.commit()
exception_service.create_exception(
db=db,
order_item=order_item,
vendor_id=test_vendor.id,
original_gtin=gtin,
original_product_name=f"Product {i}",
original_sku=f"SKU-{i}",
)
db.commit()
count = exception_service.bulk_resolve_by_gtin(
db=db,
vendor_id=test_vendor.id,
gtin=gtin,
product_id=test_product.id,
resolved_by=test_user.id,
notes="Bulk resolved",
)
db.commit()
assert count == 3
# Verify all are resolved
exceptions, total = exception_service.get_pending_exceptions(
db, vendor_id=test_vendor.id, status="pending"
)
assert total == 0

View File

@@ -148,17 +148,17 @@ TEST001,Product 1,Description 1,19.99 EUR,Brand1,Category1"""
assert product_data["price"] == "19.99"
assert product_data["brand"] == "TestBrand"
def test_parse_price_to_numeric(self):
"""Test price string to numeric conversion"""
assert self.processor._parse_price_to_numeric("19.99 EUR") == 19.99
assert self.processor._parse_price_to_numeric("19,99 EUR") == 19.99
assert self.processor._parse_price_to_numeric("$29.99") == 29.99
assert self.processor._parse_price_to_numeric("100") == 100.0
assert self.processor._parse_price_to_numeric(None) is None
assert self.processor._parse_price_to_numeric("") is None
def test_parse_price_to_cents(self):
"""Test price string to cents conversion"""
assert self.processor._parse_price_to_cents("19.99 EUR") == 1999
assert self.processor._parse_price_to_cents("19,99 EUR") == 1999
assert self.processor._parse_price_to_cents("$29.99") == 2999
assert self.processor._parse_price_to_cents("100") == 10000
assert self.processor._parse_price_to_cents(None) is None
assert self.processor._parse_price_to_cents("") is None
def test_clean_row_data_with_prices(self):
"""Test row data cleaning with price parsing"""
"""Test row data cleaning with price parsing to cents"""
row_data = {
"marketplace_product_id": "TEST001",
"title": "Test Product",
@@ -169,8 +169,8 @@ TEST001,Product 1,Description 1,19.99 EUR,Brand1,Category1"""
cleaned = self.processor._clean_row_data(row_data)
assert cleaned["price_numeric"] == 19.99
assert cleaned["sale_price_numeric"] == 14.99
assert cleaned["price_cents"] == 1999 # Integer cents
assert cleaned["sale_price_cents"] == 1499 # Integer cents
assert cleaned["currency"] == "EUR"
@pytest.mark.asyncio