code quality run

This commit is contained in:
2025-09-13 21:58:54 +02:00
parent 0dfd885847
commit 3eb18ef91e
63 changed files with 1802 additions and 1289 deletions

View File

@@ -1,16 +1,18 @@
# tests/conftest.py
import uuid
import pytest
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from app.core.database import Base, get_db
from main import app
from app.core.database import get_db, Base
# Import all models to ensure they're registered with Base metadata
from models.database_models import User, Product, Stock, Shop, MarketplaceImportJob, ShopProduct
from middleware.auth import AuthManager
import uuid
# Import all models to ensure they're registered with Base metadata
from models.database_models import (MarketplaceImportJob, Product, Shop,
ShopProduct, Stock, User)
# Use in-memory SQLite database for tests
SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"
@@ -23,7 +25,7 @@ def engine():
SQLALCHEMY_TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
echo=False # Set to True for SQL debugging
echo=False, # Set to True for SQL debugging
)
@@ -89,7 +91,7 @@ def test_user(db, auth_manager):
username=f"testuser_{unique_id}",
hashed_password=hashed_password,
role="user",
is_active=True
is_active=True,
)
db.add(user)
db.commit()
@@ -107,7 +109,7 @@ def test_admin(db, auth_manager):
username=f"admin_{unique_id}",
hashed_password=hashed_password,
role="admin",
is_active=True
is_active=True,
)
db.add(admin)
db.commit()
@@ -118,10 +120,10 @@ def test_admin(db, auth_manager):
@pytest.fixture
def auth_headers(client, test_user):
"""Get authentication headers for test user"""
response = client.post("/api/v1/auth/login", json={
"username": test_user.username,
"password": "testpass123"
})
response = client.post(
"/api/v1/auth/login",
json={"username": test_user.username, "password": "testpass123"},
)
assert response.status_code == 200, f"Login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@@ -130,10 +132,10 @@ def auth_headers(client, test_user):
@pytest.fixture
def admin_headers(client, test_admin):
"""Get authentication headers for admin user"""
response = client.post("/api/v1/auth/login", json={
"username": test_admin.username,
"password": "adminpass123"
})
response = client.post(
"/api/v1/auth/login",
json={"username": test_admin.username, "password": "adminpass123"},
)
assert response.status_code == 200, f"Admin login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@@ -152,7 +154,7 @@ def test_product(db):
gtin="1234567890123",
availability="in stock",
marketplace="Letzshop",
shop_name="TestShop"
shop_name="TestShop",
)
db.add(product)
db.commit()
@@ -169,7 +171,7 @@ def test_shop(db, test_user):
shop_name=f"Test Shop {unique_id}",
owner_id=test_user.id,
is_active=True,
is_verified=True
is_verified=True,
)
db.add(shop)
db.commit()
@@ -186,7 +188,7 @@ def test_stock(db, test_product, test_shop):
location=f"WAREHOUSE_A_{unique_id}",
quantity=10,
reserved_quantity=0,
shop_id=test_shop.id # Add shop_id reference
shop_id=test_shop.id, # Add shop_id reference
)
db.add(stock)
db.commit()
@@ -208,7 +210,7 @@ def test_marketplace_job(db, test_shop, test_user): # Add test_shop dependency
updated_count=3,
total_processed=8,
error_count=0,
error_message=None
error_message=None,
)
db.add(job)
db.commit()
@@ -219,16 +221,16 @@ def test_marketplace_job(db, test_shop, test_user): # Add test_shop dependency
def create_test_import_job(db, shop_id, **kwargs): # Add shop_id parameter
"""Helper function to create MarketplaceImportJob with defaults"""
defaults = {
'marketplace': 'test',
'shop_name': 'Test Shop',
'status': 'pending',
'source_url': 'https://test.example.com/import',
'shop_id': shop_id, # Add required shop_id
'imported_count': 0,
'updated_count': 0,
'total_processed': 0,
'error_count': 0,
'error_message': None
"marketplace": "test",
"shop_name": "Test Shop",
"status": "pending",
"source_url": "https://test.example.com/import",
"shop_id": shop_id, # Add required shop_id
"imported_count": 0,
"updated_count": 0,
"total_processed": 0,
"error_count": 0,
"error_message": None,
}
defaults.update(kwargs)
@@ -250,6 +252,7 @@ def cleanup():
# Add these fixtures to your existing conftest.py
@pytest.fixture
def unique_product(db):
"""Create a unique product for tests that need isolated product data"""
@@ -265,7 +268,7 @@ def unique_product(db):
availability="in stock",
marketplace="Letzshop",
shop_name=f"UniqueShop_{unique_id}",
google_product_category=f"UniqueCategory_{unique_id}"
google_product_category=f"UniqueCategory_{unique_id}",
)
db.add(product)
db.commit()
@@ -283,7 +286,7 @@ def unique_shop(db, test_user):
description=f"A unique test shop {unique_id}",
owner_id=test_user.id,
is_active=True,
is_verified=True
is_verified=True,
)
db.add(shop)
db.commit()
@@ -301,7 +304,7 @@ def other_user(db, auth_manager):
username=f"otheruser_{unique_id}",
hashed_password=hashed_password,
role="user",
is_active=True
is_active=True,
)
db.add(user)
db.commit()
@@ -318,7 +321,7 @@ def inactive_shop(db, other_user):
shop_name=f"Inactive Shop {unique_id}",
owner_id=other_user.id,
is_active=False,
is_verified=False
is_verified=False,
)
db.add(shop)
db.commit()
@@ -335,7 +338,7 @@ def verified_shop(db, other_user):
shop_name=f"Verified Shop {unique_id}",
owner_id=other_user.id,
is_active=True,
is_verified=True
is_verified=True,
)
db.add(shop)
db.commit()
@@ -347,16 +350,14 @@ def verified_shop(db, other_user):
def shop_product(db, test_shop, unique_product):
"""Create a shop product relationship"""
shop_product = ShopProduct(
shop_id=test_shop.id,
product_id=unique_product.id,
is_active=True
shop_id=test_shop.id, product_id=unique_product.id, is_active=True
)
# Add optional fields if they exist in your model
if hasattr(ShopProduct, 'price'):
if hasattr(ShopProduct, "price"):
shop_product.price = "24.99"
if hasattr(ShopProduct, 'is_featured'):
if hasattr(ShopProduct, "is_featured"):
shop_product.is_featured = False
if hasattr(ShopProduct, 'stock_quantity'):
if hasattr(ShopProduct, "stock_quantity"):
shop_product.stock_quantity = 10
db.add(shop_product)
@@ -382,7 +383,7 @@ def multiple_products(db):
marketplace=f"MultiMarket_{i % 2}", # Create 2 different marketplaces
shop_name=f"MultiShop_{i}",
google_product_category=f"MultiCategory_{i % 2}", # Create 2 different categories
gtin=f"1234567890{i}{unique_id[:2]}"
gtin=f"1234567890{i}{unique_id[:2]}",
)
products.append(product)
@@ -404,7 +405,7 @@ def multiple_stocks(db, multiple_products, test_shop):
location=f"LOC_{i}",
quantity=10 + (i * 5), # Different quantities
reserved_quantity=i,
shop_id=test_shop.id
shop_id=test_shop.id,
)
stocks.append(stock)
@@ -422,12 +423,12 @@ def create_unique_product_factory():
def _create_product(db, **kwargs):
unique_id = str(uuid.uuid4())[:8]
defaults = {
'product_id': f"FACTORY_{unique_id}",
'title': f"Factory Product {unique_id}",
'price': "15.99",
'currency': "EUR",
'marketplace': "TestMarket",
'shop_name': "TestShop"
"product_id": f"FACTORY_{unique_id}",
"title": f"Factory Product {unique_id}",
"price": "15.99",
"currency": "EUR",
"marketplace": "TestMarket",
"shop_name": "TestShop",
}
defaults.update(kwargs)
@@ -452,11 +453,11 @@ def create_unique_shop_factory():
def _create_shop(db, owner_id, **kwargs):
unique_id = str(uuid.uuid4())[:8]
defaults = {
'shop_code': f"FACTORY_{unique_id}",
'shop_name': f"Factory Shop {unique_id}",
'owner_id': owner_id,
'is_active': True,
'is_verified': False
"shop_code": f"FACTORY_{unique_id}",
"shop_name": f"Factory Shop {unique_id}",
"owner_id": owner_id,
"is_active": True,
"is_verified": False,
}
defaults.update(kwargs)
@@ -473,4 +474,3 @@ def create_unique_shop_factory():
def shop_factory():
"""Fixture that provides a shop factory function"""
return create_unique_shop_factory()

View File

@@ -20,11 +20,16 @@ class TestAdminAPI:
response = client.get("/api/v1/admin/users", headers=auth_headers)
assert response.status_code == 403
assert "Access denied" in response.json()["detail"] or "admin" in response.json()["detail"].lower()
assert (
"Access denied" in response.json()["detail"]
or "admin" in response.json()["detail"].lower()
)
def test_toggle_user_status_admin(self, client, admin_headers, test_user):
"""Test admin toggling user status"""
response = client.put(f"/api/v1/admin/users/{test_user.id}/status", headers=admin_headers)
response = client.put(
f"/api/v1/admin/users/{test_user.id}/status", headers=admin_headers
)
assert response.status_code == 200
message = response.json()["message"]
@@ -39,9 +44,13 @@ class TestAdminAPI:
assert response.status_code == 404
assert "User not found" in response.json()["detail"]
def test_toggle_user_status_cannot_deactivate_self(self, client, admin_headers, test_admin):
def test_toggle_user_status_cannot_deactivate_self(
self, client, admin_headers, test_admin
):
"""Test that admin cannot deactivate their own account"""
response = client.put(f"/api/v1/admin/users/{test_admin.id}/status", headers=admin_headers)
response = client.put(
f"/api/v1/admin/users/{test_admin.id}/status", headers=admin_headers
)
assert response.status_code == 400
assert "Cannot deactivate your own account" in response.json()["detail"]
@@ -56,7 +65,9 @@ class TestAdminAPI:
assert len(data["shops"]) >= 1
# Check that test_shop is in the response
shop_codes = [shop["shop_code"] for shop in data["shops"] if "shop_code" in shop]
shop_codes = [
shop["shop_code"] for shop in data["shops"] if "shop_code" in shop
]
assert test_shop.shop_code in shop_codes
def test_get_all_shops_non_admin(self, client, auth_headers):
@@ -64,11 +75,16 @@ class TestAdminAPI:
response = client.get("/api/v1/admin/shops", headers=auth_headers)
assert response.status_code == 403
assert "Access denied" in response.json()["detail"] or "admin" in response.json()["detail"].lower()
assert (
"Access denied" in response.json()["detail"]
or "admin" in response.json()["detail"].lower()
)
def test_verify_shop_admin(self, client, admin_headers, test_shop):
"""Test admin verifying/unverifying shop"""
response = client.put(f"/api/v1/admin/shops/{test_shop.id}/verify", headers=admin_headers)
response = client.put(
f"/api/v1/admin/shops/{test_shop.id}/verify", headers=admin_headers
)
assert response.status_code == 200
message = response.json()["message"]
@@ -84,7 +100,9 @@ class TestAdminAPI:
def test_toggle_shop_status_admin(self, client, admin_headers, test_shop):
"""Test admin toggling shop status"""
response = client.put(f"/api/v1/admin/shops/{test_shop.id}/status", headers=admin_headers)
response = client.put(
f"/api/v1/admin/shops/{test_shop.id}/status", headers=admin_headers
)
assert response.status_code == 200
message = response.json()["message"]
@@ -98,9 +116,13 @@ class TestAdminAPI:
assert response.status_code == 404
assert "Shop not found" in response.json()["detail"]
def test_get_marketplace_import_jobs_admin(self, client, admin_headers, test_marketplace_job):
def test_get_marketplace_import_jobs_admin(
self, client, admin_headers, test_marketplace_job
):
"""Test admin getting marketplace import jobs"""
response = client.get("/api/v1/admin/marketplace-import-jobs", headers=admin_headers)
response = client.get(
"/api/v1/admin/marketplace-import-jobs", headers=admin_headers
)
assert response.status_code == 200
data = response.json()
@@ -110,43 +132,58 @@ class TestAdminAPI:
job_ids = [job["job_id"] for job in data if "job_id" in job]
assert test_marketplace_job.id in job_ids
def test_get_marketplace_import_jobs_with_filters(self, client, admin_headers, test_marketplace_job):
def test_get_marketplace_import_jobs_with_filters(
self, client, admin_headers, test_marketplace_job
):
"""Test admin getting marketplace import jobs with filters"""
response = client.get(
"/api/v1/admin/marketplace-import-jobs",
params={"marketplace": test_marketplace_job.marketplace},
headers=admin_headers
headers=admin_headers,
)
assert response.status_code == 200
data = response.json()
assert len(data) >= 1
assert all(job["marketplace"] == test_marketplace_job.marketplace for job in data)
assert all(
job["marketplace"] == test_marketplace_job.marketplace for job in data
)
def test_get_marketplace_import_jobs_non_admin(self, client, auth_headers):
"""Test non-admin trying to access marketplace import jobs"""
response = client.get("/api/v1/admin/marketplace-import-jobs", headers=auth_headers)
response = client.get(
"/api/v1/admin/marketplace-import-jobs", headers=auth_headers
)
assert response.status_code == 403
assert "Access denied" in response.json()["detail"] or "admin" in response.json()["detail"].lower()
assert (
"Access denied" in response.json()["detail"]
or "admin" in response.json()["detail"].lower()
)
def test_admin_pagination_users(self, client, admin_headers, test_user, test_admin):
"""Test user pagination works correctly"""
# Test first page
response = client.get("/api/v1/admin/users?skip=0&limit=1", headers=admin_headers)
response = client.get(
"/api/v1/admin/users?skip=0&limit=1", headers=admin_headers
)
assert response.status_code == 200
data = response.json()
assert len(data) == 1
# Test second page
response = client.get("/api/v1/admin/users?skip=1&limit=1", headers=admin_headers)
response = client.get(
"/api/v1/admin/users?skip=1&limit=1", headers=admin_headers
)
assert response.status_code == 200
data = response.json()
assert len(data) >= 0 # Could be 1 or 0 depending on total users
def test_admin_pagination_shops(self, client, admin_headers, test_shop):
"""Test shop pagination works correctly"""
response = client.get("/api/v1/admin/shops?skip=0&limit=1", headers=admin_headers)
response = client.get(
"/api/v1/admin/shops?skip=0&limit=1", headers=admin_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1

View File

@@ -1,10 +1,11 @@
# tests/test_admin_service.py
import pytest
from datetime import datetime
import pytest
from fastapi import HTTPException
from app.services.admin_service import AdminService
from models.database_models import User, Shop, MarketplaceImportJob
from models.database_models import MarketplaceImportJob, Shop, User
class TestAdminService:
@@ -91,7 +92,7 @@ class TestAdminService:
shop_name="Test Shop 2",
owner_id=test_shop.owner_id,
is_active=True,
is_verified=False
is_verified=False,
)
db.add(additional_shop)
db.commit()
@@ -173,13 +174,17 @@ class TestAdminService:
assert len(result) >= 1
# Find our test job in the results
test_job = next((job for job in result if job.job_id == test_marketplace_job.id), None)
test_job = next(
(job for job in result if job.job_id == test_marketplace_job.id), None
)
assert test_job is not None
assert test_job.marketplace == test_marketplace_job.marketplace
assert test_job.shop_name == test_marketplace_job.shop_name
assert test_job.status == test_marketplace_job.status
def test_get_marketplace_import_jobs_with_marketplace_filter(self, db, test_marketplace_job, test_user, test_shop):
def test_get_marketplace_import_jobs_with_marketplace_filter(
self, db, test_marketplace_job, test_user, test_shop
):
"""Test getting marketplace import jobs filtered by marketplace"""
# Create additional job with different marketplace
other_job = MarketplaceImportJob(
@@ -188,20 +193,24 @@ class TestAdminService:
status="completed",
source_url="https://ebay.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id # Fixed: Added missing user_id
user_id=test_user.id, # Fixed: Added missing user_id
)
db.add(other_job)
db.commit()
# Filter by the test marketplace job's marketplace
result = self.service.get_marketplace_import_jobs(db, marketplace=test_marketplace_job.marketplace)
result = self.service.get_marketplace_import_jobs(
db, marketplace=test_marketplace_job.marketplace
)
assert len(result) >= 1
# All results should match the marketplace filter
for job in result:
assert test_marketplace_job.marketplace.lower() in job.marketplace.lower()
def test_get_marketplace_import_jobs_with_shop_filter(self, db, test_marketplace_job, test_user, test_shop):
def test_get_marketplace_import_jobs_with_shop_filter(
self, db, test_marketplace_job, test_user, test_shop
):
"""Test getting marketplace import jobs filtered by shop name"""
# Create additional job with different shop name
other_job = MarketplaceImportJob(
@@ -210,20 +219,24 @@ class TestAdminService:
status="completed",
source_url="https://different.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id # Fixed: Added missing user_id
user_id=test_user.id, # Fixed: Added missing user_id
)
db.add(other_job)
db.commit()
# Filter by the test marketplace job's shop name
result = self.service.get_marketplace_import_jobs(db, shop_name=test_marketplace_job.shop_name)
result = self.service.get_marketplace_import_jobs(
db, shop_name=test_marketplace_job.shop_name
)
assert len(result) >= 1
# All results should match the shop name filter
for job in result:
assert test_marketplace_job.shop_name.lower() in job.shop_name.lower()
def test_get_marketplace_import_jobs_with_status_filter(self, db, test_marketplace_job, test_user, test_shop):
def test_get_marketplace_import_jobs_with_status_filter(
self, db, test_marketplace_job, test_user, test_shop
):
"""Test getting marketplace import jobs filtered by status"""
# Create additional job with different status
other_job = MarketplaceImportJob(
@@ -232,20 +245,24 @@ class TestAdminService:
status="pending",
source_url="https://pending.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id # Fixed: Added missing user_id
user_id=test_user.id, # Fixed: Added missing user_id
)
db.add(other_job)
db.commit()
# Filter by the test marketplace job's status
result = self.service.get_marketplace_import_jobs(db, status=test_marketplace_job.status)
result = self.service.get_marketplace_import_jobs(
db, status=test_marketplace_job.status
)
assert len(result) >= 1
# All results should match the status filter
for job in result:
assert job.status == test_marketplace_job.status
def test_get_marketplace_import_jobs_with_multiple_filters(self, db, test_marketplace_job, test_shop, test_user):
def test_get_marketplace_import_jobs_with_multiple_filters(
self, db, test_marketplace_job, test_shop, test_user
):
"""Test getting marketplace import jobs with multiple filters"""
# Create jobs that don't match all filters
non_matching_job1 = MarketplaceImportJob(
@@ -254,7 +271,7 @@ class TestAdminService:
status=test_marketplace_job.status,
source_url="https://non-matching1.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id # Fixed: Added missing user_id
user_id=test_user.id, # Fixed: Added missing user_id
)
non_matching_job2 = MarketplaceImportJob(
marketplace=test_marketplace_job.marketplace,
@@ -262,7 +279,7 @@ class TestAdminService:
status=test_marketplace_job.status,
source_url="https://non-matching2.example.com/import",
shop_id=test_shop.id,
user_id=test_user.id # Fixed: Added missing user_id
user_id=test_user.id, # Fixed: Added missing user_id
)
db.add_all([non_matching_job1, non_matching_job2])
db.commit()
@@ -272,12 +289,14 @@ class TestAdminService:
db,
marketplace=test_marketplace_job.marketplace,
shop_name=test_marketplace_job.shop_name,
status=test_marketplace_job.status
status=test_marketplace_job.status,
)
assert len(result) >= 1
# Find our test job in the results
test_job = next((job for job in result if job.job_id == test_marketplace_job.id), None)
test_job = next(
(job for job in result if job.job_id == test_marketplace_job.id), None
)
assert test_job is not None
assert test_job.marketplace == test_marketplace_job.marketplace
assert test_job.shop_name == test_marketplace_job.shop_name
@@ -297,7 +316,7 @@ class TestAdminService:
updated_count=None,
total_processed=None,
error_count=None,
error_message=None
error_message=None,
)
db.add(job)
db.commit()

View File

@@ -6,11 +6,14 @@ from fastapi import HTTPException
class TestAuthenticationAPI:
def test_register_user_success(self, client, db):
"""Test successful user registration"""
response = client.post("/api/v1/auth/register", json={
"email": "newuser@example.com",
"username": "newuser",
"password": "securepass123"
})
response = client.post(
"/api/v1/auth/register",
json={
"email": "newuser@example.com",
"username": "newuser",
"password": "securepass123",
},
)
assert response.status_code == 200
data = response.json()
@@ -22,32 +25,38 @@ class TestAuthenticationAPI:
def test_register_user_duplicate_email(self, client, test_user):
"""Test registration with duplicate email"""
response = client.post("/api/v1/auth/register", json={
"email": test_user.email, # Same as test_user
"username": "newuser",
"password": "securepass123"
})
response = client.post(
"/api/v1/auth/register",
json={
"email": test_user.email, # Same as test_user
"username": "newuser",
"password": "securepass123",
},
)
assert response.status_code == 400
assert "Email already registered" in response.json()["detail"]
def test_register_user_duplicate_username(self, client, test_user):
"""Test registration with duplicate username"""
response = client.post("/api/v1/auth/register", json={
"email": "new@example.com",
"username": test_user.username, # Same as test_user
"password": "securepass123"
})
response = client.post(
"/api/v1/auth/register",
json={
"email": "new@example.com",
"username": test_user.username, # Same as test_user
"password": "securepass123",
},
)
assert response.status_code == 400
assert "Username already taken" in response.json()["detail"]
def test_login_success(self, client, test_user):
"""Test successful login"""
response = client.post("/api/v1/auth/login", json={
"username": test_user.username,
"password": "testpass123"
})
response = client.post(
"/api/v1/auth/login",
json={"username": test_user.username, "password": "testpass123"},
)
assert response.status_code == 200
data = response.json()
@@ -58,20 +67,20 @@ class TestAuthenticationAPI:
def test_login_wrong_password(self, client, test_user):
"""Test login with wrong password"""
response = client.post("/api/v1/auth/login", json={
"username": "testuser",
"password": "wrongpassword"
})
response = client.post(
"/api/v1/auth/login",
json={"username": "testuser", "password": "wrongpassword"},
)
assert response.status_code == 401
assert "Incorrect username or password" in response.json()["detail"]
def test_login_nonexistent_user(self, client, db): # Added db fixture
"""Test login with nonexistent user"""
response = client.post("/api/v1/auth/login", json={
"username": "nonexistent",
"password": "password123"
})
response = client.post(
"/api/v1/auth/login",
json={"username": "nonexistent", "password": "password123"},
)
assert response.status_code == 401

View File

@@ -3,8 +3,8 @@ import pytest
from fastapi import HTTPException
from app.services.auth_service import AuthService
from models.api_models import UserLogin, UserRegister
from models.database_models import User
from models.api_models import UserRegister, UserLogin
class TestAuthService:
@@ -17,9 +17,7 @@ class TestAuthService:
def test_register_user_success(self, db):
"""Test successful user registration"""
user_data = UserRegister(
email="newuser@example.com",
username="newuser123",
password="securepass123"
email="newuser@example.com", username="newuser123", password="securepass123"
)
user = self.service.register_user(db, user_data)
@@ -36,7 +34,7 @@ class TestAuthService:
user_data = UserRegister(
email=test_user.email, # Use existing email
username="differentuser",
password="securepass123"
password="securepass123",
)
with pytest.raises(HTTPException) as exc_info:
@@ -50,7 +48,7 @@ class TestAuthService:
user_data = UserRegister(
email="different@example.com",
username=test_user.username, # Use existing username
password="securepass123"
password="securepass123",
)
with pytest.raises(HTTPException) as exc_info:
@@ -62,8 +60,7 @@ class TestAuthService:
def test_login_user_success(self, db, test_user):
"""Test successful user login"""
user_credentials = UserLogin(
username=test_user.username,
password="testpass123"
username=test_user.username, password="testpass123"
)
result = self.service.login_user(db, user_credentials)
@@ -78,10 +75,7 @@ class TestAuthService:
def test_login_user_wrong_username(self, db):
"""Test login fails with wrong username"""
user_credentials = UserLogin(
username="nonexistentuser",
password="testpass123"
)
user_credentials = UserLogin(username="nonexistentuser", password="testpass123")
with pytest.raises(HTTPException) as exc_info:
self.service.login_user(db, user_credentials)
@@ -92,8 +86,7 @@ class TestAuthService:
def test_login_user_wrong_password(self, db, test_user):
"""Test login fails with wrong password"""
user_credentials = UserLogin(
username=test_user.username,
password="wrongpassword"
username=test_user.username, password="wrongpassword"
)
with pytest.raises(HTTPException) as exc_info:
@@ -109,8 +102,7 @@ class TestAuthService:
db.commit()
user_credentials = UserLogin(
username=test_user.username,
password="testpass123"
username=test_user.username, password="testpass123"
)
with pytest.raises(HTTPException) as exc_info:

View File

@@ -1,9 +1,11 @@
# tests/test_background_tasks.py
from datetime import datetime
from unittest.mock import AsyncMock, patch
import pytest
from unittest.mock import patch, AsyncMock
from app.tasks.background_tasks import process_marketplace_import
from models.database_models import MarketplaceImportJob
from datetime import datetime
class TestBackgroundTasks:
@@ -17,7 +19,7 @@ class TestBackgroundTasks:
shop_name="TESTSHOP",
marketplace="TestMarket",
shop_id=test_shop.id,
user_id=test_user.id
user_id=test_user.id,
)
db.add(job)
db.commit()
@@ -27,29 +29,30 @@ class TestBackgroundTasks:
job_id = job.id
# Mock CSV processor and prevent session from closing
with patch('app.tasks.background_tasks.CSVProcessor') as mock_processor, \
patch('app.tasks.background_tasks.SessionLocal', return_value=db):
with patch("app.tasks.background_tasks.CSVProcessor") as mock_processor, patch(
"app.tasks.background_tasks.SessionLocal", return_value=db
):
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(return_value={
"imported": 10,
"updated": 5,
"total_processed": 15,
"errors": 0
})
mock_instance.process_marketplace_csv_from_url = AsyncMock(
return_value={
"imported": 10,
"updated": 5,
"total_processed": 15,
"errors": 0,
}
)
# Run background task
await process_marketplace_import(
job_id,
"http://example.com/test.csv",
"TestMarket",
"TESTSHOP",
1000
job_id, "http://example.com/test.csv", "TestMarket", "TESTSHOP", 1000
)
# Re-query the job using the stored ID
updated_job = db.query(MarketplaceImportJob).filter(
MarketplaceImportJob.id == job_id
).first()
updated_job = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.id == job_id)
.first()
)
assert updated_job is not None
assert updated_job.status == "completed"
@@ -70,7 +73,7 @@ class TestBackgroundTasks:
shop_name="TESTSHOP",
marketplace="TestMarket",
shop_id=test_shop.id,
user_id=test_user.id
user_id=test_user.id,
)
db.add(job)
db.commit()
@@ -80,8 +83,9 @@ class TestBackgroundTasks:
job_id = job.id
# Mock CSV processor to raise exception
with patch('app.tasks.background_tasks.CSVProcessor') as mock_processor, \
patch('app.tasks.background_tasks.SessionLocal', return_value=db):
with patch("app.tasks.background_tasks.CSVProcessor") as mock_processor, patch(
"app.tasks.background_tasks.SessionLocal", return_value=db
):
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(
@@ -96,7 +100,7 @@ class TestBackgroundTasks:
"http://example.com/test.csv",
"TestMarket",
"TESTSHOP",
1000
1000,
)
except Exception:
# The background task should handle exceptions internally
@@ -104,9 +108,11 @@ class TestBackgroundTasks:
pass
# Re-query the job using the stored ID
updated_job = db.query(MarketplaceImportJob).filter(
MarketplaceImportJob.id == job_id
).first()
updated_job = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.id == job_id)
.first()
)
assert updated_job is not None
assert updated_job.status == "failed"
@@ -115,15 +121,18 @@ class TestBackgroundTasks:
@pytest.mark.asyncio
async def test_marketplace_import_job_not_found(self, db):
"""Test handling when import job doesn't exist"""
with patch('app.tasks.background_tasks.CSVProcessor') as mock_processor, \
patch('app.tasks.background_tasks.SessionLocal', return_value=db):
with patch("app.tasks.background_tasks.CSVProcessor") as mock_processor, patch(
"app.tasks.background_tasks.SessionLocal", return_value=db
):
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(return_value={
"imported": 10,
"updated": 5,
"total_processed": 15,
"errors": 0
})
mock_instance.process_marketplace_csv_from_url = AsyncMock(
return_value={
"imported": 10,
"updated": 5,
"total_processed": 15,
"errors": 0,
}
)
# Run background task with non-existent job ID
await process_marketplace_import(
@@ -131,7 +140,7 @@ class TestBackgroundTasks:
"http://example.com/test.csv",
"TestMarket",
"TESTSHOP",
1000
1000,
)
# Should not raise an exception, just log and return
@@ -148,7 +157,7 @@ class TestBackgroundTasks:
shop_name="TESTSHOP",
marketplace="TestMarket",
shop_id=test_shop.id,
user_id=test_user.id
user_id=test_user.id,
)
db.add(job)
db.commit()
@@ -158,29 +167,30 @@ class TestBackgroundTasks:
job_id = job.id
# Mock CSV processor with some errors
with patch('app.tasks.background_tasks.CSVProcessor') as mock_processor, \
patch('app.tasks.background_tasks.SessionLocal', return_value=db):
with patch("app.tasks.background_tasks.CSVProcessor") as mock_processor, patch(
"app.tasks.background_tasks.SessionLocal", return_value=db
):
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(return_value={
"imported": 8,
"updated": 5,
"total_processed": 15,
"errors": 2
})
mock_instance.process_marketplace_csv_from_url = AsyncMock(
return_value={
"imported": 8,
"updated": 5,
"total_processed": 15,
"errors": 2,
}
)
# Run background task
await process_marketplace_import(
job_id,
"http://example.com/test.csv",
"TestMarket",
"TESTSHOP",
1000
job_id, "http://example.com/test.csv", "TestMarket", "TESTSHOP", 1000
)
# Re-query the job using the stored ID
updated_job = db.query(MarketplaceImportJob).filter(
MarketplaceImportJob.id == job_id
).first()
updated_job = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.id == job_id)
.first()
)
assert updated_job is not None
assert updated_job.status == "completed_with_errors"

View File

@@ -1,9 +1,11 @@
# tests/test_csv_processor.py
from unittest.mock import Mock, patch
import pandas as pd
import pytest
import requests
import requests.exceptions
from unittest.mock import Mock, patch
import pandas as pd
from utils.csv_processor import CSVProcessor
@@ -11,7 +13,7 @@ class TestCSVProcessor:
def setup_method(self):
self.processor = CSVProcessor()
@patch('requests.get')
@patch("requests.get")
def test_download_csv_encoding_fallback(self, mock_get):
"""Test CSV download with encoding fallback"""
# Create content with special characters that would fail UTF-8 if not properly encoded
@@ -20,7 +22,7 @@ class TestCSVProcessor:
mock_response = Mock()
mock_response.status_code = 200
# Use latin-1 encoding which your method should try
mock_response.content = special_content.encode('latin-1')
mock_response.content = special_content.encode("latin-1")
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response
@@ -30,14 +32,16 @@ class TestCSVProcessor:
assert isinstance(csv_content, str)
assert "Café Product" in csv_content
@patch('requests.get')
@patch("requests.get")
def test_download_csv_encoding_ignore_fallback(self, mock_get):
"""Test CSV download falls back to UTF-8 with error ignoring"""
# Create problematic bytes that would fail most encoding attempts
mock_response = Mock()
mock_response.status_code = 200
# Create bytes that will fail most encodings
mock_response.content = b"product_id,title,price\nTEST001,\xff\xfe Product,10.99"
mock_response.content = (
b"product_id,title,price\nTEST001,\xff\xfe Product,10.99"
)
mock_response.raise_for_status.return_value = None
mock_get.return_value = mock_response
@@ -49,7 +53,7 @@ class TestCSVProcessor:
assert "product_id,title,price" in csv_content
assert "TEST001" in csv_content
@patch('requests.get')
@patch("requests.get")
def test_download_csv_request_exception(self, mock_get):
"""Test CSV download with request exception"""
mock_get.side_effect = requests.exceptions.RequestException("Connection error")
@@ -57,18 +61,20 @@ class TestCSVProcessor:
with pytest.raises(requests.exceptions.RequestException):
self.processor.download_csv("http://example.com/test.csv")
@patch('requests.get')
@patch("requests.get")
def test_download_csv_http_error(self, mock_get):
"""Test CSV download with HTTP error"""
mock_response = Mock()
mock_response.status_code = 404
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError("404 Not Found")
mock_response.raise_for_status.side_effect = requests.exceptions.HTTPError(
"404 Not Found"
)
mock_get.return_value = mock_response
with pytest.raises(requests.exceptions.HTTPError):
self.processor.download_csv("http://example.com/nonexistent.csv")
@patch('requests.get')
@patch("requests.get")
def test_download_csv_failure(self, mock_get):
"""Test CSV download failure"""
# Mock failed HTTP response
@@ -95,26 +101,24 @@ TEST002,Test Product 2,15.99,TestMarket"""
@pytest.mark.asyncio
async def test_process_marketplace_csv_from_url(self, db):
"""Test complete marketplace CSV processing"""
with patch.object(self.processor, 'download_csv') as mock_download, \
patch.object(self.processor, 'parse_csv') as mock_parse:
with patch.object(
self.processor, "download_csv"
) as mock_download, patch.object(self.processor, "parse_csv") as mock_parse:
# Mock successful download and parsing
mock_download.return_value = "csv_content"
mock_df = pd.DataFrame({
"product_id": ["TEST001", "TEST002"],
"title": ["Product 1", "Product 2"],
"price": ["10.99", "15.99"],
"marketplace": ["TestMarket", "TestMarket"],
"shop_name": ["TestShop", "TestShop"]
})
mock_df = pd.DataFrame(
{
"product_id": ["TEST001", "TEST002"],
"title": ["Product 1", "Product 2"],
"price": ["10.99", "15.99"],
"marketplace": ["TestMarket", "TestMarket"],
"shop_name": ["TestShop", "TestShop"],
}
)
mock_parse.return_value = mock_df
result = await self.processor.process_marketplace_csv_from_url(
"http://example.com/test.csv",
"TestMarket",
"TestShop",
1000,
db
"http://example.com/test.csv", "TestMarket", "TestShop", 1000, db
)
assert "imported" in result

View File

@@ -1,5 +1,6 @@
# tests/test_data_validation.py
import pytest
from utils.data_processing import GTINProcessor, PriceProcessor

View File

@@ -1,7 +1,8 @@
# tests/test_database.py
import pytest
from sqlalchemy import text
from models.database_models import User, Product, Stock, Shop
from models.database_models import Product, Shop, Stock, User
class TestDatabaseModels:
@@ -12,7 +13,7 @@ class TestDatabaseModels:
username="dbtest",
hashed_password="hashed_password_123",
role="user",
is_active=True
is_active=True,
)
db.add(user)
@@ -36,7 +37,7 @@ class TestDatabaseModels:
gtin="1234567890123",
availability="in stock",
marketplace="TestDB",
shop_name="DBTestShop"
shop_name="DBTestShop",
)
db.add(product)
@@ -49,11 +50,7 @@ class TestDatabaseModels:
def test_stock_model(self, db):
"""Test Stock model creation"""
stock = Stock(
gtin="1234567890123",
location="DB_WAREHOUSE",
quantity=150
)
stock = Stock(gtin="1234567890123", location="DB_WAREHOUSE", quantity=150)
db.add(stock)
db.commit()
@@ -72,7 +69,7 @@ class TestDatabaseModels:
description="Testing shop model",
owner_id=test_user.id,
is_active=True,
is_verified=False
is_verified=False,
)
db.add(shop)

View File

@@ -5,24 +5,25 @@ import pytest
class TestErrorHandling:
def test_invalid_json(self, client, auth_headers):
"""Test handling of invalid JSON"""
response = client.post("/api/v1/product",
headers=auth_headers,
content="invalid json")
response = client.post(
"/api/v1/product", headers=auth_headers, content="invalid json"
)
assert response.status_code == 422 # Validation error
def test_missing_required_fields(self, client, auth_headers):
"""Test handling of missing required fields"""
response = client.post("/api/v1/product",
headers=auth_headers,
json={"title": "Test"}) # Missing product_id
response = client.post(
"/api/v1/product", headers=auth_headers, json={"title": "Test"}
) # Missing product_id
assert response.status_code == 422
def test_invalid_authentication(self, client):
"""Test handling of invalid authentication"""
response = client.get("/api/v1/product",
headers={"Authorization": "Bearer invalid_token"})
response = client.get(
"/api/v1/product", headers={"Authorization": "Bearer invalid_token"}
)
assert response.status_code == 401 # Token is not valid
@@ -38,8 +39,10 @@ class TestErrorHandling:
"""Test handling of duplicate resource creation"""
product_data = {
"product_id": test_product.product_id, # Duplicate ID
"title": "Another Product"
"title": "Another Product",
}
response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
response = client.post(
"/api/v1/product", headers=auth_headers, json=product_data
)
assert response.status_code == 400

View File

@@ -1,8 +1,9 @@
# tests/test_export.py
import pytest
import csv
from io import StringIO
import pytest
from models.database_models import Product
@@ -15,7 +16,7 @@ class TestExportFunctionality:
assert response.headers["content-type"] == "text/csv; charset=utf-8"
# Parse CSV content
csv_content = response.content.decode('utf-8')
csv_content = response.content.decode("utf-8")
csv_reader = csv.reader(StringIO(csv_content))
# Check header row
@@ -35,10 +36,12 @@ class TestExportFunctionality:
db.add_all(products)
db.commit()
response = client.get("/api/v1/export-csv?marketplace=Amazon", headers=auth_headers)
response = client.get(
"/api/v1/export-csv?marketplace=Amazon", headers=auth_headers
)
assert response.status_code == 200
csv_content = response.content.decode('utf-8')
csv_content = response.content.decode("utf-8")
assert "EXP1" in csv_content
assert "EXP2" not in csv_content # Should be filtered out
@@ -50,7 +53,7 @@ class TestExportFunctionality:
product = Product(
product_id=f"PERF{i:04d}",
title=f"Performance Product {i}",
marketplace="Performance"
marketplace="Performance",
)
products.append(product)
@@ -58,10 +61,10 @@ class TestExportFunctionality:
db.commit()
import time
start_time = time.time()
response = client.get("/api/v1/export-csv", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200
assert end_time - start_time < 10.0 # Should complete within 10 seconds

View File

@@ -1,5 +1,6 @@
# tests/test_filtering.py
import pytest
from models.database_models import Product
@@ -39,7 +40,9 @@ class TestFiltering:
db.add_all(products)
db.commit()
response = client.get("/api/v1/product?marketplace=Amazon", headers=auth_headers)
response = client.get(
"/api/v1/product?marketplace=Amazon", headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2
@@ -47,9 +50,17 @@ class TestFiltering:
def test_product_search_filter(self, client, auth_headers, db):
"""Test searching products by text"""
products = [
Product(product_id="SEARCH1", title="Apple iPhone", description="Smartphone"),
Product(product_id="SEARCH2", title="Samsung Galaxy", description="Android phone"),
Product(product_id="SEARCH3", title="iPad Tablet", description="Apple tablet"),
Product(
product_id="SEARCH1", title="Apple iPhone", description="Smartphone"
),
Product(
product_id="SEARCH2",
title="Samsung Galaxy",
description="Android phone",
),
Product(
product_id="SEARCH3", title="iPad Tablet", description="Apple tablet"
),
]
db.add_all(products)
@@ -70,16 +81,33 @@ class TestFiltering:
def test_combined_filters(self, client, auth_headers, db):
"""Test combining multiple filters"""
products = [
Product(product_id="COMBO1", title="Apple iPhone", brand="Apple", marketplace="Amazon"),
Product(product_id="COMBO2", title="Apple iPad", brand="Apple", marketplace="eBay"),
Product(product_id="COMBO3", title="Samsung Phone", brand="Samsung", marketplace="Amazon"),
Product(
product_id="COMBO1",
title="Apple iPhone",
brand="Apple",
marketplace="Amazon",
),
Product(
product_id="COMBO2",
title="Apple iPad",
brand="Apple",
marketplace="eBay",
),
Product(
product_id="COMBO3",
title="Samsung Phone",
brand="Samsung",
marketplace="Amazon",
),
]
db.add_all(products)
db.commit()
# Filter by brand AND marketplace
response = client.get("/api/v1/product?brand=Apple&marketplace=Amazon", headers=auth_headers)
response = client.get(
"/api/v1/product?brand=Apple&marketplace=Amazon", headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["total"] == 1 # Only iPhone matches both

View File

@@ -14,10 +14,12 @@ class TestIntegrationFlows:
"brand": "FlowBrand",
"gtin": "1111222233334",
"availability": "in stock",
"marketplace": "TestFlow"
"marketplace": "TestFlow",
}
response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
response = client.post(
"/api/v1/product", headers=auth_headers, json=product_data
)
assert response.status_code == 200
product = response.json()
@@ -25,26 +27,33 @@ class TestIntegrationFlows:
stock_data = {
"gtin": product["gtin"],
"location": "MAIN_WAREHOUSE",
"quantity": 50
"quantity": 50,
}
response = client.post("/api/v1/stock", headers=auth_headers, json=stock_data)
assert response.status_code == 200
# 3. Get product with stock info
response = client.get(f"/api/v1/product/{product['product_id']}", headers=auth_headers)
response = client.get(
f"/api/v1/product/{product['product_id']}", headers=auth_headers
)
assert response.status_code == 200
product_detail = response.json()
assert product_detail["stock_info"]["total_quantity"] == 50
# 4. Update product
update_data = {"title": "Updated Integration Test Product"}
response = client.put(f"/api/v1/product/{product['product_id']}",
headers=auth_headers, json=update_data)
response = client.put(
f"/api/v1/product/{product['product_id']}",
headers=auth_headers,
json=update_data,
)
assert response.status_code == 200
# 5. Search for product
response = client.get("/api/v1/product?search=Updated Integration", headers=auth_headers)
response = client.get(
"/api/v1/product?search=Updated Integration", headers=auth_headers
)
assert response.status_code == 200
assert response.json()["total"] == 1
@@ -54,7 +63,7 @@ class TestIntegrationFlows:
shop_data = {
"shop_code": "FLOWSHOP",
"shop_name": "Integration Flow Shop",
"description": "Test shop for integration"
"description": "Test shop for integration",
}
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
@@ -66,10 +75,12 @@ class TestIntegrationFlows:
"product_id": "SHOPFLOW001",
"title": "Shop Flow Product",
"price": "15.99",
"marketplace": "ShopFlow"
"marketplace": "ShopFlow",
}
response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
response = client.post(
"/api/v1/product", headers=auth_headers, json=product_data
)
assert response.status_code == 200
product = response.json()
@@ -86,28 +97,28 @@ class TestIntegrationFlows:
location = "TEST_WAREHOUSE"
# 1. Set initial stock
response = client.post("/api/v1/stock", headers=auth_headers, json={
"gtin": gtin,
"location": location,
"quantity": 100
})
response = client.post(
"/api/v1/stock",
headers=auth_headers,
json={"gtin": gtin, "location": location, "quantity": 100},
)
assert response.status_code == 200
# 2. Add more stock
response = client.post("/api/v1/stock/add", headers=auth_headers, json={
"gtin": gtin,
"location": location,
"quantity": 25
})
response = client.post(
"/api/v1/stock/add",
headers=auth_headers,
json={"gtin": gtin, "location": location, "quantity": 25},
)
assert response.status_code == 200
assert response.json()["quantity"] == 125
# 3. Remove some stock
response = client.post("/api/v1/stock/remove", headers=auth_headers, json={
"gtin": gtin,
"location": location,
"quantity": 30
})
response = client.post(
"/api/v1/stock/remove",
headers=auth_headers,
json={"gtin": gtin, "location": location, "quantity": 30},
)
assert response.status_code == 200
assert response.json()["quantity"] == 95

View File

@@ -1,6 +1,7 @@
# tests/test_marketplace.py
from unittest.mock import AsyncMock, patch
import pytest
from unittest.mock import patch, AsyncMock
class TestMarketplaceAPI:
@@ -10,11 +11,12 @@ class TestMarketplaceAPI:
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": test_shop.shop_code
"shop_code": test_shop.shop_code,
}
response = client.post("/api/v1/marketplace/import-product",
headers=auth_headers, json=import_data)
response = client.post(
"/api/v1/marketplace/import-product", headers=auth_headers, json=import_data
)
assert response.status_code == 200
data = response.json()
@@ -29,11 +31,12 @@ class TestMarketplaceAPI:
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": "NONEXISTENT"
"shop_code": "NONEXISTENT",
}
response = client.post("/api/v1/marketplace/import-product",
headers=auth_headers, json=import_data)
response = client.post(
"/api/v1/marketplace/import-product", headers=auth_headers, json=import_data
)
assert response.status_code == 404
assert "Shop not found" in response.json()["detail"]
@@ -49,4 +52,3 @@ class TestMarketplaceAPI:
"""Test that marketplace endpoints require authentication"""
response = client.get("/api/v1/marketplace/import-jobs")
assert response.status_code == 401 # No authorization header

View File

@@ -1,10 +1,12 @@
# tests/test_marketplace_service.py
import pytest
import uuid
from datetime import datetime
import pytest
from app.services.marketplace_service import MarketplaceService
from models.api_models import MarketplaceImportRequest
from models.database_models import MarketplaceImportJob, Shop, User
from datetime import datetime
class TestMarketplaceService:
@@ -22,7 +24,9 @@ class TestMarketplaceService:
assert result.shop_code == test_shop.shop_code
assert result.owner_id == test_user.id
def test_validate_shop_access_admin_can_access_any_shop(self, db, test_shop, test_admin):
def test_validate_shop_access_admin_can_access_any_shop(
self, db, test_shop, test_admin
):
"""Test that admin users can access any shop"""
result = self.service.validate_shop_access(db, test_shop.shop_code, test_admin)
@@ -33,7 +37,9 @@ class TestMarketplaceService:
with pytest.raises(ValueError, match="Shop not found"):
self.service.validate_shop_access(db, "NONEXISTENT", test_user)
def test_validate_shop_access_permission_denied(self, db, test_shop, test_user, other_user):
def test_validate_shop_access_permission_denied(
self, db, test_shop, test_user, other_user
):
"""Test shop access validation when user doesn't own the shop"""
# Set the shop owner to a different user
test_shop.owner_id = other_user.id
@@ -52,7 +58,7 @@ class TestMarketplaceService:
url="https://example.com/products.csv",
marketplace="Amazon",
shop_code=test_shop.shop_code,
batch_size=1000
batch_size=1000,
)
result = self.service.create_import_job(db, request, test_user)
@@ -60,7 +66,7 @@ class TestMarketplaceService:
assert result.marketplace == "Amazon"
# Check the correct field based on your model
assert result.shop_id == test_shop.id # Changed from shop_code to shop_id
assert result.user_id == test_user.id if hasattr(result, 'user_id') else True
assert result.user_id == test_user.id if hasattr(result, "user_id") else True
assert result.status == "pending"
assert result.source_url == "https://example.com/products.csv"
@@ -70,7 +76,7 @@ class TestMarketplaceService:
url="https://example.com/products.csv",
marketplace="Amazon",
shop_code="INVALID_SHOP",
batch_size=1000
batch_size=1000,
)
with pytest.raises(ValueError, match="Shop not found"):
@@ -78,16 +84,22 @@ class TestMarketplaceService:
def test_get_import_job_by_id_success(self, db, test_marketplace_job, test_user):
"""Test getting import job by ID for job owner"""
result = self.service.get_import_job_by_id(db, test_marketplace_job.id, test_user)
result = self.service.get_import_job_by_id(
db, test_marketplace_job.id, test_user
)
assert result.id == test_marketplace_job.id
# Check user_id if the field exists
if hasattr(result, 'user_id'):
if hasattr(result, "user_id"):
assert result.user_id == test_user.id
def test_get_import_job_by_id_admin_access(self, db, test_marketplace_job, test_admin):
def test_get_import_job_by_id_admin_access(
self, db, test_marketplace_job, test_admin
):
"""Test that admin can access any import job"""
result = self.service.get_import_job_by_id(db, test_marketplace_job.id, test_admin)
result = self.service.get_import_job_by_id(
db, test_marketplace_job.id, test_admin
)
assert result.id == test_marketplace_job.id
@@ -96,7 +108,9 @@ class TestMarketplaceService:
with pytest.raises(ValueError, match="Marketplace import job not found"):
self.service.get_import_job_by_id(db, 99999, test_user)
def test_get_import_job_by_id_access_denied(self, db, test_marketplace_job, other_user):
def test_get_import_job_by_id_access_denied(
self, db, test_marketplace_job, other_user
):
"""Test access denied when user doesn't own the job"""
with pytest.raises(PermissionError, match="Access denied to this import job"):
self.service.get_import_job_by_id(db, test_marketplace_job.id, other_user)
@@ -108,7 +122,7 @@ class TestMarketplaceService:
assert len(jobs) >= 1
assert any(job.id == test_marketplace_job.id for job in jobs)
# Check user_id if the field exists
if hasattr(test_marketplace_job, 'user_id'):
if hasattr(test_marketplace_job, "user_id"):
assert test_marketplace_job.user_id == test_user.id
def test_get_import_jobs_admin_sees_all(self, db, test_marketplace_job, test_admin):
@@ -118,7 +132,9 @@ class TestMarketplaceService:
assert len(jobs) >= 1
assert any(job.id == test_marketplace_job.id for job in jobs)
def test_get_import_jobs_with_marketplace_filter(self, db, test_marketplace_job, test_user):
def test_get_import_jobs_with_marketplace_filter(
self, db, test_marketplace_job, test_user
):
"""Test getting import jobs with marketplace filter"""
jobs = self.service.get_import_jobs(
db, test_user, marketplace=test_marketplace_job.marketplace
@@ -143,7 +159,7 @@ class TestMarketplaceService:
imported_count=0,
updated_count=0,
total_processed=0,
error_count=0
error_count=0,
)
db.add(job)
db.commit()
@@ -159,7 +175,7 @@ class TestMarketplaceService:
test_marketplace_job.id,
"completed",
imported_count=100,
total_processed=100
total_processed=100,
)
assert result.status == "completed"
@@ -211,7 +227,7 @@ class TestMarketplaceService:
imported_count=0,
updated_count=0,
total_processed=0,
error_count=0
error_count=0,
)
db.add(job)
db.commit()
@@ -222,13 +238,17 @@ class TestMarketplaceService:
assert result.status == "cancelled"
assert result.completed_at is not None
def test_cancel_import_job_invalid_status(self, db, test_marketplace_job, test_user):
def test_cancel_import_job_invalid_status(
self, db, test_marketplace_job, test_user
):
"""Test cancelling a job that can't be cancelled"""
# Set job status to completed
test_marketplace_job.status = "completed"
db.commit()
with pytest.raises(ValueError, match="Cannot cancel job with status: completed"):
with pytest.raises(
ValueError, match="Cannot cancel job with status: completed"
):
self.service.cancel_import_job(db, test_marketplace_job.id, test_user)
def test_delete_import_job_success(self, db, test_user, test_shop):
@@ -246,7 +266,7 @@ class TestMarketplaceService:
imported_count=0,
updated_count=0,
total_processed=0,
error_count=0
error_count=0,
)
db.add(job)
db.commit()
@@ -258,7 +278,11 @@ class TestMarketplaceService:
assert result is True
# Verify the job is actually deleted
deleted_job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first()
deleted_job = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.id == job_id)
.first()
)
assert deleted_job is None
def test_delete_import_job_invalid_status(self, db, test_user, test_shop):
@@ -276,7 +300,7 @@ class TestMarketplaceService:
imported_count=0,
updated_count=0,
total_processed=0,
error_count=0
error_count=0,
)
db.add(job)
db.commit()

View File

@@ -1,8 +1,10 @@
# tests/test_middleware.py
import pytest
from unittest.mock import Mock, patch
from middleware.rate_limiter import RateLimiter
import pytest
from middleware.auth import AuthManager
from middleware.rate_limiter import RateLimiter
class TestRateLimiter:
@@ -12,11 +14,17 @@ class TestRateLimiter:
client_id = "test_client"
# Should allow first request
assert limiter.allow_request(client_id, max_requests=10, window_seconds=3600) is True
assert (
limiter.allow_request(client_id, max_requests=10, window_seconds=3600)
is True
)
# Should allow subsequent requests within limit
for _ in range(5):
assert limiter.allow_request(client_id, max_requests=10, window_seconds=3600) is True
assert (
limiter.allow_request(client_id, max_requests=10, window_seconds=3600)
is True
)
def test_rate_limiter_blocks_excess_requests(self):
"""Test rate limiter blocks requests exceeding limit"""

View File

@@ -1,5 +1,6 @@
# tests/test_pagination.py
import pytest
from models.database_models import Product
@@ -12,7 +13,7 @@ class TestPagination:
product = Product(
product_id=f"PAGE{i:03d}",
title=f"Pagination Test Product {i}",
marketplace="PaginationTest"
marketplace="PaginationTest",
)
products.append(product)

View File

@@ -1,7 +1,8 @@
# tests/test_performance.py
import pytest
import time
import pytest
from models.database_models import Product
@@ -15,7 +16,7 @@ class TestPerformance:
product_id=f"PERF{i:03d}",
title=f"Performance Test Product {i}",
price=f"{i}.99",
marketplace="Performance"
marketplace="Performance",
)
products.append(product)
@@ -41,7 +42,7 @@ class TestPerformance:
title=f"Searchable Product {i}",
description=f"This is a searchable product number {i}",
brand="SearchBrand",
marketplace="SearchMarket"
marketplace="SearchMarket",
)
products.append(product)

View File

@@ -31,7 +31,9 @@ class TestProductsAPI:
assert response.json()["total"] == 1
# Test marketplace filter
response = client.get("/api/v1/product?marketplace=Letzshop", headers=auth_headers)
response = client.get(
"/api/v1/product?marketplace=Letzshop", headers=auth_headers
)
assert response.status_code == 200
assert response.json()["total"] == 1
@@ -50,10 +52,12 @@ class TestProductsAPI:
"brand": "NewBrand",
"gtin": "9876543210987",
"availability": "in stock",
"marketplace": "Amazon"
"marketplace": "Amazon",
}
response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
response = client.post(
"/api/v1/product", headers=auth_headers, json=product_data
)
assert response.status_code == 200
data = response.json()
@@ -71,10 +75,12 @@ class TestProductsAPI:
"brand": "NewBrand",
"gtin": "9876543210987",
"availability": "in stock",
"marketplace": "Amazon"
"marketplace": "Amazon",
}
response = client.post("/api/v1/product", headers=auth_headers, json=product_data)
response = client.post(
"/api/v1/product", headers=auth_headers, json=product_data
)
# Debug output
print(f"Status Code: {response.status_code}")
@@ -89,7 +95,9 @@ class TestProductsAPI:
def test_get_product_by_id(self, client, auth_headers, test_product):
"""Test getting specific product"""
response = client.get(f"/api/v1/product/{test_product.product_id}", headers=auth_headers)
response = client.get(
f"/api/v1/product/{test_product.product_id}", headers=auth_headers
)
assert response.status_code == 200
data = response.json()
@@ -104,15 +112,12 @@ class TestProductsAPI:
def test_update_product(self, client, auth_headers, test_product):
"""Test updating product"""
update_data = {
"title": "Updated Product Title",
"price": "25.99"
}
update_data = {"title": "Updated Product Title", "price": "25.99"}
response = client.put(
f"/api/v1/product/{test_product.product_id}",
headers=auth_headers,
json=update_data
json=update_data,
)
assert response.status_code == 200
@@ -123,8 +128,7 @@ class TestProductsAPI:
def test_delete_product(self, client, auth_headers, test_product):
"""Test deleting product"""
response = client.delete(
f"/api/v1/product/{test_product.product_id}",
headers=auth_headers
f"/api/v1/product/{test_product.product_id}", headers=auth_headers
)
assert response.status_code == 200

View File

@@ -1,5 +1,6 @@
# tests/test_product_service.py
import pytest
from app.services.product_service import ProductService
from models.api_models import ProductCreate
from models.database_models import Product
@@ -16,7 +17,7 @@ class TestProductService:
title="Service Test Product",
gtin="1234567890123",
price="19.99",
marketplace="TestMarket"
marketplace="TestMarket",
)
product = self.service.create_product(db, product_data)
@@ -31,7 +32,7 @@ class TestProductService:
product_id="SVC002",
title="Service Test Product",
gtin="invalid_gtin",
price="19.99"
price="19.99",
)
with pytest.raises(ValueError, match="Invalid GTIN format"):
@@ -39,10 +40,7 @@ class TestProductService:
def test_get_products_with_filters(self, db, test_product):
"""Test getting products with various filters"""
products, total = self.service.get_products_with_filters(
db,
brand="TestBrand"
)
products, total = self.service.get_products_with_filters(db, brand="TestBrand")
assert total == 1
assert len(products) == 1
@@ -51,10 +49,8 @@ class TestProductService:
def test_get_products_with_search(self, db, test_product):
"""Test getting products with search"""
products, total = self.service.get_products_with_filters(
db,
search="Test Product"
db, search="Test Product"
)
assert total == 1
assert len(products) == 1

View File

@@ -1,7 +1,8 @@
# tests/test_security.py
from unittest.mock import patch
import pytest
from fastapi import HTTPException
from unittest.mock import patch
class TestSecurity:
@@ -10,7 +11,9 @@ class TestSecurity:
response = client.get("/api/v1/debug-bearer")
print(f"Direct Bearer - Status: {response.status_code}")
print(f"Direct Bearer - Response: {response.json() if response.content else 'No content'}")
print(
f"Direct Bearer - Response: {response.json() if response.content else 'No content'}"
)
def test_debug_dependencies(self, client):
"""Debug the dependency chain step by step"""
@@ -24,7 +27,9 @@ class TestSecurity:
print(f"Admin endpoint - Raw: {response.content}")
# Test 2: Try a regular endpoint that uses get_current_user
response2 = client.get("/api/v1/product") # or any endpoint with get_current_user
response2 = client.get(
"/api/v1/product"
) # or any endpoint with get_current_user
print(f"Regular endpoint - Status: {response2.status_code}")
try:
print(f"Regular endpoint - Response: {response2.json()}")
@@ -35,7 +40,7 @@ class TestSecurity:
"""Debug test to see all available routes"""
print("\n=== All Available Routes ===")
for route in client.app.routes:
if hasattr(route, 'path') and hasattr(route, 'methods'):
if hasattr(route, "path") and hasattr(route, "methods"):
print(f"{list(route.methods)} {route.path}")
print("\n=== Testing Product Endpoint Variations ===")
@@ -59,7 +64,7 @@ class TestSecurity:
"/api/v1/product",
"/api/v1/shop",
"/api/v1/stats",
"/api/v1/stock"
"/api/v1/stock",
]
for endpoint in protected_endpoints:
@@ -76,7 +81,9 @@ class TestSecurity:
def test_admin_endpoint_requires_admin_role(self, client, auth_headers):
"""Test that admin endpoints require admin role"""
response = client.get("/api/v1/admin/users", headers=auth_headers)
assert response.status_code == 403 # Token is valid but user does not have access.
assert (
response.status_code == 403
) # Token is valid but user does not have access.
# Regular user should be denied
def test_sql_injection_prevention(self, client, auth_headers):
@@ -84,7 +91,9 @@ class TestSecurity:
# Try SQL injection in search parameter
malicious_search = "'; DROP TABLE products; --"
response = client.get(f"/api/v1/product?search={malicious_search}", headers=auth_headers)
response = client.get(
f"/api/v1/product?search={malicious_search}", headers=auth_headers
)
# Should not crash and should return normal response
assert response.status_code == 200

View File

@@ -8,7 +8,7 @@ class TestShopsAPI:
shop_data = {
"shop_code": "NEWSHOP",
"shop_name": "New Shop",
"description": "A new test shop"
"description": "A new test shop",
}
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
@@ -23,7 +23,7 @@ class TestShopsAPI:
"""Test creating shop with duplicate code"""
shop_data = {
"shop_code": test_shop.shop_code, # Same as test_shop
"shop_name": test_shop.shop_name
"shop_name": test_shop.shop_name,
}
response = client.post("/api/v1/shop", headers=auth_headers, json=shop_data)
@@ -42,7 +42,9 @@ class TestShopsAPI:
def test_get_shop_by_code(self, client, auth_headers, test_shop):
"""Test getting specific shop"""
response = client.get(f"/api/v1/shop/{test_shop.shop_code}", headers=auth_headers)
response = client.get(
f"/api/v1/shop/{test_shop.shop_code}", headers=auth_headers
)
assert response.status_code == 200
data = response.json()

View File

@@ -18,7 +18,7 @@ class TestShopService:
shop_data = ShopCreate(
shop_code="NEWSHOP",
shop_name="New Test Shop",
description="A new test shop"
description="A new test shop",
)
shop = self.service.create_shop(db, shop_data, test_user)
@@ -30,10 +30,7 @@ class TestShopService:
def test_create_shop_admin_auto_verify(self, db, test_admin, shop_factory):
"""Test admin creates verified shop automatically"""
shop_data = ShopCreate(
shop_code="ADMINSHOP",
shop_name="Admin Test Shop"
)
shop_data = ShopCreate(shop_code="ADMINSHOP", shop_name="Admin Test Shop")
shop = self.service.create_shop(db, shop_data, test_admin)
@@ -42,8 +39,7 @@ class TestShopService:
def test_create_shop_duplicate_code(self, db, test_user, test_shop):
"""Test shop creation fails with duplicate shop code"""
shop_data = ShopCreate(
shop_code=test_shop.shop_code,
shop_name=test_shop.shop_name
shop_code=test_shop.shop_code, shop_name=test_shop.shop_name
)
with pytest.raises(HTTPException) as exc_info:
@@ -60,9 +56,13 @@ class TestShopService:
assert test_shop.shop_code in shop_codes
assert inactive_shop.shop_code not in shop_codes
def test_get_shops_admin_user(self, db, test_admin, test_shop, inactive_shop, verified_shop):
def test_get_shops_admin_user(
self, db, test_admin, test_shop, inactive_shop, verified_shop
):
"""Test admin user can see all shops with filters"""
shops, total = self.service.get_shops(db, test_admin, active_only=False, verified_only=False)
shops, total = self.service.get_shops(
db, test_admin, active_only=False, verified_only=False
)
shop_codes = [shop.shop_code for shop in shops]
assert test_shop.shop_code in shop_codes
@@ -78,7 +78,9 @@ class TestShopService:
def test_get_shop_by_code_admin_access(self, db, test_admin, test_shop):
"""Test admin can access any shop"""
shop = self.service.get_shop_by_code(db, test_shop.shop_code.lower(), test_admin)
shop = self.service.get_shop_by_code(
db, test_shop.shop_code.lower(), test_admin
)
assert shop is not None
assert shop.id == test_shop.id
@@ -103,10 +105,12 @@ class TestShopService:
product_id=unique_product.product_id,
price="15.99",
is_featured=True,
stock_quantity=5
stock_quantity=5,
)
shop_product = self.service.add_product_to_shop(db, test_shop, shop_product_data)
shop_product = self.service.add_product_to_shop(
db, test_shop, shop_product_data
)
assert shop_product is not None
assert shop_product.shop_id == test_shop.id
@@ -114,10 +118,7 @@ class TestShopService:
def test_add_product_to_shop_product_not_found(self, db, test_shop):
"""Test adding non-existent product to shop fails"""
shop_product_data = ShopProductCreate(
product_id="NONEXISTENT",
price="15.99"
)
shop_product_data = ShopProductCreate(product_id="NONEXISTENT", price="15.99")
with pytest.raises(HTTPException) as exc_info:
self.service.add_product_to_shop(db, test_shop, shop_product_data)
@@ -127,8 +128,7 @@ class TestShopService:
def test_add_product_to_shop_already_exists(self, db, test_shop, shop_product):
"""Test adding product that's already in shop fails"""
shop_product_data = ShopProductCreate(
product_id=shop_product.product.product_id,
price="15.99"
product_id=shop_product.product.product_id, price="15.99"
)
with pytest.raises(HTTPException) as exc_info:
@@ -136,7 +136,9 @@ class TestShopService:
assert exc_info.value.status_code == 400
def test_get_shop_products_owner_access(self, db, test_user, test_shop, shop_product):
def test_get_shop_products_owner_access(
self, db, test_user, test_shop, shop_product
):
"""Test shop owner can get shop products"""
products, total = self.service.get_shop_products(db, test_shop, test_user)

View File

@@ -40,7 +40,7 @@ class TestStatsService:
marketplace="Amazon",
shop_name="AmazonShop",
price="15.99",
currency="EUR"
currency="EUR",
),
Product(
product_id="PROD003",
@@ -50,7 +50,7 @@ class TestStatsService:
marketplace="eBay",
shop_name="eBayShop",
price="25.99",
currency="USD"
currency="USD",
),
Product(
product_id="PROD004",
@@ -60,8 +60,8 @@ class TestStatsService:
marketplace="Letzshop", # Same as test_product
shop_name="DifferentShop",
price="35.99",
currency="EUR"
)
currency="EUR",
),
]
db.add_all(additional_products)
db.commit()
@@ -86,7 +86,7 @@ class TestStatsService:
marketplace=None, # Null marketplace
shop_name=None, # Null shop
price="10.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="EMPTY001",
@@ -96,8 +96,8 @@ class TestStatsService:
marketplace="", # Empty marketplace
shop_name="", # Empty shop
price="15.00",
currency="EUR"
)
currency="EUR",
),
]
db.add_all(products_with_nulls)
db.commit()
@@ -122,14 +122,16 @@ class TestStatsService:
# Find our test marketplace in the results
test_marketplace_stat = next(
(stat for stat in stats if stat["marketplace"] == test_product.marketplace),
None
None,
)
assert test_marketplace_stat is not None
assert test_marketplace_stat["total_products"] >= 1
assert test_marketplace_stat["unique_shops"] >= 1
assert test_marketplace_stat["unique_brands"] >= 1
def test_get_marketplace_breakdown_stats_multiple_marketplaces(self, db, test_product):
def test_get_marketplace_breakdown_stats_multiple_marketplaces(
self, db, test_product
):
"""Test marketplace breakdown with multiple marketplaces"""
# Create products for different marketplaces
marketplace_products = [
@@ -140,7 +142,7 @@ class TestStatsService:
marketplace="Amazon",
shop_name="AmazonShop1",
price="20.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="AMAZON002",
@@ -149,7 +151,7 @@ class TestStatsService:
marketplace="Amazon",
shop_name="AmazonShop2",
price="25.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="EBAY001",
@@ -158,8 +160,8 @@ class TestStatsService:
marketplace="eBay",
shop_name="eBayShop",
price="30.00",
currency="USD"
)
currency="USD",
),
]
db.add_all(marketplace_products)
db.commit()
@@ -194,7 +196,7 @@ class TestStatsService:
shop_name="SomeShop",
brand="SomeBrand",
price="10.00",
currency="EUR"
currency="EUR",
)
db.add(null_marketplace_product)
db.commit()
@@ -202,7 +204,9 @@ class TestStatsService:
stats = self.service.get_marketplace_breakdown_stats(db)
# Should not include any stats for null marketplace
marketplace_names = [stat["marketplace"] for stat in stats if stat["marketplace"] is not None]
marketplace_names = [
stat["marketplace"] for stat in stats if stat["marketplace"] is not None
]
assert None not in marketplace_names
def test_get_product_count(self, db, test_product):
@@ -223,7 +227,7 @@ class TestStatsService:
marketplace="Test",
shop_name="TestShop",
price="10.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="BRAND002",
@@ -232,15 +236,17 @@ class TestStatsService:
marketplace="Test",
shop_name="TestShop",
price="15.00",
currency="EUR"
)
currency="EUR",
),
]
db.add_all(brand_products)
db.commit()
count = self.service.get_unique_brands_count(db)
assert count >= 2 # At least BrandA and BrandB, plus possibly test_product brand
assert (
count >= 2
) # At least BrandA and BrandB, plus possibly test_product brand
assert isinstance(count, int)
def test_get_unique_categories_count(self, db, test_product):
@@ -254,7 +260,7 @@ class TestStatsService:
marketplace="Test",
shop_name="TestShop",
price="10.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="CAT002",
@@ -263,8 +269,8 @@ class TestStatsService:
marketplace="Test",
shop_name="TestShop",
price="15.00",
currency="EUR"
)
currency="EUR",
),
]
db.add_all(category_products)
db.commit()
@@ -284,7 +290,7 @@ class TestStatsService:
marketplace="Amazon",
shop_name="AmazonShop",
price="10.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="MARKET002",
@@ -292,8 +298,8 @@ class TestStatsService:
marketplace="eBay",
shop_name="eBayShop",
price="15.00",
currency="EUR"
)
currency="EUR",
),
]
db.add_all(marketplace_products)
db.commit()
@@ -313,7 +319,7 @@ class TestStatsService:
marketplace="Test",
shop_name="ShopA",
price="10.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="SHOP002",
@@ -321,8 +327,8 @@ class TestStatsService:
marketplace="Test",
shop_name="ShopB",
price="15.00",
currency="EUR"
)
currency="EUR",
),
]
db.add_all(shop_products)
db.commit()
@@ -341,15 +347,15 @@ class TestStatsService:
location="LOCATION2",
quantity=25,
reserved_quantity=5,
shop_id=test_stock.shop_id
shop_id=test_stock.shop_id,
),
Stock(
gtin="1234567890125",
location="LOCATION3",
quantity=0, # Out of stock
reserved_quantity=0,
shop_id=test_stock.shop_id
)
shop_id=test_stock.shop_id,
),
]
db.add_all(additional_stocks)
db.commit()
@@ -372,7 +378,7 @@ class TestStatsService:
marketplace="SpecificMarket",
shop_name="SpecificShop1",
price="10.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="SPECIFIC002",
@@ -381,7 +387,7 @@ class TestStatsService:
marketplace="SpecificMarket",
shop_name="SpecificShop2",
price="15.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="OTHER001",
@@ -390,8 +396,8 @@ class TestStatsService:
marketplace="OtherMarket",
shop_name="OtherShop",
price="20.00",
currency="EUR"
)
currency="EUR",
),
]
db.add_all(marketplace_products)
db.commit()
@@ -414,7 +420,7 @@ class TestStatsService:
marketplace="TestMarketplace",
shop_name="TestShop1",
price="10.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="SHOPTEST002",
@@ -423,8 +429,8 @@ class TestStatsService:
marketplace="TestMarketplace",
shop_name="TestShop2",
price="15.00",
currency="EUR"
)
currency="EUR",
),
]
db.add_all(marketplace_products)
db.commit()
@@ -445,7 +451,7 @@ class TestStatsService:
marketplace="CountMarketplace",
shop_name="CountShop",
price="10.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="COUNT002",
@@ -453,7 +459,7 @@ class TestStatsService:
marketplace="CountMarketplace",
shop_name="CountShop",
price="15.00",
currency="EUR"
currency="EUR",
),
Product(
product_id="COUNT003",
@@ -461,8 +467,8 @@ class TestStatsService:
marketplace="CountMarketplace",
shop_name="CountShop",
price="20.00",
currency="EUR"
)
currency="EUR",
),
]
db.add_all(marketplace_products)
db.commit()

View File

@@ -1,5 +1,6 @@
# tests/test_stock.py
import pytest
from models.database_models import Stock
@@ -9,7 +10,7 @@ class TestStockAPI:
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 100
"quantity": 100,
}
response = client.post("/api/v1/stock", headers=auth_headers, json=stock_data)
@@ -30,7 +31,7 @@ class TestStockAPI:
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 75
"quantity": 75,
}
response = client.post("/api/v1/stock", headers=auth_headers, json=stock_data)
@@ -49,10 +50,12 @@ class TestStockAPI:
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 25
"quantity": 25,
}
response = client.post("/api/v1/stock/add", headers=auth_headers, json=stock_data)
response = client.post(
"/api/v1/stock/add", headers=auth_headers, json=stock_data
)
assert response.status_code == 200
data = response.json()
@@ -68,10 +71,12 @@ class TestStockAPI:
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 15
"quantity": 15,
}
response = client.post("/api/v1/stock/remove", headers=auth_headers, json=stock_data)
response = client.post(
"/api/v1/stock/remove", headers=auth_headers, json=stock_data
)
assert response.status_code == 200
data = response.json()
@@ -87,10 +92,12 @@ class TestStockAPI:
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 20
"quantity": 20,
}
response = client.post("/api/v1/stock/remove", headers=auth_headers, json=stock_data)
response = client.post(
"/api/v1/stock/remove", headers=auth_headers, json=stock_data
)
assert response.status_code == 400
assert "Insufficient stock" in response.json()["detail"]

View File

@@ -1,9 +1,11 @@
# tests/test_stock_service.py
import pytest
import uuid
import pytest
from app.services.stock_service import StockService
from models.api_models import StockCreate, StockAdd, StockUpdate
from models.database_models import Stock, Product
from models.api_models import StockAdd, StockCreate, StockUpdate
from models.database_models import Product, Stock
class TestStockService:
@@ -39,7 +41,9 @@ class TestStockService:
assert self.service.normalize_gtin("1234567890123") == "1234567890123" # EAN-13
assert self.service.normalize_gtin("123456789012") == "123456789012" # UPC-A
assert self.service.normalize_gtin("12345678") == "12345678" # EAN-8
assert self.service.normalize_gtin("12345678901234") == "12345678901234" # GTIN-14
assert (
self.service.normalize_gtin("12345678901234") == "12345678901234"
) # GTIN-14
# Test with decimal points (should be removed)
assert self.service.normalize_gtin("1234567890123.0") == "1234567890123"
@@ -49,10 +53,14 @@ class TestStockService:
# Test short GTINs being padded
assert self.service.normalize_gtin("123") == "0000000000123" # Padded to EAN-13
assert self.service.normalize_gtin("12345") == "0000000012345" # Padded to EAN-13
assert (
self.service.normalize_gtin("12345") == "0000000012345"
) # Padded to EAN-13
# Test long GTINs being truncated
assert self.service.normalize_gtin("123456789012345") == "3456789012345" # Truncated to 13
assert (
self.service.normalize_gtin("123456789012345") == "3456789012345"
) # Truncated to 13
def test_normalize_gtin_edge_cases(self):
"""Test GTIN normalization edge cases"""
@@ -61,17 +69,21 @@ class TestStockService:
assert self.service.normalize_gtin(123) == "0000000000123"
# Test mixed valid/invalid characters
assert self.service.normalize_gtin("123-456-789-012") == "123456789012" # Dashes removed
assert self.service.normalize_gtin("123 456 789 012") == "123456789012" # Spaces removed
assert self.service.normalize_gtin("ABC123456789012DEF") == "123456789012" # Letters removed
assert (
self.service.normalize_gtin("123-456-789-012") == "123456789012"
) # Dashes removed
assert (
self.service.normalize_gtin("123 456 789 012") == "123456789012"
) # Spaces removed
assert (
self.service.normalize_gtin("ABC123456789012DEF") == "123456789012"
) # Letters removed
def test_set_stock_new_entry(self, db):
"""Test setting stock for a new GTIN/location combination"""
unique_id = str(uuid.uuid4())[:8]
stock_data = StockCreate(
gtin="1234567890123",
location=f"WAREHOUSE_A_{unique_id}",
quantity=100
gtin="1234567890123", location=f"WAREHOUSE_A_{unique_id}", quantity=100
)
result = self.service.set_stock(db, stock_data)
@@ -85,7 +97,7 @@ class TestStockService:
stock_data = StockCreate(
gtin=test_stock.gtin,
location=test_stock.location, # Use exact same location as test_stock
quantity=200
quantity=200,
)
result = self.service.set_stock(db, stock_data)
@@ -98,9 +110,7 @@ class TestStockService:
def test_set_stock_invalid_gtin(self, db):
"""Test setting stock with invalid GTIN"""
stock_data = StockCreate(
gtin="invalid_gtin",
location="WAREHOUSE_A",
quantity=100
gtin="invalid_gtin", location="WAREHOUSE_A", quantity=100
)
with pytest.raises(ValueError, match="Invalid GTIN format"):
@@ -110,9 +120,7 @@ class TestStockService:
"""Test adding stock for a new GTIN/location combination"""
unique_id = str(uuid.uuid4())[:8]
stock_data = StockAdd(
gtin="1234567890123",
location=f"WAREHOUSE_B_{unique_id}",
quantity=50
gtin="1234567890123", location=f"WAREHOUSE_B_{unique_id}", quantity=50
)
result = self.service.add_stock(db, stock_data)
@@ -127,7 +135,7 @@ class TestStockService:
stock_data = StockAdd(
gtin=test_stock.gtin,
location=test_stock.location, # Use exact same location as test_stock
quantity=25
quantity=25,
)
result = self.service.add_stock(db, stock_data)
@@ -138,11 +146,7 @@ class TestStockService:
def test_add_stock_invalid_gtin(self, db):
"""Test adding stock with invalid GTIN"""
stock_data = StockAdd(
gtin="invalid_gtin",
location="WAREHOUSE_A",
quantity=50
)
stock_data = StockAdd(gtin="invalid_gtin", location="WAREHOUSE_A", quantity=50)
with pytest.raises(ValueError, match="Invalid GTIN format"):
self.service.add_stock(db, stock_data)
@@ -150,12 +154,14 @@ class TestStockService:
def test_remove_stock_success(self, db, test_stock):
"""Test removing stock successfully"""
original_quantity = test_stock.quantity
remove_quantity = min(10, original_quantity) # Ensure we don't remove more than available
remove_quantity = min(
10, original_quantity
) # Ensure we don't remove more than available
stock_data = StockAdd(
gtin=test_stock.gtin,
location=test_stock.location, # Use exact same location as test_stock
quantity=remove_quantity
quantity=remove_quantity,
)
result = self.service.remove_stock(db, stock_data)
@@ -169,20 +175,20 @@ class TestStockService:
stock_data = StockAdd(
gtin=test_stock.gtin,
location=test_stock.location, # Use exact same location as test_stock
quantity=test_stock.quantity + 10 # More than available
quantity=test_stock.quantity + 10, # More than available
)
# Fix: Use more flexible regex pattern
with pytest.raises(ValueError, match="Insufficient stock|Not enough stock|Cannot remove"):
with pytest.raises(
ValueError, match="Insufficient stock|Not enough stock|Cannot remove"
):
self.service.remove_stock(db, stock_data)
def test_remove_stock_nonexistent_entry(self, db):
"""Test removing stock from non-existent GTIN/location"""
unique_id = str(uuid.uuid4())[:8]
stock_data = StockAdd(
gtin="9999999999999",
location=f"NONEXISTENT_{unique_id}",
quantity=10
gtin="9999999999999", location=f"NONEXISTENT_{unique_id}", quantity=10
)
with pytest.raises(ValueError, match="No stock found|Stock not found"):
@@ -190,11 +196,7 @@ class TestStockService:
def test_remove_stock_invalid_gtin(self, db):
"""Test removing stock with invalid GTIN"""
stock_data = StockAdd(
gtin="invalid_gtin",
location="WAREHOUSE_A",
quantity=10
)
stock_data = StockAdd(gtin="invalid_gtin", location="WAREHOUSE_A", quantity=10)
with pytest.raises(ValueError, match="Invalid GTIN format"):
self.service.remove_stock(db, stock_data)
@@ -218,14 +220,10 @@ class TestStockService:
# Create multiple stock entries for the same GTIN with unique locations
stock1 = Stock(
gtin=unique_gtin,
location=f"WAREHOUSE_A_{unique_id}",
quantity=50
gtin=unique_gtin, location=f"WAREHOUSE_A_{unique_id}", quantity=50
)
stock2 = Stock(
gtin=unique_gtin,
location=f"WAREHOUSE_B_{unique_id}",
quantity=30
gtin=unique_gtin, location=f"WAREHOUSE_B_{unique_id}", quantity=30
)
db.add(stock1)
@@ -275,7 +273,9 @@ class TestStockService:
assert len(result) >= 1
# Fix: Handle case sensitivity in comparison
assert all(stock.location.upper() == test_stock.location.upper() for stock in result)
assert all(
stock.location.upper() == test_stock.location.upper() for stock in result
)
def test_get_all_stock_with_gtin_filter(self, db, test_stock):
"""Test getting all stock with GTIN filter"""
@@ -293,14 +293,16 @@ class TestStockService:
stock = Stock(
gtin=f"1234567890{i:03d}", # Creates valid 13-digit GTINs: 1234567890000, 1234567890001, etc.
location=f"WAREHOUSE_{unique_prefix}_{i}",
quantity=10
quantity=10,
)
db.add(stock)
db.commit()
result = self.service.get_all_stock(db, skip=2, limit=2)
assert len(result) <= 2 # Should be at most 2, might be less if other records exist
assert (
len(result) <= 2
) # Should be at most 2, might be less if other records exist
def test_update_stock_success(self, db, test_stock):
"""Test updating stock quantity"""
@@ -359,7 +361,7 @@ def test_product_with_stock(db, test_stock):
gtin=test_stock.gtin,
price="29.99",
brand="TestBrand",
marketplace="Letzshop"
marketplace="Letzshop",
)
db.add(product)
db.commit()

View File

@@ -1,5 +1,6 @@
# tests/test_utils.py (Enhanced version of your existing file)
import pytest
from utils.data_processing import GTINProcessor, PriceProcessor