Tests restructuring

This commit is contained in:
2025-09-19 21:23:57 +02:00
parent d0924f90c4
commit 366093bbc6
70 changed files with 212 additions and 1957 deletions

BIN
tests/.coverage Normal file

Binary file not shown.

View File

@@ -1,6 +1,4 @@
# tests/conftest.py
import uuid
# tests/conftest.py - Updated main conftest with core fixtures only
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
@@ -9,7 +7,6 @@ from sqlalchemy.pool import StaticPool
from app.core.database import Base, get_db
from main import app
from middleware.auth import AuthManager
# Import all models to ensure they're registered with Base metadata
from models.database_models import (MarketplaceImportJob, Product, Shop,
ShopProduct, Stock, User)
@@ -54,7 +51,7 @@ def db(engine, testing_session_local):
@pytest.fixture(scope="function")
def client(db): # Now client depends on db
def client(db):
"""Create a test client with database dependency override"""
# Override the dependency to use our test database
@@ -75,172 +72,6 @@ def client(db): # Now client depends on db
del app.dependency_overrides[get_db]
@pytest.fixture(scope="session")
def auth_manager():
"""Create auth manager instance (session scope since it's stateless)"""
return AuthManager()
@pytest.fixture
def test_user(db, auth_manager):
"""Create a test user with unique username"""
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
hashed_password = auth_manager.hash_password("testpass123")
user = User(
email=f"test_{unique_id}@example.com",
username=f"testuser_{unique_id}",
hashed_password=hashed_password,
role="user",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture
def test_admin(db, auth_manager):
"""Create a test admin user with unique username"""
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
hashed_password = auth_manager.hash_password("adminpass123")
admin = User(
email=f"admin_{unique_id}@example.com",
username=f"admin_{unique_id}",
hashed_password=hashed_password,
role="admin",
is_active=True,
)
db.add(admin)
db.commit()
db.refresh(admin)
return admin
@pytest.fixture
def auth_headers(client, test_user):
"""Get authentication headers for test user"""
response = client.post(
"/api/v1/auth/login",
json={"username": test_user.username, "password": "testpass123"},
)
assert response.status_code == 200, f"Login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def admin_headers(client, test_admin):
"""Get authentication headers for admin user"""
response = client.post(
"/api/v1/auth/login",
json={"username": test_admin.username, "password": "adminpass123"},
)
assert response.status_code == 200, f"Admin login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def test_product(db):
"""Create a test product"""
product = Product(
product_id="TEST001",
title="Test Product",
description="A test product",
price="10.99",
currency="EUR",
brand="TestBrand",
gtin="1234567890123",
availability="in stock",
marketplace="Letzshop",
shop_name="TestShop",
)
db.add(product)
db.commit()
db.refresh(product)
return product
@pytest.fixture
def test_shop(db, test_user):
"""Create a test shop with unique shop code"""
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
shop = Shop(
shop_code=f"TESTSHOP_{unique_id}",
shop_name=f"Test Shop {unique_id}",
owner_id=test_user.id,
is_active=True,
is_verified=True,
)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
@pytest.fixture
def test_stock(db, test_product, test_shop):
"""Create test stock entry"""
unique_id = str(uuid.uuid4())[:8].upper() # Short unique identifier
stock = Stock(
gtin=test_product.gtin, # Fixed: use gtin instead of product_id
location=f"WAREHOUSE_A_{unique_id}",
quantity=10,
reserved_quantity=0,
shop_id=test_shop.id, # Add shop_id reference
)
db.add(stock)
db.commit()
db.refresh(stock)
return stock
@pytest.fixture
def test_marketplace_job(db, test_shop, test_user): # Add test_shop dependency
"""Create a test marketplace import job"""
job = MarketplaceImportJob(
marketplace="amazon",
shop_name="Test Import Shop",
status="completed",
source_url="https://test-marketplace.example.com/import",
shop_id=test_shop.id, # Add required shop_id
user_id=test_user.id,
imported_count=5,
updated_count=3,
total_processed=8,
error_count=0,
error_message=None,
)
db.add(job)
db.commit()
db.refresh(job)
return job
def create_test_import_job(db, shop_id, **kwargs): # Add shop_id parameter
"""Helper function to create MarketplaceImportJob with defaults"""
defaults = {
"marketplace": "test",
"shop_name": "Test Shop",
"status": "pending",
"source_url": "https://test.example.com/import",
"shop_id": shop_id, # Add required shop_id
"imported_count": 0,
"updated_count": 0,
"total_processed": 0,
"error_count": 0,
"error_message": None,
}
defaults.update(kwargs)
job = MarketplaceImportJob(**defaults)
db.add(job)
db.commit()
db.refresh(job)
return job
# Cleanup fixture to ensure clean state
@pytest.fixture(autouse=True)
def cleanup():
@@ -250,227 +81,10 @@ def cleanup():
app.dependency_overrides.clear()
# Add these fixtures to your existing conftest.py
@pytest.fixture
def unique_product(db):
"""Create a unique product for tests that need isolated product data"""
unique_id = str(uuid.uuid4())[:8]
product = Product(
product_id=f"UNIQUE_{unique_id}",
title=f"Unique Product {unique_id}",
description=f"A unique test product {unique_id}",
price="19.99",
currency="EUR",
brand=f"UniqueBrand_{unique_id}",
gtin=f"123456789{unique_id[:4]}",
availability="in stock",
marketplace="Letzshop",
shop_name=f"UniqueShop_{unique_id}",
google_product_category=f"UniqueCategory_{unique_id}",
)
db.add(product)
db.commit()
db.refresh(product)
return product
@pytest.fixture
def unique_shop(db, test_user):
"""Create a unique shop for tests that need isolated shop data"""
unique_id = str(uuid.uuid4())[:8]
shop = Shop(
shop_code=f"UNIQUESHOP_{unique_id}",
shop_name=f"Unique Test Shop {unique_id}",
description=f"A unique test shop {unique_id}",
owner_id=test_user.id,
is_active=True,
is_verified=True,
)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
@pytest.fixture
def other_user(db, auth_manager):
"""Create a different user for testing access controls"""
unique_id = str(uuid.uuid4())[:8]
hashed_password = auth_manager.hash_password("otherpass123")
user = User(
email=f"other_{unique_id}@example.com",
username=f"otheruser_{unique_id}",
hashed_password=hashed_password,
role="user",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture
def inactive_shop(db, other_user):
"""Create an inactive shop owned by other_user"""
unique_id = str(uuid.uuid4())[:8]
shop = Shop(
shop_code=f"INACTIVE_{unique_id}",
shop_name=f"Inactive Shop {unique_id}",
owner_id=other_user.id,
is_active=False,
is_verified=False,
)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
@pytest.fixture
def verified_shop(db, other_user):
"""Create a verified shop owned by other_user"""
unique_id = str(uuid.uuid4())[:8]
shop = Shop(
shop_code=f"VERIFIED_{unique_id}",
shop_name=f"Verified Shop {unique_id}",
owner_id=other_user.id,
is_active=True,
is_verified=True,
)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
@pytest.fixture
def shop_product(db, test_shop, unique_product):
"""Create a shop product relationship"""
shop_product = ShopProduct(
shop_id=test_shop.id, product_id=unique_product.id, is_active=True
)
# Add optional fields if they exist in your model
if hasattr(ShopProduct, "price"):
shop_product.price = "24.99"
if hasattr(ShopProduct, "is_featured"):
shop_product.is_featured = False
if hasattr(ShopProduct, "stock_quantity"):
shop_product.stock_quantity = 10
db.add(shop_product)
db.commit()
db.refresh(shop_product)
return shop_product
@pytest.fixture
def multiple_products(db):
"""Create multiple products for testing statistics and pagination"""
unique_id = str(uuid.uuid4())[:8]
products = []
for i in range(5):
product = Product(
product_id=f"MULTI_{unique_id}_{i}",
title=f"Multi Product {i} {unique_id}",
description=f"Multi test product {i}",
price=f"{10 + i}.99",
currency="EUR",
brand=f"MultiBrand_{i % 3}", # Create 3 different brands
marketplace=f"MultiMarket_{i % 2}", # Create 2 different marketplaces
shop_name=f"MultiShop_{i}",
google_product_category=f"MultiCategory_{i % 2}", # Create 2 different categories
gtin=f"1234567890{i}{unique_id[:2]}",
)
products.append(product)
db.add_all(products)
db.commit()
for product in products:
db.refresh(product)
return products
@pytest.fixture
def multiple_stocks(db, multiple_products, test_shop):
"""Create multiple stock entries for testing"""
stocks = []
for i, product in enumerate(multiple_products):
stock = Stock(
gtin=product.gtin,
location=f"LOC_{i}",
quantity=10 + (i * 5), # Different quantities
reserved_quantity=i,
shop_id=test_shop.id,
)
stocks.append(stock)
db.add_all(stocks)
db.commit()
for stock in stocks:
db.refresh(stock)
return stocks
# Helper fixture factory functions
def create_unique_product_factory():
"""Factory function to create unique products in tests"""
def _create_product(db, **kwargs):
unique_id = str(uuid.uuid4())[:8]
defaults = {
"product_id": f"FACTORY_{unique_id}",
"title": f"Factory Product {unique_id}",
"price": "15.99",
"currency": "EUR",
"marketplace": "TestMarket",
"shop_name": "TestShop",
}
defaults.update(kwargs)
product = Product(**defaults)
db.add(product)
db.commit()
db.refresh(product)
return product
return _create_product
@pytest.fixture
def product_factory():
"""Fixture that provides a product factory function"""
return create_unique_product_factory()
def create_unique_shop_factory():
"""Factory function to create unique shops in tests"""
def _create_shop(db, owner_id, **kwargs):
unique_id = str(uuid.uuid4())[:8]
defaults = {
"shop_code": f"FACTORY_{unique_id}",
"shop_name": f"Factory Shop {unique_id}",
"owner_id": owner_id,
"is_active": True,
"is_verified": False,
}
defaults.update(kwargs)
shop = Shop(**defaults)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
return _create_shop
@pytest.fixture
def shop_factory():
"""Fixture that provides a shop factory function"""
return create_unique_shop_factory()
# Import fixtures from fixture modules
pytest_plugins = [
"tests.fixtures.auth_fixtures",
"tests.fixtures.product_fixtures",
"tests.fixtures.shop_fixtures",
"tests.fixtures.marketplace_fixtures",
]

0
tests/ecommerce.db Normal file
View File

3
tests/fixtures/__init__.py vendored Normal file
View File

@@ -0,0 +1,3 @@
# tests/fixtures/__init__.py
"""Test fixtures for the FastAPI application test suite."""

91
tests/fixtures/auth_fixtures.py vendored Normal file
View File

@@ -0,0 +1,91 @@
# tests/fixtures/auth_fixtures.py
import uuid
import pytest
from middleware.auth import AuthManager
from models.database_models import User
@pytest.fixture(scope="session")
def auth_manager():
"""Create auth manager instance (session scope since it's stateless)"""
return AuthManager()
@pytest.fixture
def test_user(db, auth_manager):
"""Create a test user with unique username"""
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
hashed_password = auth_manager.hash_password("testpass123")
user = User(
email=f"test_{unique_id}@example.com",
username=f"testuser_{unique_id}",
hashed_password=hashed_password,
role="user",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture
def test_admin(db, auth_manager):
"""Create a test admin user with unique username"""
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
hashed_password = auth_manager.hash_password("adminpass123")
admin = User(
email=f"admin_{unique_id}@example.com",
username=f"admin_{unique_id}",
hashed_password=hashed_password,
role="admin",
is_active=True,
)
db.add(admin)
db.commit()
db.refresh(admin)
return admin
@pytest.fixture
def other_user(db, auth_manager):
"""Create a different user for testing access controls"""
unique_id = str(uuid.uuid4())[:8]
hashed_password = auth_manager.hash_password("otherpass123")
user = User(
email=f"other_{unique_id}@example.com",
username=f"otheruser_{unique_id}",
hashed_password=hashed_password,
role="user",
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture
def auth_headers(client, test_user):
"""Get authentication headers for test user"""
response = client.post(
"/api/v1/auth/login",
json={"username": test_user.username, "password": "testpass123"},
)
assert response.status_code == 200, f"Login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def admin_headers(client, test_admin):
"""Get authentication headers for admin user"""
response = client.post(
"/api/v1/auth/login",
json={"username": test_admin.username, "password": "adminpass123"},
)
assert response.status_code == 200, f"Admin login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}

50
tests/fixtures/marketplace_fixtures.py vendored Normal file
View File

@@ -0,0 +1,50 @@
# tests/fixtures/marketplace_fixtures.py
import pytest
from models.database_models import MarketplaceImportJob
@pytest.fixture
def test_marketplace_job(db, test_shop, test_user):
"""Create a test marketplace import job"""
job = MarketplaceImportJob(
marketplace="amazon",
shop_name="Test Import Shop",
status="completed",
source_url="https://test-marketplace.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id,
imported_count=5,
updated_count=3,
total_processed=8,
error_count=0,
error_message=None,
)
db.add(job)
db.commit()
db.refresh(job)
return job
def create_test_import_job(db, shop_id, user_id, **kwargs):
"""Helper function to create MarketplaceImportJob with defaults"""
defaults = {
"marketplace": "test",
"shop_name": "Test Shop",
"status": "pending",
"source_url": "https://test.example.com/import",
"shop_id": shop_id,
"user_id": user_id,
"imported_count": 0,
"updated_count": 0,
"total_processed": 0,
"error_count": 0,
"error_message": None,
}
defaults.update(kwargs)
job = MarketplaceImportJob(**defaults)
db.add(job)
db.commit()
db.refresh(job)
return job

108
tests/fixtures/product_fixtures.py vendored Normal file
View File

@@ -0,0 +1,108 @@
# tests/fixtures/product_fixtures.py
import uuid
import pytest
from models.database_models import Product
@pytest.fixture
def test_product(db):
"""Create a test product"""
product = Product(
product_id="TEST001",
title="Test Product",
description="A test product",
price="10.99",
currency="EUR",
brand="TestBrand",
gtin="1234567890123",
availability="in stock",
marketplace="Letzshop",
shop_name="TestShop",
)
db.add(product)
db.commit()
db.refresh(product)
return product
@pytest.fixture
def unique_product(db):
"""Create a unique product for tests that need isolated product data"""
unique_id = str(uuid.uuid4())[:8]
product = Product(
product_id=f"UNIQUE_{unique_id}",
title=f"Unique Product {unique_id}",
description=f"A unique test product {unique_id}",
price="19.99",
currency="EUR",
brand=f"UniqueBrand_{unique_id}",
gtin=f"123456789{unique_id[:4]}",
availability="in stock",
marketplace="Letzshop",
shop_name=f"UniqueShop_{unique_id}",
google_product_category=f"UniqueCategory_{unique_id}",
)
db.add(product)
db.commit()
db.refresh(product)
return product
@pytest.fixture
def multiple_products(db):
"""Create multiple products for testing statistics and pagination"""
unique_id = str(uuid.uuid4())[:8]
products = []
for i in range(5):
product = Product(
product_id=f"MULTI_{unique_id}_{i}",
title=f"Multi Product {i} {unique_id}",
description=f"Multi test product {i}",
price=f"{10 + i}.99",
currency="EUR",
brand=f"MultiBrand_{i % 3}", # Create 3 different brands
marketplace=f"MultiMarket_{i % 2}", # Create 2 different marketplaces
shop_name=f"MultiShop_{i}",
google_product_category=f"MultiCategory_{i % 2}", # Create 2 different categories
gtin=f"1234567890{i}{unique_id[:2]}",
)
products.append(product)
db.add_all(products)
db.commit()
for product in products:
db.refresh(product)
return products
def create_unique_product_factory():
"""Factory function to create unique products in tests"""
def _create_product(db, **kwargs):
unique_id = str(uuid.uuid4())[:8]
defaults = {
"product_id": f"FACTORY_{unique_id}",
"title": f"Factory Product {unique_id}",
"price": "15.99",
"currency": "EUR",
"marketplace": "TestMarket",
"shop_name": "TestShop",
}
defaults.update(kwargs)
product = Product(**defaults)
db.add(product)
db.commit()
db.refresh(product)
return product
return _create_product
@pytest.fixture
def product_factory():
"""Fixture that provides a product factory function"""
return create_unique_product_factory()

163
tests/fixtures/shop_fixtures.py vendored Normal file
View File

@@ -0,0 +1,163 @@
# tests/fixtures/shop_fixtures.py
import uuid
import pytest
from models.database_models import Shop, ShopProduct, Stock
@pytest.fixture
def test_shop(db, test_user):
"""Create a test shop with unique shop code"""
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
shop = Shop(
shop_code=f"TESTSHOP_{unique_id}",
shop_name=f"Test Shop {unique_id}",
owner_id=test_user.id,
is_active=True,
is_verified=True,
)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
@pytest.fixture
def unique_shop(db, test_user):
"""Create a unique shop for tests that need isolated shop data"""
unique_id = str(uuid.uuid4())[:8]
shop = Shop(
shop_code=f"UNIQUESHOP_{unique_id}",
shop_name=f"Unique Test Shop {unique_id}",
description=f"A unique test shop {unique_id}",
owner_id=test_user.id,
is_active=True,
is_verified=True,
)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
@pytest.fixture
def inactive_shop(db, other_user):
"""Create an inactive shop owned by other_user"""
unique_id = str(uuid.uuid4())[:8]
shop = Shop(
shop_code=f"INACTIVE_{unique_id}",
shop_name=f"Inactive Shop {unique_id}",
owner_id=other_user.id,
is_active=False,
is_verified=False,
)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
@pytest.fixture
def verified_shop(db, other_user):
"""Create a verified shop owned by other_user"""
unique_id = str(uuid.uuid4())[:8]
shop = Shop(
shop_code=f"VERIFIED_{unique_id}",
shop_name=f"Verified Shop {unique_id}",
owner_id=other_user.id,
is_active=True,
is_verified=True,
)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
@pytest.fixture
def shop_product(db, test_shop, unique_product):
"""Create a shop product relationship"""
shop_product = ShopProduct(
shop_id=test_shop.id, product_id=unique_product.id, is_active=True
)
# Add optional fields if they exist in your model
if hasattr(ShopProduct, "shop_price"):
shop_product.shop_price = 24.99
if hasattr(ShopProduct, "is_featured"):
shop_product.is_featured = False
if hasattr(ShopProduct, "min_quantity"):
shop_product.min_quantity = 1
db.add(shop_product)
db.commit()
db.refresh(shop_product)
return shop_product
@pytest.fixture
def test_stock(db, test_product, test_shop):
"""Create test stock entry"""
unique_id = str(uuid.uuid4())[:8].upper() # Short unique identifier
stock = Stock(
gtin=test_product.gtin, # Use gtin instead of product_id
location=f"WAREHOUSE_A_{unique_id}",
quantity=10,
reserved_quantity=0,
shop_id=test_shop.id, # Add shop_id reference
)
db.add(stock)
db.commit()
db.refresh(stock)
return stock
@pytest.fixture
def multiple_stocks(db, multiple_products, test_shop):
"""Create multiple stock entries for testing"""
stocks = []
for i, product in enumerate(multiple_products):
stock = Stock(
gtin=product.gtin,
location=f"LOC_{i}",
quantity=10 + (i * 5), # Different quantities
reserved_quantity=i,
shop_id=test_shop.id,
)
stocks.append(stock)
db.add_all(stocks)
db.commit()
for stock in stocks:
db.refresh(stock)
return stocks
def create_unique_shop_factory():
"""Factory function to create unique shops in tests"""
def _create_shop(db, owner_id, **kwargs):
unique_id = str(uuid.uuid4())[:8]
defaults = {
"shop_code": f"FACTORY_{unique_id}",
"shop_name": f"Factory Shop {unique_id}",
"owner_id": owner_id,
"is_active": True,
"is_verified": False,
}
defaults.update(kwargs)
shop = Shop(**defaults)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
return _create_shop
@pytest.fixture
def shop_factory():
"""Fixture that provides a shop factory function"""
return create_unique_shop_factory()

View File

@@ -0,0 +1,3 @@
# tests/integration/__init__.py
"""Integration tests - multiple components working together."""

View File

@@ -0,0 +1,3 @@
# tests/integration/api/__init__.py
"""API integration tests."""

View File

@@ -0,0 +1,3 @@
# tests/integration/api/v1/__init__.py
"""API v1 endpoint integration tests."""

View File

@@ -1,7 +1,10 @@
# tests/test_admin.py
# tests/integration/api/v1/test_admin_endpoints.py
import pytest
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.admin
class TestAdminAPI:
def test_get_all_users_admin(self, client, admin_headers, test_user):
"""Test admin getting all users"""
@@ -21,8 +24,8 @@ class TestAdminAPI:
assert response.status_code == 403
assert (
"Access denied" in response.json()["detail"]
or "admin" in response.json()["detail"].lower()
"Access denied" in response.json()["detail"]
or "admin" in response.json()["detail"].lower()
)
def test_toggle_user_status_admin(self, client, admin_headers, test_user):
@@ -45,7 +48,7 @@ class TestAdminAPI:
assert "User not found" in response.json()["detail"]
def test_toggle_user_status_cannot_deactivate_self(
self, client, admin_headers, test_admin
self, client, admin_headers, test_admin
):
"""Test that admin cannot deactivate their own account"""
response = client.put(
@@ -76,8 +79,8 @@ class TestAdminAPI:
assert response.status_code == 403
assert (
"Access denied" in response.json()["detail"]
or "admin" in response.json()["detail"].lower()
"Access denied" in response.json()["detail"]
or "admin" in response.json()["detail"].lower()
)
def test_verify_shop_admin(self, client, admin_headers, test_shop):
@@ -117,7 +120,7 @@ class TestAdminAPI:
assert "Shop not found" in response.json()["detail"]
def test_get_marketplace_import_jobs_admin(
self, client, admin_headers, test_marketplace_job
self, client, admin_headers, test_marketplace_job
):
"""Test admin getting marketplace import jobs"""
response = client.get(
@@ -133,7 +136,7 @@ class TestAdminAPI:
assert test_marketplace_job.id in job_ids
def test_get_marketplace_import_jobs_with_filters(
self, client, admin_headers, test_marketplace_job
self, client, admin_headers, test_marketplace_job
):
"""Test admin getting marketplace import jobs with filters"""
response = client.get(
@@ -157,8 +160,8 @@ class TestAdminAPI:
assert response.status_code == 403
assert (
"Access denied" in response.json()["detail"]
or "admin" in response.json()["detail"].lower()
"Access denied" in response.json()["detail"]
or "admin" in response.json()["detail"].lower()
)
def test_admin_pagination_users(self, client, admin_headers, test_user, test_admin):

View File

@@ -1,4 +1,4 @@
# tests/test_auth.py
# tests/test_authentication_endpoints.py
import pytest
from fastapi import HTTPException

View File

@@ -1,4 +1,4 @@
# tests/test_marketplace.py
# tests/test_marketplace_endpoints.py
from unittest.mock import AsyncMock, patch
import pytest

View File

@@ -1,9 +1,14 @@
# tests/test_pagination.py
# tests/integration/api/v1/test_pagination.py
import pytest
from models.database_models import Product
from models.database_models import Product, Shop
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.database
@pytest.mark.products
@pytest.mark.shops
class TestPagination:
def test_product_pagination(self, client, auth_headers, db):
"""Test pagination for product listing"""
@@ -55,3 +60,28 @@ class TestPagination:
# Test excessive limit
response = client.get("/api/v1/product?limit=10000", headers=auth_headers)
assert response.status_code == 422 # Should be limited
def test_shop_pagination(self, client, admin_headers, db, test_user):
"""Test pagination for shop listing"""
# Create multiple shops for pagination testing
shops = []
for i in range(15):
shop = Shop(
shop_code=f"PAGESHOP{i:03d}",
shop_name=f"Pagination Shop {i}",
owner_id=test_user.id,
is_active=True,
)
shops.append(shop)
db.add_all(shops)
db.commit()
# Test first page
response = client.get("/api/v1/admin/shops?limit=5&skip=0", headers=admin_headers)
assert response.status_code == 200
data = response.json()
assert len(data["shops"]) == 5
assert data["total"] >= 15 # At least our test shops
assert data["skip"] == 0
assert data["limit"] == 5

View File

@@ -1,4 +1,4 @@
# tests/test_product.py
# tests/test_product_endpoints.py
import pytest

View File

@@ -1,4 +1,4 @@
# tests/test_shop.py
# tests/test_shop_endpoints.py
import pytest

View File

@@ -1,4 +1,4 @@
# tests/test_stats.py
# tests/test_stats_endpoints.py
import pytest

View File

@@ -1,4 +1,4 @@
# tests/test_stock.py
# tests/test_stock_endpoints.py
import pytest
from models.database_models import Stock

View File

@@ -0,0 +1,6 @@
# tests/integration/conftest.py
"""Integration test specific fixtures."""
import pytest
# Add any integration-specific fixtures here if needed

View File

@@ -0,0 +1,3 @@
# tests/integration/security/__init__.py
"""Security integration tests."""

View File

@@ -1,14 +1,36 @@
# tests/test_security.py
from unittest.mock import patch
# tests/integration/security/test_authentication.py
import pytest
from fastapi import HTTPException
class TestSecurity:
@pytest.mark.integration
@pytest.mark.security
@pytest.mark.auth
class TestAuthentication:
def test_protected_endpoint_without_auth(self, client):
"""Test that protected endpoints reject unauthenticated requests"""
protected_endpoints = [
"/api/v1/admin/users",
"/api/v1/admin/shops",
"/api/v1/marketplace/import-jobs",
"/api/v1/product",
"/api/v1/shop",
"/api/v1/stats",
"/api/v1/stock",
]
for endpoint in protected_endpoints:
response = client.get(endpoint)
assert response.status_code == 401 # Authentication missing
def test_protected_endpoint_with_invalid_token(self, client):
"""Test protected endpoints with invalid token"""
headers = {"Authorization": "Bearer invalid_token_here"}
response = client.get("/api/v1/product", headers=headers)
assert response.status_code == 401 # Token is not valid
def test_debug_direct_bearer(self, client):
"""Test HTTPBearer directly"""
response = client.get("/api/v1/debug-bearer")
print(f"Direct Bearer - Status: {response.status_code}")
print(
@@ -17,7 +39,6 @@ class TestSecurity:
def test_debug_dependencies(self, client):
"""Debug the dependency chain step by step"""
# Test 1: Direct endpoint with no auth
response = client.get("/api/v1/admin/users")
print(f"Admin endpoint - Status: {response.status_code}")
@@ -27,9 +48,7 @@ class TestSecurity:
print(f"Admin endpoint - Raw: {response.content}")
# Test 2: Try a regular endpoint that uses get_current_user
response2 = client.get(
"/api/v1/product"
) # or any endpoint with get_current_user
response2 = client.get("/api/v1/product") # or any endpoint with get_current_user
print(f"Regular endpoint - Status: {response2.status_code}")
try:
print(f"Regular endpoint - Response: {response2.json()}")
@@ -54,66 +73,3 @@ class TestSecurity:
for path in variations:
response = client.get(path)
print(f"{path}: Status {response.status_code}")
def test_protected_endpoint_without_auth(self, client):
"""Test that protected endpoints reject unauthenticated requests"""
protected_endpoints = [
"/api/v1/admin/users",
"/api/v1/admin/shops",
"/api/v1/marketplace/import-jobs",
"/api/v1/product",
"/api/v1/shop",
"/api/v1/stats",
"/api/v1/stock",
]
for endpoint in protected_endpoints:
response = client.get(endpoint)
assert response.status_code == 401 # Authentication missing
def test_protected_endpoint_with_invalid_token(self, client):
"""Test protected endpoints with invalid token"""
headers = {"Authorization": "Bearer invalid_token_here"}
response = client.get("/api/v1/product", headers=headers)
assert response.status_code == 401 # Token is not valid
def test_admin_endpoint_requires_admin_role(self, client, auth_headers):
"""Test that admin endpoints require admin role"""
response = client.get("/api/v1/admin/users", headers=auth_headers)
assert (
response.status_code == 403
) # Token is valid but user does not have access.
# Regular user should be denied
def test_sql_injection_prevention(self, client, auth_headers):
"""Test SQL injection prevention in search parameters"""
# Try SQL injection in search parameter
malicious_search = "'; DROP TABLE products; --"
response = client.get(
f"/api/v1/product?search={malicious_search}", headers=auth_headers
)
# Should not crash and should return normal response
assert response.status_code == 200
# Database should still be intact (no products dropped)
# def test_input_validation(self, client, auth_headers):
# # TODO: implement sanitization
# """Test input validation and sanitization"""
# # Test XSS attempt in product creation
# xss_payload = "<script>alert('xss')</script>"
#
# product_data = {
# "product_id": "XSS_TEST",
# "title": xss_payload,
# "description": xss_payload,
# }
#
# response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
#
# assert response.status_code == 200
# data = response.json()
# assert "<script>" not in data["title"]
# assert "&lt;script&gt;" in data["title"]

View File

@@ -0,0 +1,46 @@
# tests/integration/security/test_authorization.py
import pytest
@pytest.mark.integration
@pytest.mark.security
@pytest.mark.auth
class TestAuthorization:
def test_admin_endpoint_requires_admin_role(self, client, auth_headers):
"""Test that admin endpoints require admin role"""
response = client.get("/api/v1/admin/users", headers=auth_headers)
assert response.status_code == 403
# Regular user should be denied access
def test_admin_endpoints_with_admin_access(self, client, admin_headers):
"""Test that admin users can access admin endpoints"""
admin_endpoints = [
"/api/v1/admin/users",
"/api/v1/admin/shops",
"/api/v1/admin/marketplace-import-jobs",
]
for endpoint in admin_endpoints:
response = client.get(endpoint, headers=admin_headers)
assert response.status_code == 200 # Admin should have access
def test_regular_endpoints_with_user_access(self, client, auth_headers):
"""Test that regular users can access non-admin endpoints"""
user_endpoints = [
"/api/v1/product",
"/api/v1/stats",
"/api/v1/stock",
]
for endpoint in user_endpoints:
response = client.get(endpoint, headers=auth_headers)
assert response.status_code == 200 # Regular user should have access
def test_shop_owner_access_control(self, client, auth_headers, test_shop, other_user):
"""Test that users can only access their own shops"""
# Test accessing own shop (should work)
response = client.get(f"/api/v1/shop/{test_shop.shop_code}", headers=auth_headers)
# Response depends on your implementation - could be 200 or 404 if shop doesn't belong to user
# The exact assertion depends on your shop access control implementation
assert response.status_code in [200, 403, 404]

View File

@@ -0,0 +1,65 @@
# tests/integration/security/test_input_validation.py
import pytest
@pytest.mark.integration
@pytest.mark.security
class TestInputValidation:
def test_sql_injection_prevention(self, client, auth_headers):
"""Test SQL injection prevention in search parameters"""
# Try SQL injection in search parameter
malicious_search = "'; DROP TABLE products; --"
response = client.get(
f"/api/v1/product?search={malicious_search}", headers=auth_headers
)
# Should not crash and should return normal response
assert response.status_code == 200
# Database should still be intact (no products dropped)
# def test_input_validation(self, client, auth_headers):
# # TODO: implement sanitization
# """Test input validation and sanitization"""
# # Test XSS attempt in product creation
# xss_payload = "<script>alert('xss')</script>"
#
# product_data = {
# "product_id": "XSS_TEST",
# "title": xss_payload,
# "description": xss_payload,
# }
#
# response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
#
# assert response.status_code == 200
# data = response.json()
# assert "<script>" not in data["title"]
# assert "&lt;script&gt;" in data["title"]
def test_parameter_validation(self, client, auth_headers):
"""Test parameter validation for API endpoints"""
# Test invalid pagination parameters
response = client.get("/api/v1/product?limit=-1", headers=auth_headers)
assert response.status_code == 422 # Validation error
response = client.get("/api/v1/product?skip=-1", headers=auth_headers)
assert response.status_code == 422 # Validation error
def test_json_validation(self, client, auth_headers):
"""Test JSON validation for POST requests"""
# Test invalid JSON structure
response = client.post(
"/api/v1/product",
headers=auth_headers,
content="invalid json content"
)
assert response.status_code == 422 # JSON decode error
# Test missing required fields
response = client.post(
"/api/v1/product",
headers=auth_headers,
json={"title": "Test Product"} # Missing required product_id
)
assert response.status_code == 422 # Validation error

View File

@@ -0,0 +1,3 @@
# tests/performance/__init__.py
"""Performance and load tests."""

View File

@@ -0,0 +1,10 @@
# tests/performance/conftest.py
"""Performance test specific fixtures."""
import pytest
@pytest.fixture
def performance_db_session(db):
"""Database session optimized for performance testing"""
# You can add performance-specific DB configurations here
return db

View File

@@ -0,0 +1,123 @@
# tests/performance/test_api_performance.py
import time
import pytest
from models.database_models import Product
@pytest.mark.performance
@pytest.mark.slow
@pytest.mark.database
class TestPerformance:
def test_product_list_performance(self, client, auth_headers, db):
"""Test performance of product listing with many products"""
# Create multiple products
products = []
for i in range(100):
product = Product(
product_id=f"PERF{i:03d}",
title=f"Performance Test Product {i}",
price=f"{i}.99",
marketplace="Performance",
)
products.append(product)
db.add_all(products)
db.commit()
# Time the request
start_time = time.time()
response = client.get("/api/v1/product?limit=100", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200
assert len(response.json()["products"]) == 100
assert end_time - start_time < 2.0 # Should complete within 2 seconds
def test_search_performance(self, client, auth_headers, db):
"""Test search performance"""
# Create products with searchable content
products = []
for i in range(50):
product = Product(
product_id=f"SEARCH{i:03d}",
title=f"Searchable Product {i}",
description=f"This is a searchable product number {i}",
brand="SearchBrand",
marketplace="SearchMarket",
)
products.append(product)
db.add_all(products)
db.commit()
# Time search request
start_time = time.time()
response = client.get("/api/v1/product?search=Searchable", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200
assert response.json()["total"] == 50
assert end_time - start_time < 1.0 # Search should be fast
def test_database_query_performance(self, client, auth_headers, db):
"""Test database query performance with complex filters"""
# Create products with various attributes for filtering
products = []
brands = ["Brand1", "Brand2", "Brand3"]
marketplaces = ["Market1", "Market2"]
for i in range(200):
product = Product(
product_id=f"COMPLEX{i:03d}",
title=f"Complex Product {i}",
brand=brands[i % 3],
marketplace=marketplaces[i % 2],
price=f"{10 + (i % 50)}.99",
google_product_category=f"Category{i % 5}",
)
products.append(product)
db.add_all(products)
db.commit()
# Test complex filtering performance
start_time = time.time()
response = client.get(
"/api/v1/product?brand=Brand1&marketplace=Market1&limit=50",
headers=auth_headers
)
end_time = time.time()
assert response.status_code == 200
assert end_time - start_time < 1.5 # Complex query should still be reasonably fast
def test_pagination_performance_large_dataset(self, client, auth_headers, db):
"""Test pagination performance with large dataset"""
# Create a large dataset
products = []
for i in range(500):
product = Product(
product_id=f"LARGE{i:04d}",
title=f"Large Dataset Product {i}",
marketplace="LargeTest",
)
products.append(product)
db.add_all(products)
db.commit()
# Test pagination performance at different offsets
offsets = [0, 100, 250, 400]
for offset in offsets:
start_time = time.time()
response = client.get(
f"/api/v1/product?skip={offset}&limit=20",
headers=auth_headers
)
end_time = time.time()
assert response.status_code == 200
assert len(response.json()["products"]) == 20
assert end_time - start_time < 1.0 # Pagination should be fast regardless of offset

View File

@@ -1,21 +0,0 @@
# tests/pytest.ini
[tool:pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--tb=short
--strict-markers
--disable-warnings
--color=yes
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
integration: marks tests as integration tests
unit: marks tests as unit tests
auth: marks tests related to authentication
products: marks tests related to products
stock: marks tests related to stock management
shops: marks tests related to shop management
admin: marks tests related to admin functionality

View File

@@ -7,3 +7,4 @@ pytest-mock>=3.11.0
httpx>=0.24.0
faker>=19.0.0
pytest-repeat>=0.9.4
pytest-timeout>=2.1.0

3
tests/system/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# tests/system/__init__.py
"""System-level tests - full application behavior."""

6
tests/system/conftest.py Normal file
View File

@@ -0,0 +1,6 @@
# tests/system/conftest.py
"""System test specific fixtures."""
import pytest
# Add any system-specific fixtures here if needed

View File

@@ -0,0 +1,113 @@
# tests/system/test_error_handling.py
import pytest
@pytest.mark.system
class TestErrorHandling:
def test_invalid_json(self, client, auth_headers):
"""Test handling of invalid JSON"""
response = client.post(
"/api/v1/product", headers=auth_headers, content="invalid json"
)
assert response.status_code == 422 # Validation error
def test_missing_required_fields(self, client, auth_headers):
"""Test handling of missing required fields"""
response = client.post(
"/api/v1/product", headers=auth_headers, json={"title": "Test"}
) # Missing product_id
assert response.status_code == 422
def test_invalid_authentication(self, client):
"""Test handling of invalid authentication"""
response = client.get(
"/api/v1/product", headers={"Authorization": "Bearer invalid_token"}
)
assert response.status_code == 401 # Token is not valid
def test_nonexistent_resource(self, client, auth_headers):
"""Test handling of nonexistent resource access"""
response = client.get("/api/v1/product/NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
response = client.get("/api/v1/shop/NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
def test_duplicate_resource_creation(self, client, auth_headers, test_product):
"""Test handling of duplicate resource creation"""
product_data = {
"product_id": test_product.product_id, # Duplicate ID
"title": "Another Product",
}
response = client.post(
"/api/v1/product", headers=auth_headers, json=product_data
)
assert response.status_code == 400
def test_server_error_handling(self, client, auth_headers):
"""Test handling of server errors"""
# This would test 500 errors if you have endpoints that can trigger them
# For now, test that the error handling middleware works
response = client.get("/api/v1/nonexistent-endpoint", headers=auth_headers)
assert response.status_code == 404
def test_rate_limiting_behavior(self, client, auth_headers):
"""Test rate limiting behavior if implemented"""
# Make multiple rapid requests to test rate limiting
responses = []
for i in range(10):
response = client.get("/api/v1/product", headers=auth_headers)
responses.append(response)
# All should succeed unless rate limiting is very aggressive
# Adjust based on your rate limiting configuration
success_count = sum(1 for r in responses if r.status_code == 200)
assert success_count >= 5 # At least half should succeed
def test_malformed_requests(self, client, auth_headers):
"""Test handling of various malformed requests"""
# Test extremely long URLs
long_search = "x" * 10000
response = client.get(f"/api/v1/product?search={long_search}", headers=auth_headers)
# Should handle gracefully, either 200 with no results or 422 for too long
assert response.status_code in [200, 422]
# Test special characters in parameters
special_chars = "!@#$%^&*(){}[]|\\:;\"'<>,.?/~`"
response = client.get(f"/api/v1/product?search={special_chars}", headers=auth_headers)
# Should handle gracefully
assert response.status_code in [200, 422]
def test_database_error_recovery(self, client, auth_headers):
"""Test application behavior during database issues"""
# This is more complex to test - you'd need to simulate DB issues
# For now, just test that basic operations work
response = client.get("/api/v1/product", headers=auth_headers)
assert response.status_code == 200
def test_content_type_errors(self, client, auth_headers):
"""Test handling of incorrect content types"""
# Send XML to JSON endpoint
response = client.post(
"/api/v1/product",
headers={**auth_headers, "Content-Type": "application/xml"},
content="<xml>not json</xml>"
)
assert response.status_code in [400, 422, 415] # Bad request or unsupported media type
def test_large_payload_handling(self, client, auth_headers):
"""Test handling of unusually large payloads"""
# Create a very large product description
large_data = {
"product_id": "LARGE_TEST",
"title": "Large Test Product",
"description": "x" * 50000 # Very long description
}
response = client.post("/api/v1/product", headers=auth_headers, json=large_data)
# Should either accept it or reject with 422 (too large)
assert response.status_code in [200, 201, 422, 413]

View File

@@ -1,388 +0,0 @@
# tests/test_admin_service.py
from datetime import datetime
import pytest
from fastapi import HTTPException
from app.services.admin_service import AdminService
from models.database_models import MarketplaceImportJob, Shop, User
class TestAdminService:
"""Test suite for AdminService following the application's testing patterns"""
def setup_method(self):
"""Setup method following the same pattern as product service tests"""
self.service = AdminService()
def test_get_all_users(self, db, test_user, test_admin):
"""Test getting all users with pagination"""
users = self.service.get_all_users(db, skip=0, limit=10)
assert len(users) >= 2 # test_user + test_admin
user_ids = [user.id for user in users]
assert test_user.id in user_ids
assert test_admin.id in user_ids
def test_get_all_users_with_pagination(self, db, test_user, test_admin):
"""Test user pagination works correctly"""
users = self.service.get_all_users(db, skip=0, limit=1)
assert len(users) == 1
users_second_page = self.service.get_all_users(db, skip=1, limit=1)
assert len(users_second_page) == 1
assert users[0].id != users_second_page[0].id
def test_toggle_user_status_deactivate(self, db, test_user, test_admin):
"""Test deactivating a user"""
assert test_user.is_active is True
user, message = self.service.toggle_user_status(db, test_user.id, test_admin.id)
assert user.id == test_user.id
assert user.is_active is False
assert f"{user.username} has been deactivated" in message
def test_toggle_user_status_activate(self, db, test_user, test_admin):
"""Test activating a user"""
# First deactivate the user
test_user.is_active = False
db.commit()
user, message = self.service.toggle_user_status(db, test_user.id, test_admin.id)
assert user.id == test_user.id
assert user.is_active is True
assert f"{user.username} has been activated" in message
def test_toggle_user_status_user_not_found(self, db, test_admin):
"""Test toggle user status when user not found"""
with pytest.raises(HTTPException) as exc_info:
self.service.toggle_user_status(db, 99999, test_admin.id)
assert exc_info.value.status_code == 404
assert "User not found" in str(exc_info.value.detail)
def test_toggle_user_status_cannot_deactivate_self(self, db, test_admin):
"""Test that admin cannot deactivate their own account"""
with pytest.raises(HTTPException) as exc_info:
self.service.toggle_user_status(db, test_admin.id, test_admin.id)
assert exc_info.value.status_code == 400
assert "Cannot deactivate your own account" in str(exc_info.value.detail)
def test_get_all_shops(self, db, test_shop):
"""Test getting all shops with total count"""
shops, total = self.service.get_all_shops(db, skip=0, limit=10)
assert total >= 1
assert len(shops) >= 1
shop_codes = [shop.shop_code for shop in shops]
assert test_shop.shop_code in shop_codes
def test_get_all_shops_with_pagination(self, db, test_shop):
"""Test shop pagination works correctly"""
# Create additional shop for pagination test using the helper function
# from conftest import create_test_import_job # If you added the helper function
# Or create directly with proper fields
additional_shop = Shop(
shop_code=f"{test_shop.shop_code}_2",
shop_name="Test Shop 2",
owner_id=test_shop.owner_id,
is_active=True,
is_verified=False,
)
db.add(additional_shop)
db.commit()
shops_page_1 = self.service.get_all_shops(db, skip=0, limit=1)
assert len(shops_page_1[0]) == 1
shops_page_2 = self.service.get_all_shops(db, skip=1, limit=1)
assert len(shops_page_2[0]) == 1
# Ensure different shops on different pages
assert shops_page_1[0][0].id != shops_page_2[0][0].id
def test_verify_shop_mark_verified(self, db, test_shop):
"""Test marking shop as verified"""
# Ensure shop starts unverified
test_shop.is_verified = False
db.commit()
shop, message = self.service.verify_shop(db, test_shop.id)
assert shop.id == test_shop.id
assert shop.is_verified is True
assert f"{shop.shop_code} has been verified" in message
def test_verify_shop_mark_unverified(self, db, test_shop):
"""Test marking shop as unverified"""
# Ensure shop starts verified
test_shop.is_verified = True
db.commit()
shop, message = self.service.verify_shop(db, test_shop.id)
assert shop.id == test_shop.id
assert shop.is_verified is False
assert f"{shop.shop_code} has been unverified" in message
def test_verify_shop_not_found(self, db):
"""Test verify shop when shop not found"""
with pytest.raises(HTTPException) as exc_info:
self.service.verify_shop(db, 99999)
assert exc_info.value.status_code == 404
assert "Shop not found" in str(exc_info.value.detail)
def test_toggle_shop_status_deactivate(self, db, test_shop):
"""Test deactivating a shop"""
assert test_shop.is_active is True
shop, message = self.service.toggle_shop_status(db, test_shop.id)
assert shop.id == test_shop.id
assert shop.is_active is False
assert f"{shop.shop_code} has been deactivated" in message
def test_toggle_shop_status_activate(self, db, test_shop):
"""Test activating a shop"""
# First deactivate the shop
test_shop.is_active = False
db.commit()
shop, message = self.service.toggle_shop_status(db, test_shop.id)
assert shop.id == test_shop.id
assert shop.is_active is True
assert f"{shop.shop_code} has been activated" in message
def test_toggle_shop_status_not_found(self, db):
"""Test toggle shop status when shop not found"""
with pytest.raises(HTTPException) as exc_info:
self.service.toggle_shop_status(db, 99999)
assert exc_info.value.status_code == 404
assert "Shop not found" in str(exc_info.value.detail)
def test_get_marketplace_import_jobs_no_filters(self, db, test_marketplace_job):
"""Test getting marketplace import jobs without filters using fixture"""
result = self.service.get_marketplace_import_jobs(db, skip=0, limit=10)
assert len(result) >= 1
# Find our test job in the results
test_job = next(
(job for job in result if job.job_id == test_marketplace_job.id), None
)
assert test_job is not None
assert test_job.marketplace == test_marketplace_job.marketplace
assert test_job.shop_name == test_marketplace_job.shop_name
assert test_job.status == test_marketplace_job.status
def test_get_marketplace_import_jobs_with_marketplace_filter(
self, db, test_marketplace_job, test_user, test_shop
):
"""Test getting marketplace import jobs filtered by marketplace"""
# Create additional job with different marketplace
other_job = MarketplaceImportJob(
marketplace="ebay",
shop_name="eBay Shop",
status="completed",
source_url="https://ebay.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id, # Fixed: Added missing user_id
)
db.add(other_job)
db.commit()
# Filter by the test marketplace job's marketplace
result = self.service.get_marketplace_import_jobs(
db, marketplace=test_marketplace_job.marketplace
)
assert len(result) >= 1
# All results should match the marketplace filter
for job in result:
assert test_marketplace_job.marketplace.lower() in job.marketplace.lower()
def test_get_marketplace_import_jobs_with_shop_filter(
self, db, test_marketplace_job, test_user, test_shop
):
"""Test getting marketplace import jobs filtered by shop name"""
# Create additional job with different shop name
other_job = MarketplaceImportJob(
marketplace="amazon",
shop_name="Different Shop Name",
status="completed",
source_url="https://different.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id, # Fixed: Added missing user_id
)
db.add(other_job)
db.commit()
# Filter by the test marketplace job's shop name
result = self.service.get_marketplace_import_jobs(
db, shop_name=test_marketplace_job.shop_name
)
assert len(result) >= 1
# All results should match the shop name filter
for job in result:
assert test_marketplace_job.shop_name.lower() in job.shop_name.lower()
def test_get_marketplace_import_jobs_with_status_filter(
self, db, test_marketplace_job, test_user, test_shop
):
"""Test getting marketplace import jobs filtered by status"""
# Create additional job with different status
other_job = MarketplaceImportJob(
marketplace="amazon",
shop_name="Test Shop",
status="pending",
source_url="https://pending.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id, # Fixed: Added missing user_id
)
db.add(other_job)
db.commit()
# Filter by the test marketplace job's status
result = self.service.get_marketplace_import_jobs(
db, status=test_marketplace_job.status
)
assert len(result) >= 1
# All results should match the status filter
for job in result:
assert job.status == test_marketplace_job.status
def test_get_marketplace_import_jobs_with_multiple_filters(
self, db, test_marketplace_job, test_shop, test_user
):
"""Test getting marketplace import jobs with multiple filters"""
# Create jobs that don't match all filters
non_matching_job1 = MarketplaceImportJob(
marketplace="ebay", # Different marketplace
shop_name=test_marketplace_job.shop_name,
status=test_marketplace_job.status,
source_url="https://non-matching1.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id, # Fixed: Added missing user_id
)
non_matching_job2 = MarketplaceImportJob(
marketplace=test_marketplace_job.marketplace,
shop_name="Different Shop", # Different shop name
status=test_marketplace_job.status,
source_url="https://non-matching2.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id, # Fixed: Added missing user_id
)
db.add_all([non_matching_job1, non_matching_job2])
db.commit()
# Apply all three filters matching the test job
result = self.service.get_marketplace_import_jobs(
db,
marketplace=test_marketplace_job.marketplace,
shop_name=test_marketplace_job.shop_name,
status=test_marketplace_job.status,
)
assert len(result) >= 1
# Find our test job in the results
test_job = next(
(job for job in result if job.job_id == test_marketplace_job.id), None
)
assert test_job is not None
assert test_job.marketplace == test_marketplace_job.marketplace
assert test_job.shop_name == test_marketplace_job.shop_name
assert test_job.status == test_marketplace_job.status
def test_get_marketplace_import_jobs_null_values(self, db, test_user, test_shop):
"""Test that marketplace import jobs handle null values correctly"""
# Create job with null values but required fields
job = MarketplaceImportJob(
shop_id=test_shop.id,
user_id=test_user.id, # Fixed: Added missing user_id
marketplace="test",
shop_name="Test Shop",
status="pending",
source_url="https://test.example.com/import",
imported_count=None,
updated_count=None,
total_processed=None,
error_count=None,
error_message=None,
)
db.add(job)
db.commit()
result = self.service.get_marketplace_import_jobs(db)
assert len(result) >= 1
# Find the job with null values
null_job = next((j for j in result if j.job_id == job.id), None)
assert null_job is not None
assert null_job.imported == 0 # None converted to 0
assert null_job.updated == 0
assert null_job.total_processed == 0
assert null_job.error_count == 0
assert null_job.error_message is None
def test_get_user_by_id(self, db, test_user):
"""Test getting user by ID using fixture"""
user = self.service.get_user_by_id(db, test_user.id)
assert user is not None
assert user.id == test_user.id
assert user.email == test_user.email
assert user.username == test_user.username
def test_get_user_by_id_not_found(self, db):
"""Test getting user by ID when user doesn't exist"""
user = self.service.get_user_by_id(db, 99999)
assert user is None
def test_get_shop_by_id(self, db, test_shop):
"""Test getting shop by ID using fixture"""
shop = self.service.get_shop_by_id(db, test_shop.id)
assert shop is not None
assert shop.id == test_shop.id
assert shop.shop_code == test_shop.shop_code
assert shop.shop_name == test_shop.shop_name
def test_get_shop_by_id_not_found(self, db):
"""Test getting shop by ID when shop doesn't exist"""
shop = self.service.get_shop_by_id(db, 99999)
assert shop is None
def test_user_exists_true(self, db, test_user):
"""Test user_exists returns True when user exists"""
exists = self.service.user_exists(db, test_user.id)
assert exists is True
def test_user_exists_false(self, db):
"""Test user_exists returns False when user doesn't exist"""
exists = self.service.user_exists(db, 99999)
assert exists is False
def test_shop_exists_true(self, db, test_shop):
"""Test shop_exists returns True when shop exists"""
exists = self.service.shop_exists(db, test_shop.id)
assert exists is True
def test_shop_exists_false(self, db):
"""Test shop_exists returns False when shop doesn't exist"""
exists = self.service.shop_exists(db, 99999)
assert exists is False

View File

@@ -1,201 +0,0 @@
# tests/test_background_tasks.py
from datetime import datetime
from unittest.mock import AsyncMock, patch
import pytest
from app.tasks.background_tasks import process_marketplace_import
from models.database_models import MarketplaceImportJob
class TestBackgroundTasks:
@pytest.mark.asyncio
async def test_marketplace_import_success(self, db, test_user, test_shop):
"""Test successful marketplace import background task"""
# Create import job
job = MarketplaceImportJob(
status="pending",
source_url="http://example.com/test.csv",
shop_name="TESTSHOP",
marketplace="TestMarket",
shop_id=test_shop.id,
user_id=test_user.id,
)
db.add(job)
db.commit()
db.refresh(job)
# Store the job ID before it becomes detached
job_id = job.id
# Mock CSV processor and prevent session from closing
with patch("app.tasks.background_tasks.CSVProcessor") as mock_processor, patch(
"app.tasks.background_tasks.SessionLocal", return_value=db
):
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(
return_value={
"imported": 10,
"updated": 5,
"total_processed": 15,
"errors": 0,
}
)
# Run background task
await process_marketplace_import(
job_id, "http://example.com/test.csv", "TestMarket", "TESTSHOP", 1000
)
# Re-query the job using the stored ID
updated_job = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.id == job_id)
.first()
)
assert updated_job is not None
assert updated_job.status == "completed"
assert updated_job.imported_count == 10
assert updated_job.updated_count == 5
assert updated_job.total_processed == 15
assert updated_job.error_count == 0
assert updated_job.started_at is not None
assert updated_job.completed_at is not None
@pytest.mark.asyncio
async def test_marketplace_import_failure(self, db, test_user, test_shop):
"""Test marketplace import failure handling"""
# Create import job
job = MarketplaceImportJob(
status="pending",
source_url="http://example.com/test.csv",
shop_name="TESTSHOP",
marketplace="TestMarket",
shop_id=test_shop.id,
user_id=test_user.id,
)
db.add(job)
db.commit()
db.refresh(job)
# Store the job ID before it becomes detached
job_id = job.id
# Mock CSV processor to raise exception
with patch("app.tasks.background_tasks.CSVProcessor") as mock_processor, patch(
"app.tasks.background_tasks.SessionLocal", return_value=db
):
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(
side_effect=Exception("Import failed")
)
# Run background task - this should not raise the exception
# because it's handled in the background task
try:
await process_marketplace_import(
job_id,
"http://example.com/test.csv",
"TestMarket",
"TESTSHOP",
1000,
)
except Exception:
# The background task should handle exceptions internally
# If an exception propagates here, that's a bug in the background task
pass
# Re-query the job using the stored ID
updated_job = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.id == job_id)
.first()
)
assert updated_job is not None
assert updated_job.status == "failed"
assert "Import failed" in updated_job.error_message
@pytest.mark.asyncio
async def test_marketplace_import_job_not_found(self, db):
"""Test handling when import job doesn't exist"""
with patch("app.tasks.background_tasks.CSVProcessor") as mock_processor, patch(
"app.tasks.background_tasks.SessionLocal", return_value=db
):
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(
return_value={
"imported": 10,
"updated": 5,
"total_processed": 15,
"errors": 0,
}
)
# Run background task with non-existent job ID
await process_marketplace_import(
999, # Non-existent job ID
"http://example.com/test.csv",
"TestMarket",
"TESTSHOP",
1000,
)
# Should not raise an exception, just log and return
# The CSV processor should not be called
mock_instance.process_marketplace_csv_from_url.assert_not_called()
@pytest.mark.asyncio
async def test_marketplace_import_with_errors(self, db, test_user, test_shop):
"""Test marketplace import with some errors"""
# Create import job
job = MarketplaceImportJob(
status="pending",
source_url="http://example.com/test.csv",
shop_name="TESTSHOP",
marketplace="TestMarket",
shop_id=test_shop.id,
user_id=test_user.id,
)
db.add(job)
db.commit()
db.refresh(job)
# Store the job ID before it becomes detached
job_id = job.id
# Mock CSV processor with some errors
with patch("app.tasks.background_tasks.CSVProcessor") as mock_processor, patch(
"app.tasks.background_tasks.SessionLocal", return_value=db
):
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(
return_value={
"imported": 8,
"updated": 5,
"total_processed": 15,
"errors": 2,
}
)
# Run background task
await process_marketplace_import(
job_id, "http://example.com/test.csv", "TestMarket", "TESTSHOP", 1000
)
# Re-query the job using the stored ID
updated_job = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.id == job_id)
.first()
)
assert updated_job is not None
assert updated_job.status == "completed_with_errors"
assert updated_job.imported_count == 8
assert updated_job.updated_count == 5
assert updated_job.error_count == 2
assert updated_job.total_processed == 15
assert "2 rows had errors" in updated_job.error_message

View File

@@ -0,0 +1,4 @@
product_id,title,price,currency,brand,marketplace
TEST001,Sample Product 1,19.99,EUR,TestBrand,TestMarket
TEST002,Sample Product 2,29.99,EUR,TestBrand,TestMarket
TEST003,Sample Product 3,39.99,USD,AnotherBrand,TestMarket
1 product_id title price currency brand marketplace
2 TEST001 Sample Product 1 19.99 EUR TestBrand TestMarket
3 TEST002 Sample Product 2 29.99 EUR TestBrand TestMarket
4 TEST003 Sample Product 3 39.99 USD AnotherBrand TestMarket

View File

@@ -1,48 +0,0 @@
# tests/test_error_handling.py
import pytest
class TestErrorHandling:
def test_invalid_json(self, client, auth_headers):
"""Test handling of invalid JSON"""
response = client.post(
"/api/v1/product", headers=auth_headers, content="invalid json"
)
assert response.status_code == 422 # Validation error
def test_missing_required_fields(self, client, auth_headers):
"""Test handling of missing required fields"""
response = client.post(
"/api/v1/product", headers=auth_headers, json={"title": "Test"}
) # Missing product_id
assert response.status_code == 422
def test_invalid_authentication(self, client):
"""Test handling of invalid authentication"""
response = client.get(
"/api/v1/product", headers={"Authorization": "Bearer invalid_token"}
)
assert response.status_code == 401 # Token is not valid
def test_nonexistent_resource(self, client, auth_headers):
"""Test handling of nonexistent resource access"""
response = client.get("/api/v1/product/NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
response = client.get("/api/v1/shop/NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
def test_duplicate_resource_creation(self, client, auth_headers, test_product):
"""Test handling of duplicate resource creation"""
product_data = {
"product_id": test_product.product_id, # Duplicate ID
"title": "Another Product",
}
response = client.post(
"/api/v1/product", headers=auth_headers, json=product_data
)
assert response.status_code == 400

View File

@@ -1,70 +0,0 @@
# tests/test_export.py
import csv
from io import StringIO
import pytest
from models.database_models import Product
class TestExportFunctionality:
def test_csv_export_basic(self, client, auth_headers, test_product):
"""Test basic CSV export functionality"""
response = client.get("/api/v1/export-csv", headers=auth_headers)
assert response.status_code == 200
assert response.headers["content-type"] == "text/csv; charset=utf-8"
# Parse CSV content
csv_content = response.content.decode("utf-8")
csv_reader = csv.reader(StringIO(csv_content))
# Check header row
header = next(csv_reader)
expected_fields = ["product_id", "title", "description", "price", "marketplace"]
for field in expected_fields:
assert field in header
def test_csv_export_with_marketplace_filter(self, client, auth_headers, db):
"""Test CSV export with marketplace filtering"""
# Create products in different marketplaces
products = [
Product(product_id="EXP1", title="Product 1", marketplace="Amazon"),
Product(product_id="EXP2", title="Product 2", marketplace="eBay"),
]
db.add_all(products)
db.commit()
response = client.get(
"/api/v1/export-csv?marketplace=Amazon", headers=auth_headers
)
assert response.status_code == 200
csv_content = response.content.decode("utf-8")
assert "EXP1" in csv_content
assert "EXP2" not in csv_content # Should be filtered out
def test_csv_export_performance(self, client, auth_headers, db):
"""Test CSV export performance with many products"""
# Create many products
products = []
for i in range(1000):
product = Product(
product_id=f"PERF{i:04d}",
title=f"Performance Product {i}",
marketplace="Performance",
)
products.append(product)
db.add_all(products)
db.commit()
import time
start_time = time.time()
response = client.get("/api/v1/export-csv", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200
assert end_time - start_time < 10.0 # Should complete within 10 seconds

View File

@@ -1,113 +0,0 @@
# tests/test_filtering.py
import pytest
from models.database_models import Product
class TestFiltering:
def test_product_brand_filter(self, client, auth_headers, db):
"""Test filtering products by brand"""
# Create products with different brands
products = [
Product(product_id="BRAND1", title="Product 1", brand="BrandA"),
Product(product_id="BRAND2", title="Product 2", brand="BrandB"),
Product(product_id="BRAND3", title="Product 3", brand="BrandA"),
]
db.add_all(products)
db.commit()
# Filter by BrandA
response = client.get("/api/v1/product?brand=BrandA", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2
# Filter by BrandB
response = client.get("/api/v1/product?brand=BrandB", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
def test_product_marketplace_filter(self, client, auth_headers, db):
"""Test filtering products by marketplace"""
products = [
Product(product_id="MKT1", title="Product 1", marketplace="Amazon"),
Product(product_id="MKT2", title="Product 2", marketplace="eBay"),
Product(product_id="MKT3", title="Product 3", marketplace="Amazon"),
]
db.add_all(products)
db.commit()
response = client.get(
"/api/v1/product?marketplace=Amazon", headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2
def test_product_search_filter(self, client, auth_headers, db):
"""Test searching products by text"""
products = [
Product(
product_id="SEARCH1", title="Apple iPhone", description="Smartphone"
),
Product(
product_id="SEARCH2",
title="Samsung Galaxy",
description="Android phone",
),
Product(
product_id="SEARCH3", title="iPad Tablet", description="Apple tablet"
),
]
db.add_all(products)
db.commit()
# Search for "Apple"
response = client.get("/api/v1/product?search=Apple", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2 # iPhone and iPad
# Search for "phone"
response = client.get("/api/v1/product?search=phone", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2 # iPhone and Galaxy
def test_combined_filters(self, client, auth_headers, db):
"""Test combining multiple filters"""
products = [
Product(
product_id="COMBO1",
title="Apple iPhone",
brand="Apple",
marketplace="Amazon",
),
Product(
product_id="COMBO2",
title="Apple iPad",
brand="Apple",
marketplace="eBay",
),
Product(
product_id="COMBO3",
title="Samsung Phone",
brand="Samsung",
marketplace="Amazon",
),
]
db.add_all(products)
db.commit()
# Filter by brand AND marketplace
response = client.get(
"/api/v1/product?brand=Apple&marketplace=Amazon", headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 1 # Only iPhone matches both

View File

@@ -1,128 +0,0 @@
# tests/test_integration.py
import pytest
class TestIntegrationFlows:
def test_full_product_workflow(self, client, auth_headers):
"""Test complete product creation and management workflow"""
# 1. Create a product
product_data = {
"product_id": "FLOW001",
"title": "Integration Test Product",
"description": "Testing full workflow",
"price": "29.99",
"brand": "FlowBrand",
"gtin": "1111222233334",
"availability": "in stock",
"marketplace": "TestFlow",
}
response = client.post(
"/api/v1/product", headers=auth_headers, json=product_data
)
assert response.status_code == 200
product = response.json()
# 2. Add stock for the product
stock_data = {
"gtin": product["gtin"],
"location": "MAIN_WAREHOUSE",
"quantity": 50,
}
response = client.post("/api/v1/stock", headers=auth_headers, json=stock_data)
assert response.status_code == 200
# 3. Get product with stock info
response = client.get(
f"/api/v1/product/{product['product_id']}", headers=auth_headers
)
assert response.status_code == 200
product_detail = response.json()
assert product_detail["stock_info"]["total_quantity"] == 50
# 4. Update product
update_data = {"title": "Updated Integration Test Product"}
response = client.put(
f"/api/v1/product/{product['product_id']}",
headers=auth_headers,
json=update_data,
)
assert response.status_code == 200
# 5. Search for product
response = client.get(
"/api/v1/product?search=Updated Integration", headers=auth_headers
)
assert response.status_code == 200
assert response.json()["total"] == 1
def test_shop_product_workflow(self, client, auth_headers):
"""Test shop creation and product management workflow"""
# 1. Create a shop
shop_data = {
"shop_code": "FLOWSHOP",
"shop_name": "Integration Flow Shop",
"description": "Test shop for integration",
}
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
assert response.status_code == 200
shop = response.json()
# 2. Create a product
product_data = {
"product_id": "SHOPFLOW001",
"title": "Shop Flow Product",
"price": "15.99",
"marketplace": "ShopFlow",
}
response = client.post(
"/api/v1/product", headers=auth_headers, json=product_data
)
assert response.status_code == 200
product = response.json()
# 3. Add product to shop (if endpoint exists)
# This would test the shop-product association
# 4. Get shop details
response = client.get(f"/api/v1/shop/{shop['shop_code']}", headers=auth_headers)
assert response.status_code == 200
def test_stock_operations_workflow(self, client, auth_headers):
"""Test complete stock management workflow"""
gtin = "9999888877776"
location = "TEST_WAREHOUSE"
# 1. Set initial stock
response = client.post(
"/api/v1/stock",
headers=auth_headers,
json={"gtin": gtin, "location": location, "quantity": 100},
)
assert response.status_code == 200
# 2. Add more stock
response = client.post(
"/api/v1/stock/add",
headers=auth_headers,
json={"gtin": gtin, "location": location, "quantity": 25},
)
assert response.status_code == 200
assert response.json()["quantity"] == 125
# 3. Remove some stock
response = client.post(
"/api/v1/stock/remove",
headers=auth_headers,
json={"gtin": gtin, "location": location, "quantity": 30},
)
assert response.status_code == 200
assert response.json()["quantity"] == 95
# 4. Check total stock
response = client.get(f"/api/v1/stock/{gtin}/total", headers=auth_headers)
assert response.status_code == 200
assert response.json()["total_quantity"] == 95

View File

@@ -1,71 +0,0 @@
# tests/test_middleware.py
from unittest.mock import Mock, patch
import pytest
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter
class TestRateLimiter:
def test_rate_limiter_allows_requests(self):
"""Test rate limiter allows requests within limit"""
limiter = RateLimiter()
client_id = "test_client"
# Should allow first request
assert (
limiter.allow_request(client_id, max_requests=10, window_seconds=3600)
is True
)
# Should allow subsequent requests within limit
for _ in range(5):
assert (
limiter.allow_request(client_id, max_requests=10, window_seconds=3600)
is True
)
def test_rate_limiter_blocks_excess_requests(self):
"""Test rate limiter blocks requests exceeding limit"""
limiter = RateLimiter()
client_id = "test_client_blocked"
max_requests = 3
# Use up the allowed requests
for _ in range(max_requests):
assert limiter.allow_request(client_id, max_requests, 3600) is True
# Next request should be blocked
assert limiter.allow_request(client_id, max_requests, 3600) is False
class TestAuthManager:
def test_password_hashing_and_verification(self):
"""Test password hashing and verification"""
auth_manager = AuthManager()
password = "test_password_123"
# Hash password
hashed = auth_manager.hash_password(password)
# Verify correct password
assert auth_manager.verify_password(password, hashed) is True
# Verify incorrect password
assert auth_manager.verify_password("wrong_password", hashed) is False
def test_jwt_token_creation_and_validation(self, test_user):
"""Test JWT token creation and validation"""
auth_manager = AuthManager()
# Create token
token_data = auth_manager.create_access_token(test_user)
assert "access_token" in token_data
assert token_data["token_type"] == "bearer"
assert isinstance(token_data["expires_in"], int)
# Token should be a string
assert isinstance(token_data["access_token"], str)
assert len(token_data["access_token"]) > 50 # JWT tokens are long

View File

@@ -1,59 +0,0 @@
# tests/test_performance.py
import time
import pytest
from models.database_models import Product
class TestPerformance:
def test_product_list_performance(self, client, auth_headers, db):
"""Test performance of product listing with many products"""
# Create multiple products
products = []
for i in range(100):
product = Product(
product_id=f"PERF{i:03d}",
title=f"Performance Test Product {i}",
price=f"{i}.99",
marketplace="Performance",
)
products.append(product)
db.add_all(products)
db.commit()
# Time the request
start_time = time.time()
response = client.get("/api/v1/product?limit=100", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200
assert len(response.json()["products"]) == 100
assert end_time - start_time < 2.0 # Should complete within 2 seconds
def test_search_performance(self, client, auth_headers, db):
"""Test search performance"""
# Create products with searchable content
products = []
for i in range(50):
product = Product(
product_id=f"SEARCH{i:03d}",
title=f"Searchable Product {i}",
description=f"This is a searchable product number {i}",
brand="SearchBrand",
marketplace="SearchMarket",
)
products.append(product)
db.add_all(products)
db.commit()
# Time search request
start_time = time.time()
response = client.get("/api/v1/product?search=Searchable", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200
assert response.json()["total"] == 50
assert end_time - start_time < 1.0 # Search should be fast

3
tests/unit/__init__.py Normal file
View File

@@ -0,0 +1,3 @@
# tests/unit/__init__.py
"""Unit tests - fast, isolated component tests."""

6
tests/unit/conftest.py Normal file
View File

@@ -0,0 +1,6 @@
# tests/unit/conftest.py
"""Unit test specific fixtures."""
import pytest
# Add any unit-specific fixtures here if needed

View File

@@ -0,0 +1,3 @@
# tests/unit/models/__init__.py
"""Database and API model unit tests."""

View File

@@ -1,10 +1,11 @@
# tests/test_database.py
# tests/unit/models/test_database_models.py
import pytest
from sqlalchemy import text
from models.database_models import Product, Shop, Stock, User
@pytest.mark.unit
@pytest.mark.database
class TestDatabaseModels:
def test_user_model(self, db):
"""Test User model creation and relationships"""

View File

@@ -0,0 +1,3 @@
# tests/unit/services/__init__.py
"""Service layer unit tests."""

View File

@@ -0,0 +1,116 @@
# tests/unit/services/test_admin_service.py
import pytest
from fastapi import HTTPException
from app.services.admin_service import AdminService
from models.database_models import MarketplaceImportJob, Shop
@pytest.mark.unit
@pytest.mark.admin
class TestAdminService:
"""Test suite for AdminService following the application's testing patterns"""
def setup_method(self):
"""Setup method following the same pattern as product service tests"""
self.service = AdminService()
def test_get_all_users(self, db, test_user, test_admin):
"""Test getting all users with pagination"""
users = self.service.get_all_users(db, skip=0, limit=10)
assert len(users) >= 2 # test_user + test_admin
user_ids = [user.id for user in users]
assert test_user.id in user_ids
assert test_admin.id in user_ids
def test_get_all_users_with_pagination(self, db, test_user, test_admin):
"""Test user pagination works correctly"""
users = self.service.get_all_users(db, skip=0, limit=1)
assert len(users) == 1
users_second_page = self.service.get_all_users(db, skip=1, limit=1)
assert len(users_second_page) == 1
assert users[0].id != users_second_page[0].id
def test_toggle_user_status_deactivate(self, db, test_user, test_admin):
"""Test deactivating a user"""
assert test_user.is_active is True
user, message = self.service.toggle_user_status(db, test_user.id, test_admin.id)
assert user.id == test_user.id
assert user.is_active is False
assert f"{user.username} has been deactivated" in message
def test_toggle_user_status_activate(self, db, test_user, test_admin):
"""Test activating a user"""
# First deactivate the user
test_user.is_active = False
db.commit()
user, message = self.service.toggle_user_status(db, test_user.id, test_admin.id)
assert user.id == test_user.id
assert user.is_active is True
assert f"{user.username} has been activated" in message
def test_toggle_user_status_user_not_found(self, db, test_admin):
"""Test toggle user status when user not found"""
with pytest.raises(HTTPException) as exc_info:
self.service.toggle_user_status(db, 99999, test_admin.id)
assert exc_info.value.status_code == 404
assert "User not found" in str(exc_info.value.detail)
def test_toggle_user_status_cannot_deactivate_self(self, db, test_admin):
"""Test that admin cannot deactivate their own account"""
with pytest.raises(HTTPException) as exc_info:
self.service.toggle_user_status(db, test_admin.id, test_admin.id)
assert exc_info.value.status_code == 400
assert "Cannot deactivate your own account" in str(exc_info.value.detail)
def test_get_all_shops(self, db, test_shop):
"""Test getting all shops with total count"""
shops, total = self.service.get_all_shops(db, skip=0, limit=10)
assert total >= 1
assert len(shops) >= 1
shop_codes = [shop.shop_code for shop in shops]
assert test_shop.shop_code in shop_codes
def test_verify_shop_mark_verified(self, db, test_shop):
"""Test marking shop as verified"""
# Ensure shop starts unverified
test_shop.is_verified = False
db.commit()
shop, message = self.service.verify_shop(db, test_shop.id)
assert shop.id == test_shop.id
assert shop.is_verified is True
assert f"{shop.shop_code} has been verified" in message
def test_verify_shop_not_found(self, db):
"""Test verify shop when shop not found"""
with pytest.raises(HTTPException) as exc_info:
self.service.verify_shop(db, 99999)
assert exc_info.value.status_code == 404
assert "Shop not found" in str(exc_info.value.detail)
def test_get_marketplace_import_jobs_no_filters(self, db, test_marketplace_job):
"""Test getting marketplace import jobs without filters using fixture"""
result = self.service.get_marketplace_import_jobs(db, skip=0, limit=10)
assert len(result) >= 1
# Find our test job in the results
test_job = next(
(job for job in result if job.job_id == test_marketplace_job.id), None
)
assert test_job is not None
assert test_job.marketplace == test_marketplace_job.marketplace
assert test_job.shop_name == test_marketplace_job.shop_name
assert test_job.status == test_marketplace_job.status

View File

@@ -6,7 +6,8 @@ from app.services.auth_service import AuthService
from models.api_models import UserLogin, UserRegister
from models.database_models import User
@pytest.mark.unit
@pytest.mark.auth
class TestAuthService:
"""Test suite for AuthService following the application's testing patterns"""

View File

@@ -8,7 +8,8 @@ from app.services.marketplace_service import MarketplaceService
from models.api_models import MarketplaceImportRequest
from models.database_models import MarketplaceImportJob, Shop, User
@pytest.mark.unit
@pytest.mark.marketplace
class TestMarketplaceService:
def setup_method(self):
self.service = MarketplaceService()

View File

@@ -5,7 +5,8 @@ from app.services.product_service import ProductService
from models.api_models import ProductCreate
from models.database_models import Product
@pytest.mark.unit
@pytest.mark.products
class TestProductService:
def setup_method(self):
self.service = ProductService()

View File

@@ -5,7 +5,8 @@ from fastapi import HTTPException
from app.services.shop_service import ShopService
from models.api_models import ShopCreate, ShopProductCreate
@pytest.mark.unit
@pytest.mark.shops
class TestShopService:
"""Test suite for ShopService following the application's testing patterns"""

View File

@@ -4,7 +4,8 @@ import pytest
from app.services.stats_service import StatsService
from models.database_models import Product, Stock
@pytest.mark.unit
@pytest.mark.stats
class TestStatsService:
"""Test suite for StatsService following the application's testing patterns"""

View File

@@ -7,7 +7,8 @@ from app.services.stock_service import StockService
from models.api_models import StockAdd, StockCreate, StockUpdate
from models.database_models import Product, Stock
@pytest.mark.unit
@pytest.mark.stock
class TestStockService:
def setup_method(self):
self.service = StockService()

View File

@@ -0,0 +1,3 @@
# tests/unit/utils/__init__.py
"""Utility function unit tests."""

View File

@@ -8,7 +8,7 @@ import requests.exceptions
from utils.csv_processor import CSVProcessor
@pytest.mark.unit
class TestCSVProcessor:
def setup_method(self):
self.processor = CSVProcessor()

View File

@@ -1,9 +1,10 @@
# tests/test_utils.py (Enhanced version of your existing file)
# tests/unit/utils/test_data_processing.py
import pytest
from utils.data_processing import GTINProcessor, PriceProcessor
@pytest.mark.unit
class TestGTINProcessor:
def setup_method(self):
self.processor = GTINProcessor()
@@ -60,6 +61,7 @@ class TestGTINProcessor:
assert self.processor.validate("123456789012345") is False # 15 digits
@pytest.mark.unit
class TestPriceProcessor:
def setup_method(self):
self.processor = PriceProcessor()

View File

@@ -3,7 +3,7 @@ import pytest
from utils.data_processing import GTINProcessor, PriceProcessor
@pytest.mark.unit
class TestDataValidation:
def test_gtin_normalization_edge_cases(self):
"""Test GTIN normalization with edge cases"""