shop product refactoring

This commit is contained in:
2025-10-04 23:38:53 +02:00
parent 4d2866af5e
commit 0114b6c46e
68 changed files with 2234 additions and 2236 deletions

View File

@@ -74,67 +74,67 @@ class TestAdminAPI:
assert data["error_code"] == "USER_STATUS_CHANGE_FAILED"
assert "Cannot modify another admin user" in data["message"]
def test_get_all_shops_admin(self, client, admin_headers, test_shop):
"""Test admin getting all shops"""
response = client.get("/api/v1/admin/shops", headers=admin_headers)
def test_get_all_vendors_admin(self, client, admin_headers, test_vendor):
"""Test admin getting all vendors"""
response = client.get("/api/v1/admin/vendors", headers=admin_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
assert len(data["shops"]) >= 1
assert len(data["vendors"]) >= 1
# Check that test_shop is in the response
shop_codes = [
shop["shop_code"] for shop in data["shops"] if "shop_code" in shop
# Check that test_vendor is in the response
vendor_codes = [
vendor ["vendor_code"] for vendor in data["vendors"] if "vendor_code" in vendor
]
assert test_shop.shop_code in shop_codes
assert test_vendor.vendor_code in vendor_codes
def test_get_all_shops_non_admin(self, client, auth_headers):
"""Test non-admin trying to access admin shop endpoint"""
response = client.get("/api/v1/admin/shops", headers=auth_headers)
def test_get_all_vendors_non_admin(self, client, auth_headers):
"""Test non-admin trying to access admin vendor endpoint"""
response = client.get("/api/v1/admin/vendors", headers=auth_headers)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "ADMIN_REQUIRED"
def test_verify_shop_admin(self, client, admin_headers, test_shop):
"""Test admin verifying/unverifying shop"""
def test_verify_vendor_admin(self, client, admin_headers, test_vendor):
"""Test admin verifying/unverifying vendor """
response = client.put(
f"/api/v1/admin/shops/{test_shop.id}/verify", headers=admin_headers
f"/api/v1/admin/vendors/{test_vendor.id}/verify", headers=admin_headers
)
assert response.status_code == 200
message = response.json()["message"]
assert "verified" in message or "unverified" in message
assert test_shop.shop_code in message
assert test_vendor.vendor_code in message
def test_verify_shop_not_found(self, client, admin_headers):
"""Test admin verifying non-existent shop"""
response = client.put("/api/v1/admin/shops/99999/verify", headers=admin_headers)
def test_verify_vendor_not_found(self, client, admin_headers):
"""Test admin verifying non-existent vendor """
response = client.put("/api/v1/admin/vendors/99999/verify", headers=admin_headers)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "SHOP_NOT_FOUND"
assert "Shop with ID '99999' not found" in data["message"]
assert data["error_code"] == "VENDOR_NOT_FOUND"
assert "Vendor with ID '99999' not found" in data["message"]
def test_toggle_shop_status_admin(self, client, admin_headers, test_shop):
"""Test admin toggling shop status"""
def test_toggle_vendor_status_admin(self, client, admin_headers, test_vendor):
"""Test admin toggling vendor status"""
response = client.put(
f"/api/v1/admin/shops/{test_shop.id}/status", headers=admin_headers
f"/api/v1/admin/vendors/{test_vendor.id}/status", headers=admin_headers
)
assert response.status_code == 200
message = response.json()["message"]
assert "activated" in message or "deactivated" in message
assert test_shop.shop_code in message
assert test_vendor.vendor_code in message
def test_toggle_shop_status_not_found(self, client, admin_headers):
"""Test admin toggling status for non-existent shop"""
response = client.put("/api/v1/admin/shops/99999/status", headers=admin_headers)
def test_toggle_vendor_status_not_found(self, client, admin_headers):
"""Test admin toggling status for non-existent vendor """
response = client.put("/api/v1/admin/vendors/99999/status", headers=admin_headers)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "SHOP_NOT_FOUND"
assert data["error_code"] == "VENDOR_NOT_FOUND"
def test_get_marketplace_import_jobs_admin(
self, client, admin_headers, test_marketplace_import_job
@@ -191,17 +191,17 @@ class TestAdminAPI:
assert "activation_rate" in data
assert isinstance(data["total_users"], int)
def test_get_shop_statistics(self, client, admin_headers):
"""Test admin getting shop statistics"""
response = client.get("/api/v1/admin/stats/shops", headers=admin_headers)
def test_get_vendor_statistics(self, client, admin_headers):
"""Test admin getting vendor statistics"""
response = client.get("/api/v1/admin/stats/vendors", headers=admin_headers)
assert response.status_code == 200
data = response.json()
assert "total_shops" in data
assert "active_shops" in data
assert "verified_shops" in data
assert "total_vendors" in data
assert "active_vendors" in data
assert "verified_vendors" in data
assert "verification_rate" in data
assert isinstance(data["total_shops"], int)
assert isinstance(data["total_vendors"], int)
def test_admin_pagination_users(self, client, admin_headers, test_user, test_admin):
"""Test user pagination works correctly"""
@@ -221,14 +221,14 @@ class TestAdminAPI:
data = response.json()
assert len(data) >= 0 # Could be 1 or 0 depending on total users
def test_admin_pagination_shops(self, client, admin_headers, test_shop):
"""Test shop pagination works correctly"""
def test_admin_pagination_vendors(self, client, admin_headers, test_vendor):
"""Test vendor pagination works correctly"""
response = client.get(
"/api/v1/admin/shops?skip=0&limit=1", headers=admin_headers
"/api/v1/admin/vendors?skip=0&limit=1", headers=admin_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
assert len(data["shops"]) >= 0
assert len(data["vendors"]) >= 0
assert "skip" in data
assert "limit" in data

View File

@@ -8,15 +8,15 @@ import pytest
@pytest.mark.api
@pytest.mark.marketplace
class TestMarketplaceImportJobAPI:
def test_import_from_marketplace(self, client, auth_headers, test_shop, test_user):
def test_import_from_marketplace(self, client, auth_headers, test_vendor, test_user):
"""Test marketplace import endpoint - just test job creation"""
# Ensure user owns the shop
test_shop.owner_id = test_user.id
# Ensure user owns the vendor
test_vendor.owner_id = test_user.id
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": test_shop.shop_code,
"vendor_code": test_vendor.vendor_code,
}
response = client.post(
@@ -28,15 +28,15 @@ class TestMarketplaceImportJobAPI:
assert data["status"] == "pending"
assert data["marketplace"] == "TestMarket"
assert "job_id" in data
assert data["shop_code"] == test_shop.shop_code
assert data["shop_id"] == test_shop.id
assert data["vendor_code"] == test_vendor.vendor_code
assert data["vendor_id"] == test_vendor.id
def test_import_from_marketplace_invalid_shop(self, client, auth_headers):
"""Test marketplace import with invalid shop"""
def test_import_from_marketplace_invalid_vendor(self, client, auth_headers):
"""Test marketplace import with invalid vendor """
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": "NONEXISTENT",
"vendor_code": "NONEXISTENT",
}
response = client.post(
@@ -45,18 +45,18 @@ class TestMarketplaceImportJobAPI:
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "SHOP_NOT_FOUND"
assert data["error_code"] == "VENDOR_NOT_FOUND"
assert "NONEXISTENT" in data["message"]
def test_import_from_marketplace_unauthorized_shop(self, client, auth_headers, test_shop, other_user):
"""Test marketplace import with unauthorized shop access"""
# Set shop owner to different user
test_shop.owner_id = other_user.id
def test_import_from_marketplace_unauthorized_vendor(self, client, auth_headers, test_vendor, other_user):
"""Test marketplace import with unauthorized vendor access"""
# Set vendor owner to different user
test_vendor.owner_id = other_user.id
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": test_shop.shop_code,
"vendor_code": test_vendor.vendor_code,
}
response = client.post(
@@ -65,15 +65,15 @@ class TestMarketplaceImportJobAPI:
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "UNAUTHORIZED_SHOP_ACCESS"
assert test_shop.shop_code in data["message"]
assert data["error_code"] == "UNAUTHORIZED_VENDOR_ACCESS"
assert test_vendor.vendor_code in data["message"]
def test_import_from_marketplace_validation_error(self, client, auth_headers):
"""Test marketplace import with invalid request data"""
import_data = {
"url": "", # Empty URL
"marketplace": "", # Empty marketplace
# Missing shop_code
# Missing vendor_code
}
response = client.post(
@@ -85,12 +85,12 @@ class TestMarketplaceImportJobAPI:
assert data["error_code"] == "VALIDATION_ERROR"
assert "Request validation failed" in data["message"]
def test_import_from_marketplace_admin_access(self, client, admin_headers, test_shop):
"""Test that admin can import for any shop"""
def test_import_from_marketplace_admin_access(self, client, admin_headers, test_vendor):
"""Test that admin can import for any vendor """
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "AdminMarket",
"shop_code": test_shop.shop_code,
"vendor_code": test_vendor.vendor_code,
}
response = client.post(
@@ -100,7 +100,7 @@ class TestMarketplaceImportJobAPI:
assert response.status_code == 200
data = response.json()
assert data["marketplace"] == "AdminMarket"
assert data["shop_code"] == test_shop.shop_code
assert data["vendor_code"] == test_vendor.vendor_code
def test_get_marketplace_import_status(self, client, auth_headers, test_marketplace_import_job):
"""Test getting marketplace import status"""
@@ -195,7 +195,7 @@ class TestMarketplaceImportJobAPI:
assert isinstance(data["total_jobs"], int)
assert data["total_jobs"] >= 1
def test_cancel_marketplace_import_job(self, client, auth_headers, test_user, test_shop, db):
def test_cancel_marketplace_import_job(self, client, auth_headers, test_user, test_vendor, db):
"""Test cancelling a marketplace import job"""
# Create a pending job that can be cancelled
from models.database.marketplace_import_job import MarketplaceImportJob
@@ -205,9 +205,9 @@ class TestMarketplaceImportJobAPI:
job = MarketplaceImportJob(
status="pending",
marketplace="TestMarket",
shop_name=f"Test_Shop_{unique_id}",
vendor_name=f"Test_vendor_{unique_id}",
user_id=test_user.id,
shop_id=test_shop.id,
vendor_id=test_vendor.id,
source_url="https://test.example.com/import",
imported_count=0,
updated_count=0,
@@ -256,7 +256,7 @@ class TestMarketplaceImportJobAPI:
assert data["error_code"] == "IMPORT_JOB_CANNOT_BE_CANCELLED"
assert "completed" in data["message"]
def test_delete_marketplace_import_job(self, client, auth_headers, test_user, test_shop, db):
def test_delete_marketplace_import_job(self, client, auth_headers, test_user, test_vendor, db):
"""Test deleting a marketplace import job"""
# Create a completed job that can be deleted
from models.database.marketplace_import_job import MarketplaceImportJob
@@ -266,9 +266,9 @@ class TestMarketplaceImportJobAPI:
job = MarketplaceImportJob(
status="completed",
marketplace="TestMarket",
shop_name=f"Test_Shop_{unique_id}",
vendor_name=f"Test_vendor_{unique_id}",
user_id=test_user.id,
shop_id=test_shop.id,
vendor_id=test_vendor.id,
source_url="https://test.example.com/import",
imported_count=0,
updated_count=0,
@@ -299,7 +299,7 @@ class TestMarketplaceImportJobAPI:
data = response.json()
assert data["error_code"] == "IMPORT_JOB_NOT_FOUND"
def test_delete_marketplace_import_job_cannot_delete(self, client, auth_headers, test_user, test_shop, db):
def test_delete_marketplace_import_job_cannot_delete(self, client, auth_headers, test_user, test_vendor, db):
"""Test deleting a job that cannot be deleted"""
# Create a pending job that cannot be deleted
from models.database.marketplace_import_job import MarketplaceImportJob
@@ -309,9 +309,9 @@ class TestMarketplaceImportJobAPI:
job = MarketplaceImportJob(
status="pending",
marketplace="TestMarket",
shop_name=f"Test_Shop_{unique_id}",
vendor_name=f"Test_vendor_{unique_id}",
user_id=test_user.id,
shop_id=test_shop.id,
vendor_id=test_vendor.id,
source_url="https://test.example.com/import",
imported_count=0,
updated_count=0,
@@ -344,7 +344,7 @@ class TestMarketplaceImportJobAPI:
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": "TEST_SHOP",
"vendor_code": "TEST_SHOP",
}
response = client.post("/api/v1/marketplace/import-product", json=import_data)
@@ -374,7 +374,7 @@ class TestMarketplaceImportJobAPI:
data = response.json()
assert data["job_id"] == test_marketplace_import_job.id
def test_admin_can_cancel_any_job(self, client, admin_headers, test_user, test_shop, db):
def test_admin_can_cancel_any_job(self, client, admin_headers, test_user, test_vendor, db):
"""Test that admin can cancel any job"""
# Create a pending job owned by different user
from models.database.marketplace_import_job import MarketplaceImportJob
@@ -384,9 +384,9 @@ class TestMarketplaceImportJobAPI:
job = MarketplaceImportJob(
status="pending",
marketplace="TestMarket",
shop_name=f"Test_Shop_{unique_id}",
vendor_name=f"Test_vendor_{unique_id}",
user_id=test_user.id, # Different user
shop_id=test_shop.id,
vendor_id=test_vendor.id,
source_url="https://test.example.com/import",
imported_count=0,
updated_count=0,
@@ -406,16 +406,16 @@ class TestMarketplaceImportJobAPI:
data = response.json()
assert data["status"] == "cancelled"
def test_rate_limiting_applied(self, client, auth_headers, test_shop, test_user):
def test_rate_limiting_applied(self, client, auth_headers, test_vendor, test_user):
"""Test that rate limiting is applied to import endpoint"""
# This test verifies that the rate_limit decorator is present
# Actual rate limiting testing would require multiple requests
test_shop.owner_id = test_user.id
test_vendor.owner_id = test_user.id
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": test_shop.shop_code,
"vendor_code": test_vendor.vendor_code,
}
response = client.post(

View File

@@ -65,19 +65,19 @@ class TestExportFunctionality:
assert f"EXP1_{unique_suffix}" in csv_content
assert f"EXP2_{unique_suffix}" not in csv_content # Should be filtered out
def test_csv_export_with_shop_filter_success(self, client, auth_headers, db):
"""Test CSV export with shop name filtering successfully"""
def test_csv_export_with_vendor_filter_success(self, client, auth_headers, db):
"""Test CSV export with vendor name filtering successfully"""
unique_suffix = str(uuid.uuid4())[:8]
products = [
MarketplaceProduct(
marketplace_product_id=f"SHOP1_{unique_suffix}",
title=f"Shop1 MarketplaceProduct {unique_suffix}",
shop_name="TestShop1"
vendor_name="TestVendor1"
),
MarketplaceProduct(
marketplace_product_id=f"SHOP2_{unique_suffix}",
title=f"Shop2 MarketplaceProduct {unique_suffix}",
shop_name="TestShop2"
vendor_name="TestVendor2"
),
]
@@ -85,7 +85,7 @@ class TestExportFunctionality:
db.commit()
response = client.get(
"/api/v1/marketplace/product?shop_name=TestShop1", headers=auth_headers
"/api/v1/marketplace/product?vendor_name=TestVendor1", headers=auth_headers
)
assert response.status_code == 200
@@ -94,26 +94,26 @@ class TestExportFunctionality:
assert f"SHOP2_{unique_suffix}" not in csv_content # Should be filtered out
def test_csv_export_with_combined_filters_success(self, client, auth_headers, db):
"""Test CSV export with combined marketplace and shop filters successfully"""
"""Test CSV export with combined marketplace and vendor filters successfully"""
unique_suffix = str(uuid.uuid4())[:8]
products = [
MarketplaceProduct(
marketplace_product_id=f"COMBO1_{unique_suffix}",
title=f"Combo MarketplaceProduct 1 {unique_suffix}",
marketplace="Amazon",
shop_name="TestShop"
vendor_name="TestVendor"
),
MarketplaceProduct(
marketplace_product_id=f"COMBO2_{unique_suffix}",
title=f"Combo MarketplaceProduct 2 {unique_suffix}",
marketplace="eBay",
shop_name="TestShop"
vendor_name="TestVendor"
),
MarketplaceProduct(
marketplace_product_id=f"COMBO3_{unique_suffix}",
title=f"Combo MarketplaceProduct 3 {unique_suffix}",
marketplace="Amazon",
shop_name="OtherShop"
vendor_name="OtherShop"
),
]
@@ -121,7 +121,7 @@ class TestExportFunctionality:
db.commit()
response = client.get(
"/api/v1/marketplace/product?marketplace=Amazon&shop_name=TestShop",
"/api/v1/marketplace/product?marketplace=Amazon&vendor_name=TestVendor",
headers=auth_headers
)
assert response.status_code == 200
@@ -129,7 +129,7 @@ class TestExportFunctionality:
csv_content = response.content.decode("utf-8")
assert f"COMBO1_{unique_suffix}" in csv_content # Matches both filters
assert f"COMBO2_{unique_suffix}" not in csv_content # Wrong marketplace
assert f"COMBO3_{unique_suffix}" not in csv_content # Wrong shop
assert f"COMBO3_{unique_suffix}" not in csv_content # Wrong vendor
def test_csv_export_no_results(self, client, auth_headers):
"""Test CSV export with filters that return no results"""

View File

@@ -2,7 +2,7 @@
import pytest
from models.database.marketplace_product import MarketplaceProduct
from models.database.shop import Shop
from models.database.vendor import Vendor
@pytest.mark.integration
@pytest.mark.api
@@ -181,34 +181,34 @@ class TestPagination:
overlap = set(first_page_ids) & set(second_page_ids)
assert len(overlap) == 0, "Pages should not have overlapping products"
def test_shop_pagination_success(self, client, admin_headers, db, test_user):
"""Test pagination for shop listing successfully"""
def test_vendor_pagination_success(self, client, admin_headers, db, test_user):
"""Test pagination for vendor listing successfully"""
import uuid
unique_suffix = str(uuid.uuid4())[:8]
# Create multiple shops for pagination testing
from models.database.shop import Shop
shops = []
# Create multiple vendors for pagination testing
from models.database.vendor import Vendor
vendors =[]
for i in range(15):
shop = Shop(
shop_code=f"PAGESHOP{i:03d}_{unique_suffix}",
shop_name=f"Pagination Shop {i}",
vendor = Vendor(
vendor_code=f"PAGESHOP{i:03d}_{unique_suffix}",
vendor_name=f"Pagination Vendor {i}",
owner_id=test_user.id,
is_active=True,
)
shops.append(shop)
vendors.append(vendor)
db.add_all(shops)
db.add_all(vendors)
db.commit()
# Test first page (assuming admin endpoint exists)
response = client.get(
"/api/v1/shop?limit=5&skip=0", headers=admin_headers
"/api/v1/vendor ?limit=5&skip=0", headers=admin_headers
)
assert response.status_code == 200
data = response.json()
assert len(data["shops"]) == 5
assert data["total"] >= 15 # At least our test shops
assert len(data["vendors"]) == 5
assert data["total"] >= 15 # At least our test vendors
assert data["skip"] == 0
assert data["limit"] == 5

View File

@@ -1,389 +0,0 @@
# tests/integration/api/v1/test_shop_endpoints.py
import pytest
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.shops
class TestShopsAPI:
def test_create_shop_success(self, client, auth_headers):
"""Test creating a new shop successfully"""
shop_data = {
"shop_code": "NEWSHOP001",
"shop_name": "New Shop",
"description": "A new test shop",
}
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
assert response.status_code == 200
data = response.json()
assert data["shop_code"] == "NEWSHOP001"
assert data["shop_name"] == "New Shop"
assert data["is_active"] is True
def test_create_shop_duplicate_code_returns_conflict(self, client, auth_headers, test_shop):
"""Test creating shop with duplicate code returns ShopAlreadyExistsException"""
shop_data = {
"shop_code": test_shop.shop_code,
"shop_name": "Different Name",
"description": "Different description",
}
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
assert response.status_code == 409
data = response.json()
assert data["error_code"] == "SHOP_ALREADY_EXISTS"
assert data["status_code"] == 409
assert test_shop.shop_code in data["message"]
assert data["details"]["shop_code"] == test_shop.shop_code
def test_create_shop_missing_shop_code_validation_error(self, client, auth_headers):
"""Test creating shop without shop_code returns ValidationException"""
shop_data = {
"shop_name": "Shop without Code",
"description": "Missing shop code",
}
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
assert data["status_code"] == 422
assert "Request validation failed" in data["message"]
assert "validation_errors" in data["details"]
def test_create_shop_empty_shop_name_validation_error(self, client, auth_headers):
"""Test creating shop with empty shop_name returns ShopValidationException"""
shop_data = {
"shop_code": "EMPTYNAME",
"shop_name": "", # Empty shop name
"description": "Shop with empty name",
}
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "INVALID_SHOP_DATA"
assert data["status_code"] == 422
assert "Shop name is required" in data["message"]
assert data["details"]["field"] == "shop_name"
def test_create_shop_max_shops_reached_business_logic_error(self, client, auth_headers, db, test_user):
"""Test creating shop when max shops reached returns MaxShopsReachedException"""
# This test would require creating the maximum allowed shops first
# The exact implementation depends on your business rules
# For now, we'll test the structure of what the error should look like
# In a real scenario, you'd create max_shops number of shops first
# Assuming max shops is enforced at service level
# This test validates the expected response structure
pass # Implementation depends on your max_shops business logic
def test_get_shops_success(self, client, auth_headers, test_shop):
"""Test getting shops list successfully"""
response = client.get("/api/v1/shop", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
assert len(data["shops"]) >= 1
# Find our test shop
test_shop_found = any(s["shop_code"] == test_shop.shop_code for s in data["shops"])
assert test_shop_found
def test_get_shops_with_filters(self, client, auth_headers, test_shop):
"""Test getting shops with filtering options"""
# Test active_only filter
response = client.get("/api/v1/shop?active_only=true", headers=auth_headers)
assert response.status_code == 200
data = response.json()
for shop in data["shops"]:
assert shop["is_active"] is True
# Test verified_only filter
response = client.get("/api/v1/shop?verified_only=true", headers=auth_headers)
assert response.status_code == 200
# Response should only contain verified shops
def test_get_shop_by_code_success(self, client, auth_headers, test_shop):
"""Test getting specific shop successfully"""
response = client.get(
f"/api/v1/shop/{test_shop.shop_code}", headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["shop_code"] == test_shop.shop_code
assert data["shop_name"] == test_shop.shop_name
def test_get_shop_by_code_not_found(self, client, auth_headers):
"""Test getting nonexistent shop returns ShopNotFoundException"""
response = client.get("/api/v1/shop/NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "SHOP_NOT_FOUND"
assert data["status_code"] == 404
assert "NONEXISTENT" in data["message"]
assert data["details"]["resource_type"] == "Shop"
assert data["details"]["identifier"] == "NONEXISTENT"
def test_get_shop_unauthorized_access(self, client, auth_headers, test_shop, other_user, db):
"""Test accessing shop owned by another user returns UnauthorizedShopAccessException"""
# Change shop owner to other user AND make it unverified/inactive
# so that non-owner users cannot access it
test_shop.owner_id = other_user.id
test_shop.is_verified = False # Make it not publicly accessible
db.commit()
response = client.get(
f"/api/v1/shop/{test_shop.shop_code}", headers=auth_headers
)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "UNAUTHORIZED_SHOP_ACCESS"
assert data["status_code"] == 403
assert test_shop.shop_code in data["message"]
assert data["details"]["shop_code"] == test_shop.shop_code
def test_get_shop_unauthorized_access_with_inactive_shop(self, client, auth_headers, inactive_shop):
"""Test accessing inactive shop owned by another user returns UnauthorizedShopAccessException"""
# inactive_shop fixture already creates an unverified, inactive shop owned by other_user
response = client.get(
f"/api/v1/shop/{inactive_shop.shop_code}", headers=auth_headers
)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "UNAUTHORIZED_SHOP_ACCESS"
assert data["status_code"] == 403
assert inactive_shop.shop_code in data["message"]
assert data["details"]["shop_code"] == inactive_shop.shop_code
def test_get_shop_public_access_allowed(self, client, auth_headers, verified_shop):
"""Test accessing verified shop owned by another user is allowed (public access)"""
# verified_shop fixture creates a verified, active shop owned by other_user
# This should allow public access per your business logic
response = client.get(
f"/api/v1/shop/{verified_shop.shop_code}", headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["shop_code"] == verified_shop.shop_code
assert data["shop_name"] == verified_shop.shop_name
def test_add_product_to_shop_success(self, client, auth_headers, test_shop, unique_product):
"""Test adding product to shop successfully"""
product_data = {
"marketplace_product_id": unique_product.marketplace_product_id, # Use string marketplace_product_id, not database id
"price": 29.99,
"is_active": True,
"is_featured": False,
}
response = client.post(
f"/api/v1/shop/{test_shop.shop_code}/products",
headers=auth_headers,
json=product_data
)
assert response.status_code == 200
data = response.json()
# The response structure contains nested product data
assert data["shop_id"] == test_shop.id
assert data["price"] == 29.99
assert data["is_active"] is True
assert data["is_featured"] is False
# MarketplaceProduct details are nested in the 'marketplace_product' field
assert "marketplace_product" in data
assert data["marketplace_product"]["marketplace_product_id"] == unique_product.marketplace_product_id
assert data["marketplace_product"]["id"] == unique_product.id
def test_add_product_to_shop_already_exists_conflict(self, client, auth_headers, test_shop, test_product):
"""Test adding product that already exists in shop returns ProductAlreadyExistsException"""
# test_product fixture already creates a relationship, get the marketplace_product_id string
existing_product = test_product.marketplace_product
product_data = {
"marketplace_product_id": existing_product.marketplace_product_id, # Use string marketplace_product_id
"shop_price": 29.99,
}
response = client.post(
f"/api/v1/shop/{test_shop.shop_code}/products",
headers=auth_headers,
json=product_data
)
assert response.status_code == 409
data = response.json()
assert data["error_code"] == "PRODUCT_ALREADY_EXISTS"
assert data["status_code"] == 409
assert test_shop.shop_code in data["message"]
assert existing_product.marketplace_product_id in data["message"]
def test_add_nonexistent_product_to_shop_not_found(self, client, auth_headers, test_shop):
"""Test adding nonexistent product to shop returns MarketplaceProductNotFoundException"""
product_data = {
"marketplace_product_id": "NONEXISTENT_PRODUCT", # Use string marketplace_product_id that doesn't exist
"shop_price": 29.99,
}
response = client.post(
f"/api/v1/shop/{test_shop.shop_code}/products",
headers=auth_headers,
json=product_data
)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "PRODUCT_NOT_FOUND"
assert data["status_code"] == 404
assert "NONEXISTENT_PRODUCT" in data["message"]
def test_get_products_success(self, client, auth_headers, test_shop, test_product):
"""Test getting shop products successfully"""
response = client.get(
f"/api/v1/shop/{test_shop.shop_code}/products",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
assert len(data["products"]) >= 1
assert "shop" in data
assert data["shop"]["shop_code"] == test_shop.shop_code
def test_get_products_with_filters(self, client, auth_headers, test_shop):
"""Test getting shop products with filtering"""
# Test active_only filter
response = client.get(
f"/api/v1/shop/{test_shop.shop_code}/products?active_only=true",
headers=auth_headers
)
assert response.status_code == 200
# Test featured_only filter
response = client.get(
f"/api/v1/shop/{test_shop.shop_code}/products?featured_only=true",
headers=auth_headers
)
assert response.status_code == 200
def test_get_products_from_nonexistent_shop_not_found(self, client, auth_headers):
"""Test getting products from nonexistent shop returns ShopNotFoundException"""
response = client.get(
"/api/v1/shop/NONEXISTENT/products",
headers=auth_headers
)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "SHOP_NOT_FOUND"
assert data["status_code"] == 404
assert "NONEXISTENT" in data["message"]
def test_shop_not_active_business_logic_error(self, client, auth_headers, test_shop, db):
"""Test accessing inactive shop returns ShopNotActiveException (if enforced)"""
# Set shop to inactive
test_shop.is_active = False
db.commit()
# Depending on your business logic, this might return an error
response = client.get(
f"/api/v1/shop/{test_shop.shop_code}", headers=auth_headers
)
# If your service enforces active shop requirement
if response.status_code == 400:
data = response.json()
assert data["error_code"] == "SHOP_NOT_ACTIVE"
assert data["status_code"] == 400
assert test_shop.shop_code in data["message"]
def test_shop_not_verified_business_logic_error(self, client, auth_headers, test_shop, db):
"""Test operations requiring verification returns ShopNotVerifiedException (if enforced)"""
# Set shop to unverified
test_shop.is_verified = False
db.commit()
# Test adding products (might require verification)
product_data = {
"marketplace_product_id": 1,
"shop_price": 29.99,
}
response = client.post(
f"/api/v1/shop/{test_shop.shop_code}/products",
headers=auth_headers,
json=product_data
)
# If your service requires verification for adding products
if response.status_code == 400:
data = response.json()
assert data["error_code"] == "SHOP_NOT_VERIFIED"
assert data["status_code"] == 400
assert test_shop.shop_code in data["message"]
def test_get_shop_without_auth_returns_invalid_token(self, client):
"""Test that shop endpoints require authentication returns InvalidTokenException"""
response = client.get("/api/v1/shop")
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
assert data["status_code"] == 401
def test_pagination_validation_errors(self, client, auth_headers):
"""Test pagination parameter validation"""
# Test negative skip
response = client.get("/api/v1/shop?skip=-1", headers=auth_headers)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
# Test zero limit
response = client.get("/api/v1/shop?limit=0", headers=auth_headers)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
# Test excessive limit
response = client.get("/api/v1/shop?limit=10000", headers=auth_headers)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
def test_exception_structure_consistency(self, client, auth_headers):
"""Test that all shop exceptions follow the consistent LetzShopException structure"""
# Test with a known error case
response = client.get("/api/v1/shop/NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
data = response.json()
# Verify exception structure matches LetzShopException.to_dict()
required_fields = ["error_code", "message", "status_code"]
for field in required_fields:
assert field in data, f"Missing required field: {field}"
assert isinstance(data["error_code"], str)
assert isinstance(data["message"], str)
assert isinstance(data["status_code"], int)
# Details field should be present for domain-specific exceptions
if "details" in data:
assert isinstance(data["details"], dict)

View File

@@ -15,7 +15,7 @@ class TestStatsAPI:
assert "unique_brands" in data
assert "unique_categories" in data
assert "unique_marketplaces" in data
assert "unique_shops" in data
assert "unique_vendors" in data
assert data["total_products"] >= 1
def test_get_marketplace_stats(self, client, auth_headers, test_marketplace_product):

View File

@@ -0,0 +1,389 @@
# tests/integration/api/v1/test_vendor_endpoints.py
import pytest
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.vendors
class TestVendorsAPI:
def test_create_vendor_success(self, client, auth_headers):
"""Test creating a new vendor successfully"""
vendor_data = {
"vendor_code": "NEWVENDOR001",
"vendor_name": "New Vendor",
"description": "A new test vendor ",
}
response = client.post("/api/v1/vendor ", headers=auth_headers, json=vendor_data)
assert response.status_code == 200
data = response.json()
assert data["vendor_code"] == "NEWVENDOR001"
assert data["vendor_name"] == "New Vendor"
assert data["is_active"] is True
def test_create_vendor_duplicate_code_returns_conflict(self, client, auth_headers, test_vendor):
"""Test creating vendor with duplicate code returns VendorAlreadyExistsException"""
vendor_data = {
"vendor_code": test_vendor.vendor_code,
"vendor_name": "Different Name",
"description": "Different description",
}
response = client.post("/api/v1/vendor ", headers=auth_headers, json=vendor_data)
assert response.status_code == 409
data = response.json()
assert data["error_code"] == "VENDOR_ALREADY_EXISTS"
assert data["status_code"] == 409
assert test_vendor.vendor_code in data["message"]
assert data["details"]["vendor_code"] == test_vendor.vendor_code
def test_create_vendor_missing_vendor_code_validation_error(self, client, auth_headers):
"""Test creating vendor without vendor_code returns ValidationException"""
vendor_data = {
"vendor_name": "Vendor without Code",
"description": "Missing vendor code",
}
response = client.post("/api/v1/vendor ", headers=auth_headers, json=vendor_data)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
assert data["status_code"] == 422
assert "Request validation failed" in data["message"]
assert "validation_errors" in data["details"]
def test_create_vendor_empty_vendor_name_validation_error(self, client, auth_headers):
"""Test creating vendor with empty vendor_name returns VendorValidationException"""
vendor_data = {
"vendor_code": "EMPTYNAME",
"vendor_name": "", # Empty vendor name
"description": "Vendor with empty name",
}
response = client.post("/api/v1/vendor ", headers=auth_headers, json=vendor_data)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "INVALID_VENDOR_DATA"
assert data["status_code"] == 422
assert "Vendor name is required" in data["message"]
assert data["details"]["field"] == "vendor_name"
def test_create_vendor_max_vendors_reached_business_logic_error(self, client, auth_headers, db, test_user):
"""Test creating vendor when max vendors reached returns MaxVendorsReachedException"""
# This test would require creating the maximum allowed vendors first
# The exact implementation depends on your business rules
# For now, we'll test the structure of what the error should look like
# In a real scenario, you'd create max_vendors number of vendors first
# Assuming max vendors is enforced at service level
# This test validates the expected response structure
pass # Implementation depends on your max_vendors business logic
def test_get_vendors_success(self, client, auth_headers, test_vendor):
"""Test getting vendors list successfully"""
response = client.get("/api/v1/vendor ", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
assert len(data["vendors"]) >= 1
# Find our test vendor
test_vendor_found = any(s["vendor_code"] == test_vendor.vendor_code for s in data["vendors"])
assert test_vendor_found
def test_get_vendors_with_filters(self, client, auth_headers, test_vendor):
"""Test getting vendors with filtering options"""
# Test active_only filter
response = client.get("/api/v1/vendor ?active_only=true", headers=auth_headers)
assert response.status_code == 200
data = response.json()
for vendor in data["vendors"]:
assert vendor ["is_active"] is True
# Test verified_only filter
response = client.get("/api/v1/vendor ?verified_only=true", headers=auth_headers)
assert response.status_code == 200
# Response should only contain verified vendors
def test_get_vendor_by_code_success(self, client, auth_headers, test_vendor):
"""Test getting specific vendor successfully"""
response = client.get(
f"/api/v1/vendor /{test_vendor.vendor_code}", headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["vendor_code"] == test_vendor.vendor_code
assert data["vendor_name"] == test_vendor.vendor_name
def test_get_vendor_by_code_not_found(self, client, auth_headers):
"""Test getting nonexistent vendor returns VendorNotFoundException"""
response = client.get("/api/v1/vendor /NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "VENDOR_NOT_FOUND"
assert data["status_code"] == 404
assert "NONEXISTENT" in data["message"]
assert data["details"]["resource_type"] == "Vendor"
assert data["details"]["identifier"] == "NONEXISTENT"
def test_get_vendor_unauthorized_access(self, client, auth_headers, test_vendor, other_user, db):
"""Test accessing vendor owned by another user returns UnauthorizedVendorAccessException"""
# Change vendor owner to other user AND make it unverified/inactive
# so that non-owner users cannot access it
test_vendor.owner_id = other_user.id
test_vendor.is_verified = False # Make it not publicly accessible
db.commit()
response = client.get(
f"/api/v1/vendor /{test_vendor.vendor_code}", headers=auth_headers
)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "UNAUTHORIZED_VENDOR_ACCESS"
assert data["status_code"] == 403
assert test_vendor.vendor_code in data["message"]
assert data["details"]["vendor_code"] == test_vendor.vendor_code
def test_get_vendor_unauthorized_access_with_inactive_vendor(self, client, auth_headers, inactive_vendor):
"""Test accessing inactive vendor owned by another user returns UnauthorizedVendorAccessException"""
# inactive_vendor fixture already creates an unverified, inactive vendor owned by other_user
response = client.get(
f"/api/v1/vendor /{inactive_vendor.vendor_code}", headers=auth_headers
)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "UNAUTHORIZED_VENDOR_ACCESS"
assert data["status_code"] == 403
assert inactive_vendor.vendor_code in data["message"]
assert data["details"]["vendor_code"] == inactive_vendor.vendor_code
def test_get_vendor_public_access_allowed(self, client, auth_headers, verified_vendor):
"""Test accessing verified vendor owned by another user is allowed (public access)"""
# verified_vendor fixture creates a verified, active vendor owned by other_user
# This should allow public access per your business logic
response = client.get(
f"/api/v1/vendor /{verified_vendor.vendor_code}", headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["vendor_code"] == verified_vendor.vendor_code
assert data["vendor_name"] == verified_vendor.vendor_name
def test_add_product_to_vendor_success(self, client, auth_headers, test_vendor, unique_product):
"""Test adding product to vendor successfully"""
product_data = {
"marketplace_product_id": unique_product.marketplace_product_id, # Use string marketplace_product_id, not database id
"price": 29.99,
"is_active": True,
"is_featured": False,
}
response = client.post(
f"/api/v1/vendor /{test_vendor.vendor_code}/products",
headers=auth_headers,
json=product_data
)
assert response.status_code == 200
data = response.json()
# The response structure contains nested product data
assert data["vendor_id"] == test_vendor.id
assert data["price"] == 29.99
assert data["is_active"] is True
assert data["is_featured"] is False
# MarketplaceProduct details are nested in the 'marketplace_product' field
assert "marketplace_product" in data
assert data["marketplace_product"]["marketplace_product_id"] == unique_product.marketplace_product_id
assert data["marketplace_product"]["id"] == unique_product.id
def test_add_product_to_vendor_already_exists_conflict(self, client, auth_headers, test_vendor, test_product):
"""Test adding product that already exists in vendor returns ProductAlreadyExistsException"""
# test_product fixture already creates a relationship, get the marketplace_product_id string
existing_product = test_product.marketplace_product
product_data = {
"marketplace_product_id": existing_product.marketplace_product_id, # Use string marketplace_product_id
"price": 29.99,
}
response = client.post(
f"/api/v1/vendor /{test_vendor.vendor_code}/products",
headers=auth_headers,
json=product_data
)
assert response.status_code == 409
data = response.json()
assert data["error_code"] == "PRODUCT_ALREADY_EXISTS"
assert data["status_code"] == 409
assert test_vendor.vendor_code in data["message"]
assert existing_product.marketplace_product_id in data["message"]
def test_add_nonexistent_product_to_vendor_not_found(self, client, auth_headers, test_vendor):
"""Test adding nonexistent product to vendor returns MarketplaceProductNotFoundException"""
product_data = {
"marketplace_product_id": "NONEXISTENT_PRODUCT", # Use string marketplace_product_id that doesn't exist
"price": 29.99,
}
response = client.post(
f"/api/v1/vendor /{test_vendor.vendor_code}/products",
headers=auth_headers,
json=product_data
)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "PRODUCT_NOT_FOUND"
assert data["status_code"] == 404
assert "NONEXISTENT_PRODUCT" in data["message"]
def test_get_products_success(self, client, auth_headers, test_vendor, test_product):
"""Test getting vendor products successfully"""
response = client.get(
f"/api/v1/vendor /{test_vendor.vendor_code}/products",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
assert len(data["products"]) >= 1
assert "vendor " in data
assert data["vendor "]["vendor_code"] == test_vendor.vendor_code
def test_get_products_with_filters(self, client, auth_headers, test_vendor):
"""Test getting vendor products with filtering"""
# Test active_only filter
response = client.get(
f"/api/v1/vendor /{test_vendor.vendor_code}/products?active_only=true",
headers=auth_headers
)
assert response.status_code == 200
# Test featured_only filter
response = client.get(
f"/api/v1/vendor /{test_vendor.vendor_code}/products?featured_only=true",
headers=auth_headers
)
assert response.status_code == 200
def test_get_products_from_nonexistent_vendor_not_found(self, client, auth_headers):
"""Test getting products from nonexistent vendor returns VendorNotFoundException"""
response = client.get(
"/api/v1/vendor /NONEXISTENT/products",
headers=auth_headers
)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "VENDOR_NOT_FOUND"
assert data["status_code"] == 404
assert "NONEXISTENT" in data["message"]
def test_vendor_not_active_business_logic_error(self, client, auth_headers, test_vendor, db):
"""Test accessing inactive vendor returns VendorNotActiveException (if enforced)"""
# Set vendor to inactive
test_vendor.is_active = False
db.commit()
# Depending on your business logic, this might return an error
response = client.get(
f"/api/v1/vendor /{test_vendor.vendor_code}", headers=auth_headers
)
# If your service enforces active vendor requirement
if response.status_code == 400:
data = response.json()
assert data["error_code"] == "VENDOR_NOT_ACTIVE"
assert data["status_code"] == 400
assert test_vendor.vendor_code in data["message"]
def test_vendor_not_verified_business_logic_error(self, client, auth_headers, test_vendor, db):
"""Test operations requiring verification returns VendorNotVerifiedException (if enforced)"""
# Set vendor to unverified
test_vendor.is_verified = False
db.commit()
# Test adding products (might require verification)
product_data = {
"marketplace_product_id": 1,
"price": 29.99,
}
response = client.post(
f"/api/v1/vendor /{test_vendor.vendor_code}/products",
headers=auth_headers,
json=product_data
)
# If your service requires verification for adding products
if response.status_code == 400:
data = response.json()
assert data["error_code"] == "VENDOR_NOT_VERIFIED"
assert data["status_code"] == 400
assert test_vendor.vendor_code in data["message"]
def test_get_vendor_without_auth_returns_invalid_token(self, client):
"""Test that vendor endpoints require authentication returns InvalidTokenException"""
response = client.get("/api/v1/vendor ")
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
assert data["status_code"] == 401
def test_pagination_validation_errors(self, client, auth_headers):
"""Test pagination parameter validation"""
# Test negative skip
response = client.get("/api/v1/vendor ?skip=-1", headers=auth_headers)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
# Test zero limit
response = client.get("/api/v1/vendor ?limit=0", headers=auth_headers)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
# Test excessive limit
response = client.get("/api/v1/vendor ?limit=10000", headers=auth_headers)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
def test_exception_structure_consistency(self, client, auth_headers):
"""Test that all vendor exceptions follow the consistent LetzShopException structure"""
# Test with a known error case
response = client.get("/api/v1/vendor /NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
data = response.json()
# Verify exception structure matches LetzShopException.to_dict()
required_fields = ["error_code", "message", "status_code"]
for field in required_fields:
assert field in data, f"Missing required field: {field}"
assert isinstance(data["error_code"], str)
assert isinstance(data["message"], str)
assert isinstance(data["status_code"], int)
# Details field should be present for domain-specific exceptions
if "details" in data:
assert isinstance(data["details"], dict)