Refactoring code for modular approach

This commit is contained in:
2025-09-09 21:27:58 +02:00
parent 9a5d70e825
commit 71153a1ff5
55 changed files with 3928 additions and 1352 deletions

57
tests/Makefile Normal file
View File

@@ -0,0 +1,57 @@
# Makefile for running tests
# tests/Makefile
.PHONY: test test-unit test-integration test-coverage test-fast test-slow
# Run all tests
test:
pytest tests/ -v
# Run only unit tests
test-unit:
pytest tests/ -v -m unit
# Run only integration tests
test-integration:
pytest tests/ -v -m integration
# Run tests with coverage report
test-coverage:
pytest tests/ --cov=app --cov=models --cov=utils --cov=middleware --cov-report=html --cov-report=term-missing
# Run fast tests only (exclude slow ones)
test-fast:
pytest tests/ -v -m "not slow"
# Run slow tests only
test-slow:
pytest tests/ -v -m slow
# Run specific test file
test-auth:
pytest tests/test_auth.py -v
test-products:
pytest tests/test_products.py -v
test-stock:
pytest tests/test_stock.py -v
# Clean up test artifacts
clean:
rm -rf htmlcov/
rm -rf .pytest_cache/
rm -rf .coverage
find . -type d -name "__pycache__" -exec rm -rf {} +
find . -name "*.pyc" -delete
# Install test dependencies
install-test-deps:
pip install -r tests/requirements_test.txtvalidate_csv_headers(valid_df) == True
# Invalid headers (missing required fields)
invalid_df = pd.DataFrame({
"id": ["TEST001"], # Wrong column name
"name": ["Test"]
})
assert self.processor._

2
tests/__init__.py Normal file
View File

@@ -0,0 +1,2 @@
# tests/__init__.py
# This file makes the tests directory a Python package

194
tests/conftest.py Normal file
View File

@@ -0,0 +1,194 @@
# tests/conftest.py
import pytest
import tempfile
import os
from fastapi.testclient import TestClient
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.pool import StaticPool
from main import app
from app.core.database import get_db, Base
from models.database_models import User, Product, Stock, Shop
from middleware.auth import AuthManager
# Use in-memory SQLite database for tests
SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"
@pytest.fixture(scope="session")
def engine():
"""Create test database engine"""
return create_engine(
SQLALCHEMY_TEST_DATABASE_URL,
connect_args={"check_same_thread": False},
poolclass=StaticPool,
echo=False # Set to True for SQL debugging
)
@pytest.fixture(scope="session")
def testing_session_local(engine):
"""Create session factory for tests"""
return sessionmaker(autocommit=False, autoflush=False, bind=engine)
@pytest.fixture(scope="function")
def db(engine, testing_session_local):
"""Create a fresh database for each test"""
# Create all tables
Base.metadata.create_all(bind=engine)
# Create session
db = testing_session_local()
# Override the dependency
def override_get_db():
try:
yield db
finally:
pass # Don't close here, we'll close in cleanup
app.dependency_overrides[get_db] = override_get_db
try:
yield db
finally:
db.rollback() # Rollback any uncommitted changes
db.close()
# Clean up the dependency override
if get_db in app.dependency_overrides:
del app.dependency_overrides[get_db]
# Drop all tables for next test
Base.metadata.drop_all(bind=engine)
@pytest.fixture(scope="function")
def client(db):
"""Create a test client with database dependency override"""
return TestClient(app)
@pytest.fixture(scope="session")
def auth_manager():
"""Create auth manager instance (session scope since it's stateless)"""
return AuthManager()
@pytest.fixture
def test_user(db, auth_manager):
"""Create a test user"""
hashed_password = auth_manager.hash_password("testpass123")
user = User(
email="test@example.com",
username="testuser",
hashed_password=hashed_password,
role="user",
is_active=True
)
db.add(user)
db.commit()
db.refresh(user)
return user
@pytest.fixture
def test_admin(db, auth_manager):
"""Create a test admin user"""
hashed_password = auth_manager.hash_password("adminpass123")
admin = User(
email="admin@example.com",
username="admin",
hashed_password=hashed_password,
role="admin",
is_active=True
)
db.add(admin)
db.commit()
db.refresh(admin)
return admin
@pytest.fixture
def auth_headers(client, test_user):
"""Get authentication headers for test user"""
response = client.post("/api/v1/auth/login", json={
"username": "testuser",
"password": "testpass123"
})
assert response.status_code == 200, f"Login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def admin_headers(client, test_admin):
"""Get authentication headers for admin user"""
response = client.post("/api/v1/auth/login", json={
"username": "admin",
"password": "adminpass123"
})
assert response.status_code == 200, f"Admin login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def test_product(db):
"""Create a test product"""
product = Product(
product_id="TEST001",
title="Test Product",
description="A test product",
price="10.99",
currency="EUR",
brand="TestBrand",
gtin="1234567890123",
availability="in stock",
marketplace="Letzshop",
shop_name="TestShop"
)
db.add(product)
db.commit()
db.refresh(product)
return product
@pytest.fixture
def test_shop(db, test_user):
"""Create a test shop"""
shop = Shop(
shop_code="TESTSHOP",
shop_name="Test Shop",
owner_id=test_user.id,
is_active=True,
is_verified=True
)
db.add(shop)
db.commit()
db.refresh(shop)
return shop
@pytest.fixture
def test_stock(db, test_product, test_shop):
"""Create test stock entry"""
stock = Stock(
product_id=test_product.product_id,
shop_code=test_shop.shop_code,
quantity=10,
reserved_quantity=0
)
db.add(stock)
db.commit()
db.refresh(stock)
return stock
# Cleanup fixture to ensure clean state
@pytest.fixture(autouse=True)
def cleanup():
"""Automatically clean up after each test"""
yield
# Clear any remaining dependency overrides
app.dependency_overrides.clear()

21
tests/pytest.ini Normal file
View File

@@ -0,0 +1,21 @@
# tests/pytest.ini
[tool:pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--tb=short
--strict-markers
--disable-warnings
--color=yes
markers =
slow: marks tests as slow (deselect with '-m "not slow"')
integration: marks tests as integration tests
unit: marks tests as unit tests
auth: marks tests related to authentication
products: marks tests related to products
stock: marks tests related to stock management
shops: marks tests related to shop management
admin: marks tests related to admin functionality

View File

@@ -0,0 +1,8 @@
# tests/requirements_test.txt
# Testing dependencies
pytest>=7.4.0
pytest-cov>=4.1.0
pytest-asyncio>=0.21.0
pytest-mock>=3.11.0
httpx>=0.24.0
faker>=19.0.0

34
tests/test_admin.py Normal file
View File

@@ -0,0 +1,34 @@
# tests/test_admin.py
import pytest
class TestAdminAPI:
def test_get_all_users_admin(self, client, admin_headers, test_user):
"""Test admin getting all users"""
response = client.get("/api/v1/admin/users", headers=admin_headers)
assert response.status_code == 200
data = response.json()
assert len(data) >= 2 # test_user + admin user
def test_get_all_users_non_admin(self, client, auth_headers):
"""Test non-admin trying to access admin endpoint"""
response = client.get("/api/v1/admin/users", headers=auth_headers)
assert response.status_code == 403
assert "Access denied" in response.json()["detail"] or "admin" in response.json()["detail"].lower()
def test_toggle_user_status_admin(self, client, admin_headers, test_user):
"""Test admin toggling user status"""
response = client.put(f"/api/v1/admin/users/{test_user.id}/status", headers=admin_headers)
assert response.status_code == 200
assert "deactivated" in response.json()["message"] or "activated" in response.json()["message"]
def test_get_all_shops_admin(self, client, admin_headers, test_shop):
"""Test admin getting all shops"""
response = client.get("/api/v1/admin/shops", headers=admin_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1

119
tests/test_auth.py Normal file
View File

@@ -0,0 +1,119 @@
# tests/test_auth.py
import pytest
from fastapi import HTTPException
class TestAuthenticationAPI:
def test_register_user_success(self, client, db):
"""Test successful user registration"""
response = client.post("/api/v1/auth/register", json={
"email": "newuser@example.com",
"username": "newuser",
"password": "securepass123"
})
assert response.status_code == 200
data = response.json()
assert data["email"] == "newuser@example.com"
assert data["username"] == "newuser"
assert data["role"] == "user"
assert data["is_active"] == True
assert "hashed_password" not in data
def test_register_user_duplicate_email(self, client, test_user):
"""Test registration with duplicate email"""
response = client.post("/api/v1/auth/register", json={
"email": "test@example.com", # Same as test_user
"username": "newuser",
"password": "securepass123"
})
assert response.status_code == 400
assert "Email already registered" in response.json()["detail"]
def test_register_user_duplicate_username(self, client, test_user):
"""Test registration with duplicate username"""
response = client.post("/api/v1/auth/register", json={
"email": "new@example.com",
"username": "testuser", # Same as test_user
"password": "securepass123"
})
assert response.status_code == 400
assert "Username already taken" in response.json()["detail"]
def test_login_success(self, client, test_user):
"""Test successful login"""
response = client.post("/api/v1/auth/login", json={
"username": "testuser",
"password": "testpass123"
})
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert data["token_type"] == "bearer"
assert "expires_in" in data
assert data["user"]["username"] == "testuser"
def test_login_wrong_password(self, client, test_user):
"""Test login with wrong password"""
response = client.post("/api/v1/auth/login", json={
"username": "testuser",
"password": "wrongpassword"
})
assert response.status_code == 401
assert "Incorrect username or password" in response.json()["detail"]
def test_login_nonexistent_user(self, client):
"""Test login with nonexistent user"""
response = client.post("/api/v1/auth/login", json={
"username": "nonexistent",
"password": "password123"
})
assert response.status_code == 401
def test_get_current_user_info(self, client, auth_headers):
"""Test getting current user info"""
response = client.get("/api/v1/auth/me", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["username"] == "testuser"
assert data["email"] == "test@example.com"
def test_get_current_user_no_auth(self, client):
"""Test getting current user without authentication"""
response = client.get("/api/v1/auth/me")
assert response.status_code == 403 # No authorization header
class TestAuthManager:
def test_hash_password(self, auth_manager):
"""Test password hashing"""
password = "testpassword123"
hashed = auth_manager.hash_password(password)
assert hashed != password
assert len(hashed) > 20 # bcrypt hashes are long
def test_verify_password(self, auth_manager):
"""Test password verification"""
password = "testpassword123"
hashed = auth_manager.hash_password(password)
assert auth_manager.verify_password(password, hashed) == True
assert auth_manager.verify_password("wrongpassword", hashed) == False
def test_create_access_token(self, auth_manager, test_user):
"""Test JWT token creation"""
token_data = auth_manager.create_access_token(test_user)
assert "access_token" in token_data
assert token_data["token_type"] == "bearer"
assert "expires_in" in token_data
assert isinstance(token_data["expires_in"], int)

View File

@@ -0,0 +1,83 @@
# tests/test_background_tasks.py
import pytest
from unittest.mock import patch, AsyncMock
from app.tasks.background_tasks import process_marketplace_import
from models.database_models import MarketplaceImportJob
class TestBackgroundTasks:
@pytest.mark.asyncio
async def test_marketplace_import_success(self, db):
"""Test successful marketplace import background task"""
# Create import job
job = MarketplaceImportJob(
status="pending",
source_url="http://example.com/test.csv",
marketplace="TestMarket",
shop_code="TESTSHOP",
user_id=1
)
db.add(job)
db.commit()
db.refresh(job)
# Mock CSV processor
with patch('app.tasks.background_tasks.CSVProcessor') as mock_processor:
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(return_value={
"imported": 10,
"updated": 5,
"total_processed": 15,
"errors": 0
})
# Run background task
await process_marketplace_import(
job.id,
"http://example.com/test.csv",
"TestMarket",
"TESTSHOP",
1000
)
# Verify job was updated
db.refresh(job)
assert job.status == "completed"
assert job.imported_count == 10
assert job.updated_count == 5
@pytest.mark.asyncio
async def test_marketplace_import_failure(self, db):
"""Test marketplace import failure handling"""
# Create import job
job = MarketplaceImportJob(
status="pending",
source_url="http://example.com/test.csv",
marketplace="TestMarket",
shop_code="TESTSHOP",
user_id=1
)
db.add(job)
db.commit()
db.refresh(job)
# Mock CSV processor to raise exception
with patch('app.tasks.background_tasks.CSVProcessor') as mock_processor:
mock_instance = mock_processor.return_value
mock_instance.process_marketplace_csv_from_url = AsyncMock(
side_effect=Exception("Import failed")
)
# Run background task
await process_marketplace_import(
job.id,
"http://example.com/test.csv",
"TestMarket",
"TESTSHOP",
1000
)
# Verify job failure was recorded
db.refresh(job)
assert job.status == "failed"
assert "Import failed" in job.error_message

View File

@@ -0,0 +1,90 @@
# tests/test_csv_processor.py
import pytest
from unittest.mock import Mock, patch, AsyncMock
from io import StringIO
import pandas as pd
from utils.csv_processor import CSVProcessor
class TestCSVProcessor:
def setup_method(self):
self.processor = CSVProcessor()
@patch('requests.get')
def test_download_csv_success(self, mock_get):
"""Test successful CSV download"""
# Mock successful HTTP response
mock_response = Mock()
mock_response.status_code = 200
mock_response.text = "product_id,title,price\nTEST001,Test Product,10.99"
mock_get.return_value = mock_response
csv_content = self.processor._download_csv("http://example.com/test.csv")
assert "product_id,title,price" in csv_content
assert "TEST001,Test Product,10.99" in csv_content
@patch('requests.get')
def test_download_csv_failure(self, mock_get):
"""Test CSV download failure"""
# Mock failed HTTP response
mock_response = Mock()
mock_response.status_code = 404
mock_get.return_value = mock_response
with pytest.raises(Exception):
self.processor._download_csv("http://example.com/nonexistent.csv")
def test_parse_csv_content(self):
"""Test CSV content parsing"""
csv_content = """product_id,title,price,marketplace
TEST001,Test Product 1,10.99,TestMarket
TEST002,Test Product 2,15.99,TestMarket"""
df = self.processor._parse_csv_content(csv_content)
assert len(df) == 2
assert "product_id" in df.columns
assert df.iloc[0]["product_id"] == "TEST001"
assert df.iloc[1]["price"] == "15.99"
def test_validate_csv_headers(self):
"""Test CSV header validation"""
# Valid headers
valid_df = pd.DataFrame({
"product_id": ["TEST001"],
"title": ["Test"],
"price": ["10.99"]
})
assert self.processor._validate_csv_headers(invalid_df) == False
@pytest.mark.asyncio
async def test_process_marketplace_csv_from_url(self, db):
"""Test complete marketplace CSV processing"""
with patch.object(self.processor, '_download_csv') as mock_download, \
patch.object(self.processor, '_parse_csv_content') as mock_parse, \
patch.object(self.processor, '_validate_csv_headers') as mock_validate:
# Mock successful download and parsing
mock_download.return_value = "csv_content"
mock_df = pd.DataFrame({
"product_id": ["TEST001", "TEST002"],
"title": ["Product 1", "Product 2"],
"price": ["10.99", "15.99"],
"marketplace": ["TestMarket", "TestMarket"],
"shop_name": ["TestShop", "TestShop"]
})
mock_parse.return_value = mock_df
mock_validate.return_value = True
result = await self.processor.process_marketplace_csv_from_url(
"http://example.com/test.csv",
"TestMarket",
"TestShop",
1000,
db
)
assert "imported" in result
assert "updated" in result
assert "total_processed" in result

View File

@@ -0,0 +1,46 @@
# tests/test_data_validation.py
import pytest
from utils.data_processing import GTINProcessor, PriceProcessor
class TestDataValidation:
def test_gtin_normalization_edge_cases(self):
"""Test GTIN normalization with edge cases"""
processor = GTINProcessor()
# Test with leading zeros
assert processor.normalize("000123456789") == "000123456789"
# Test with spaces
assert processor.normalize("123 456 789 012") == "123456789012"
# Test with dashes
assert processor.normalize("123-456-789-012") == "123456789012"
# Test very long numbers
long_number = "1234567890123456789"
normalized = processor.normalize(long_number)
assert len(normalized) <= 14 # Should be truncated
def test_price_parsing_edge_cases(self):
"""Test price parsing with edge cases"""
processor = PriceProcessor()
# Test with multiple decimal places
price, currency = processor.parse_price_currency("12.999 EUR")
assert price == "12.999"
# Test with no currency
price, currency = processor.parse_price_currency("15.50")
assert price == "15.50"
# Test with unusual formatting
price, currency = processor.parse_price_currency("EUR 25,50")
assert currency == "EUR"
assert price == "25.50" # Comma should be converted to dot
def test_input_sanitization(self):
"""Test input sanitization"""
# These tests would verify that inputs are properly sanitized
# to prevent SQL injection, XSS, etc.
pass # Implementation would depend on your sanitization logic

98
tests/test_database.py Normal file
View File

@@ -0,0 +1,98 @@
# tests/test_database.py
import pytest
from sqlalchemy import text
from models.database_models import User, Product, Stock, Shop
class TestDatabaseModels:
def test_user_model(self, db):
"""Test User model creation and relationships"""
user = User(
email="db_test@example.com",
username="dbtest",
hashed_password="hashed_password_123",
role="user",
is_active=True
)
db.add(user)
db.commit()
db.refresh(user)
assert user.id is not None
assert user.email == "db_test@example.com"
assert user.created_at is not None
assert user.updated_at is not None
def test_product_model(self, db):
"""Test Product model creation"""
product = Product(
product_id="DB_TEST_001",
title="Database Test Product",
description="Testing product model",
price="25.99",
currency="USD",
brand="DBTest",
gtin="1234567890123",
availability="in stock",
marketplace="TestDB",
shop_name="DBTestShop"
)
db.add(product)
db.commit()
db.refresh(product)
assert product.id is not None
assert product.product_id == "DB_TEST_001"
assert product.created_at is not None
def test_stock_model(self, db):
"""Test Stock model creation"""
stock = Stock(
gtin="1234567890123",
location="DB_WAREHOUSE",
quantity=150
)
db.add(stock)
db.commit()
db.refresh(stock)
assert stock.id is not None
assert stock.gtin == "1234567890123"
assert stock.location == "DB_WAREHOUSE"
assert stock.quantity == 150
def test_shop_model_with_owner(self, db, test_user):
"""Test Shop model with owner relationship"""
shop = Shop(
shop_code="DBTEST",
shop_name="Database Test Shop",
description="Testing shop model",
owner_id=test_user.id,
is_active=True,
is_verified=False
)
db.add(shop)
db.commit()
db.refresh(shop)
assert shop.id is not None
assert shop.shop_code == "DBTEST"
assert shop.owner_id == test_user.id
assert shop.owner.username == test_user.username
def test_database_constraints(self, db):
"""Test database constraints and unique indexes"""
# Test unique product_id constraint
product1 = Product(product_id="UNIQUE_001", title="Product 1")
db.add(product1)
db.commit()
# This should raise an integrity error
with pytest.raises(Exception): # Could be IntegrityError or similar
product2 = Product(product_id="UNIQUE_001", title="Product 2")
db.add(product2)
db.commit()

View File

@@ -0,0 +1,45 @@
# tests/test_error_handling.py
import pytest
class TestErrorHandling:
def test_invalid_json(self, client, auth_headers):
"""Test handling of invalid JSON"""
response = client.post("/api/v1/products",
headers=auth_headers,
data="invalid json")
assert response.status_code == 422 # Validation error
def test_missing_required_fields(self, client, auth_headers):
"""Test handling of missing required fields"""
response = client.post("/api/v1/products",
headers=auth_headers,
json={"title": "Test"}) # Missing product_id
assert response.status_code == 422
def test_invalid_authentication(self, client):
"""Test handling of invalid authentication"""
response = client.get("/api/v1/products",
headers={"Authorization": "Bearer invalid_token"})
assert response.status_code == 403
def test_nonexistent_resource(self, client, auth_headers):
"""Test handling of nonexistent resource access"""
response = client.get("/api/v1/products/NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
response = client.get("/api/v1/shops/NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
def test_duplicate_resource_creation(self, client, auth_headers, test_product):
"""Test handling of duplicate resource creation"""
product_data = {
"product_id": test_product.product_id, # Duplicate ID
"title": "Another Product"
}
response = client.post("/api/v1/products", headers=auth_headers, json=product_data)
assert response.status_code == 400

65
tests/test_export.py Normal file
View File

@@ -0,0 +1,65 @@
# tests/test_export.py
import pytest
import csv
from io import StringIO
class TestExportFunctionality:
def test_csv_export_basic(self, client, auth_headers, test_product):
"""Test basic CSV export functionality"""
response = client.get("/api/v1/export-csv", headers=auth_headers)
assert response.status_code == 200
assert response.headers["content-type"] == "text/csv; charset=utf-8"
# Parse CSV content
csv_content = response.content.decode('utf-8')
csv_reader = csv.reader(StringIO(csv_content))
# Check header row
header = next(csv_reader)
expected_fields = ["product_id", "title", "description", "price", "marketplace"]
for field in expected_fields:
assert field in header
def test_csv_export_with_marketplace_filter(self, client, auth_headers, db):
"""Test CSV export with marketplace filtering"""
# Create products in different marketplaces
products = [
Product(product_id="EXP1", title="Product 1", marketplace="Amazon"),
Product(product_id="EXP2", title="Product 2", marketplace="eBay"),
]
db.add_all(products)
db.commit()
response = client.get("/api/v1/export-csv?marketplace=Amazon", headers=auth_headers)
assert response.status_code == 200
csv_content = response.content.decode('utf-8')
assert "EXP1" in csv_content
assert "EXP2" not in csv_content # Should be filtered out
def test_csv_export_performance(self, client, auth_headers, db):
"""Test CSV export performance with many products"""
# Create many products
products = []
for i in range(1000):
product = Product(
product_id=f"PERF{i:04d}",
title=f"Performance Product {i}",
marketplace="Performance"
)
products.append(product)
db.add_all(products)
db.commit()
import time
start_time = time.time()
response = client.get("/api/v1/export-csv", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200
assert end_time - start_time < 10.0 # Should complete within 10 seconds

85
tests/test_filtering.py Normal file
View File

@@ -0,0 +1,85 @@
# tests/test_filtering.py
import pytest
from models.database_models import Product
class TestFiltering:
def test_product_brand_filter(self, client, auth_headers, db):
"""Test filtering products by brand"""
# Create products with different brands
products = [
Product(product_id="BRAND1", title="Product 1", brand="BrandA"),
Product(product_id="BRAND2", title="Product 2", brand="BrandB"),
Product(product_id="BRAND3", title="Product 3", brand="BrandA"),
]
db.add_all(products)
db.commit()
# Filter by BrandA
response = client.get("/api/v1/products?brand=BrandA", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2
# Filter by BrandB
response = client.get("/api/v1/products?brand=BrandB", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 1
def test_product_marketplace_filter(self, client, auth_headers, db):
"""Test filtering products by marketplace"""
products = [
Product(product_id="MKT1", title="Product 1", marketplace="Amazon"),
Product(product_id="MKT2", title="Product 2", marketplace="eBay"),
Product(product_id="MKT3", title="Product 3", marketplace="Amazon"),
]
db.add_all(products)
db.commit()
response = client.get("/api/v1/products?marketplace=Amazon", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2
def test_product_search_filter(self, client, auth_headers, db):
"""Test searching products by text"""
products = [
Product(product_id="SEARCH1", title="Apple iPhone", description="Smartphone"),
Product(product_id="SEARCH2", title="Samsung Galaxy", description="Android phone"),
Product(product_id="SEARCH3", title="iPad Tablet", description="Apple tablet"),
]
db.add_all(products)
db.commit()
# Search for "Apple"
response = client.get("/api/v1/products?search=Apple", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2 # iPhone and iPad
# Search for "phone"
response = client.get("/api/v1/products?search=phone", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 2 # iPhone and Galaxy
def test_combined_filters(self, client, auth_headers, db):
"""Test combining multiple filters"""
products = [
Product(product_id="COMBO1", title="Apple iPhone", brand="Apple", marketplace="Amazon"),
Product(product_id="COMBO2", title="Apple iPad", brand="Apple", marketplace="eBay"),
Product(product_id="COMBO3", title="Samsung Phone", brand="Samsung", marketplace="Amazon"),
]
db.add_all(products)
db.commit()
# Filter by brand AND marketplace
response = client.get("/api/v1/products?brand=Apple&marketplace=Amazon", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] == 1 # Only iPhone matches both

117
tests/test_integration.py Normal file
View File

@@ -0,0 +1,117 @@
# tests/test_integration.py
import pytest
class TestIntegrationFlows:
def test_full_product_workflow(self, client, auth_headers):
"""Test complete product creation and management workflow"""
# 1. Create a product
product_data = {
"product_id": "FLOW001",
"title": "Integration Test Product",
"description": "Testing full workflow",
"price": "29.99",
"brand": "FlowBrand",
"gtin": "1111222233334",
"availability": "in stock",
"marketplace": "TestFlow"
}
response = client.post("/api/v1/products", headers=auth_headers, json=product_data)
assert response.status_code == 200
product = response.json()
# 2. Add stock for the product
stock_data = {
"gtin": product["gtin"],
"location": "MAIN_WAREHOUSE",
"quantity": 50
}
response = client.post("/api/v1/stock", headers=auth_headers, json=stock_data)
assert response.status_code == 200
# 3. Get product with stock info
response = client.get(f"/api/v1/products/{product['product_id']}", headers=auth_headers)
assert response.status_code == 200
product_detail = response.json()
assert product_detail["stock_info"]["total_quantity"] == 50
# 4. Update product
update_data = {"title": "Updated Integration Test Product"}
response = client.put(f"/api/v1/products/{product['product_id']}",
headers=auth_headers, json=update_data)
assert response.status_code == 200
# 5. Search for product
response = client.get("/api/v1/products?search=Updated Integration", headers=auth_headers)
assert response.status_code == 200
assert response.json()["total"] == 1
def test_shop_product_workflow(self, client, auth_headers):
"""Test shop creation and product management workflow"""
# 1. Create a shop
shop_data = {
"shop_code": "FLOWSHOP",
"shop_name": "Integration Flow Shop",
"description": "Test shop for integration"
}
response = client.post("/api/v1/shops", headers=auth_headers, json=shop_data)
assert response.status_code == 200
shop = response.json()
# 2. Create a product
product_data = {
"product_id": "SHOPFLOW001",
"title": "Shop Flow Product",
"price": "15.99",
"marketplace": "ShopFlow"
}
response = client.post("/api/v1/products", headers=auth_headers, json=product_data)
assert response.status_code == 200
product = response.json()
# 3. Add product to shop (if endpoint exists)
# This would test the shop-product association
# 4. Get shop details
response = client.get(f"/api/v1/shops/{shop['shop_code']}", headers=auth_headers)
assert response.status_code == 200
def test_stock_operations_workflow(self, client, auth_headers):
"""Test complete stock management workflow"""
gtin = "9999888877776"
location = "TEST_WAREHOUSE"
# 1. Set initial stock
response = client.post("/api/v1/stock", headers=auth_headers, json={
"gtin": gtin,
"location": location,
"quantity": 100
})
assert response.status_code == 200
# 2. Add more stock
response = client.post("/api/v1/stock/add", headers=auth_headers, json={
"gtin": gtin,
"location": location,
"quantity": 25
})
assert response.status_code == 200
assert response.json()["quantity"] == 125
# 3. Remove some stock
response = client.post("/api/v1/stock/remove", headers=auth_headers, json={
"gtin": gtin,
"location": location,
"quantity": 30
})
assert response.status_code == 200
assert response.json()["quantity"] == 95
# 4. Check total stock
response = client.get(f"/api/v1/stock/{gtin}/total", headers=auth_headers)
assert response.status_code == 200
assert response.json()["total_quantity"] == 95

52
tests/test_marketplace.py Normal file
View File

@@ -0,0 +1,52 @@
# tests/test_marketplace.py
import pytest
from unittest.mock import patch, AsyncMock
class TestMarketplaceAPI:
@patch('utils.csv_processor.CSVProcessor.process_marketplace_csv_from_url')
def test_import_from_marketplace(self, mock_process, client, auth_headers, test_shop):
"""Test marketplace import endpoint"""
mock_process.return_value = AsyncMock()
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": test_shop.shop_code
}
response = client.post("/api/v1/marketplace/import-from-marketplace",
headers=auth_headers, json=import_data)
assert response.status_code == 200
data = response.json()
assert data["status"] == "pending"
assert data["marketplace"] == "TestMarket"
assert "job_id" in data
def test_import_from_marketplace_invalid_shop(self, client, auth_headers):
"""Test marketplace import with invalid shop"""
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": "NONEXISTENT"
}
response = client.post("/api/v1/marketplace/import-from-marketplace",
headers=auth_headers, json=import_data)
assert response.status_code == 404
assert "Shop not found" in response.json()["detail"]
def test_get_marketplace_import_jobs(self, client, auth_headers):
"""Test getting marketplace import jobs"""
response = client.get("/api/v1/marketplace/marketplace-import-jobs", headers=auth_headers)
assert response.status_code == 200
assert isinstance(response.json(), list)
def test_marketplace_requires_auth(self, client):
"""Test that marketplace endpoints require authentication"""
response = client.get("/api/v1/marketplace/marketplace-import-jobs")
assert response.status_code == 403

63
tests/test_middleware.py Normal file
View File

@@ -0,0 +1,63 @@
# tests/test_middleware.py
import pytest
from unittest.mock import Mock, patch
from middleware.rate_limiter import RateLimiter
from middleware.auth import AuthManager
class TestRateLimiter:
def test_rate_limiter_allows_requests(self):
"""Test rate limiter allows requests within limit"""
limiter = RateLimiter()
client_id = "test_client"
# Should allow first request
assert limiter.allow_request(client_id, max_requests=10, window_seconds=3600) == True
# Should allow subsequent requests within limit
for _ in range(5):
assert limiter.allow_request(client_id, max_requests=10, window_seconds=3600) == True
def test_rate_limiter_blocks_excess_requests(self):
"""Test rate limiter blocks requests exceeding limit"""
limiter = RateLimiter()
client_id = "test_client_blocked"
max_requests = 3
# Use up the allowed requests
for _ in range(max_requests):
assert limiter.allow_request(client_id, max_requests, 3600) == True
# Next request should be blocked
assert limiter.allow_request(client_id, max_requests, 3600) == False
class TestAuthManager:
def test_password_hashing_and_verification(self):
"""Test password hashing and verification"""
auth_manager = AuthManager()
password = "test_password_123"
# Hash password
hashed = auth_manager.hash_password(password)
# Verify correct password
assert auth_manager.verify_password(password, hashed) == True
# Verify incorrect password
assert auth_manager.verify_password("wrong_password", hashed) == False
def test_jwt_token_creation_and_validation(self, test_user):
"""Test JWT token creation and validation"""
auth_manager = AuthManager()
# Create token
token_data = auth_manager.create_access_token(test_user)
assert "access_token" in token_data
assert token_data["token_type"] == "bearer"
assert isinstance(token_data["expires_in"], int)
# Token should be a string
assert isinstance(token_data["access_token"], str)
assert len(token_data["access_token"]) > 50 # JWT tokens are long

56
tests/test_pagination.py Normal file
View File

@@ -0,0 +1,56 @@
# tests/test_pagination.py
import pytest
from models.database_models import Product
class TestPagination:
def test_product_pagination(self, client, auth_headers, db):
"""Test pagination for product listing"""
# Create multiple products
products = []
for i in range(25):
product = Product(
product_id=f"PAGE{i:03d}",
title=f"Pagination Test Product {i}",
marketplace="PaginationTest"
)
products.append(product)
db.add_all(products)
db.commit()
# Test first page
response = client.get("/api/v1/products?limit=10&skip=0", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert len(data["products"]) == 10
assert data["total"] == 25
assert data["skip"] == 0
assert data["limit"] == 10
# Test second page
response = client.get("/api/v1/products?limit=10&skip=10", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert len(data["products"]) == 10
assert data["skip"] == 10
# Test last page
response = client.get("/api/v1/products?limit=10&skip=20", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert len(data["products"]) == 5 # Only 5 remaining
def test_pagination_boundaries(self, client, auth_headers):
"""Test pagination boundary conditions"""
# Test negative skip
response = client.get("/api/v1/products?skip=-1", headers=auth_headers)
assert response.status_code == 422 # Validation error
# Test zero limit
response = client.get("/api/v1/products?limit=0", headers=auth_headers)
assert response.status_code == 422 # Validation error
# Test excessive limit
response = client.get("/api/v1/products?limit=10000", headers=auth_headers)
assert response.status_code == 422 # Should be limited

56
tests/test_performance.py Normal file
View File

@@ -0,0 +1,56 @@
# tests/test_performance.py
import pytest
import time
class TestPerformance:
def test_product_list_performance(self, client, auth_headers, db):
"""Test performance of product listing with many products"""
# Create multiple products
products = []
for i in range(100):
product = Product(
product_id=f"PERF{i:03d}",
title=f"Performance Test Product {i}",
price=f"{i}.99",
marketplace="Performance"
)
products.append(product)
db.add_all(products)
db.commit()
# Time the request
start_time = time.time()
response = client.get("/api/v1/products?limit=100", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200
assert len(response.json()["products"]) == 100
assert end_time - start_time < 2.0 # Should complete within 2 seconds
def test_search_performance(self, client, auth_headers, db):
"""Test search performance"""
# Create products with searchable content
products = []
for i in range(50):
product = Product(
product_id=f"SEARCH{i:03d}",
title=f"Searchable Product {i}",
description=f"This is a searchable product number {i}",
brand="SearchBrand",
marketplace="SearchMarket"
)
products.append(product)
db.add_all(products)
db.commit()
# Time search request
start_time = time.time()
response = client.get("/api/v1/products?search=Searchable", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200
assert response.json()["total"] == 50
assert end_time - start_time < 1.0 # Search should be fast

122
tests/test_products.py Normal file
View File

@@ -0,0 +1,122 @@
# tests/test_products.py
import pytest
class TestProductsAPI:
def test_get_products_empty(self, client, auth_headers):
"""Test getting products when none exist"""
response = client.get("/api/v1/products", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["products"] == []
assert data["total"] == 0
def test_get_products_with_data(self, client, auth_headers, test_product):
"""Test getting products with data"""
response = client.get("/api/v1/products", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert len(data["products"]) == 1
assert data["total"] == 1
assert data["products"][0]["product_id"] == "TEST001"
def test_get_products_with_filters(self, client, auth_headers, test_product):
"""Test filtering products"""
# Test brand filter
response = client.get("/api/v1/products?brand=TestBrand", headers=auth_headers)
assert response.status_code == 200
assert response.json()["total"] == 1
# Test marketplace filter
response = client.get("/api/v1/products?marketplace=Letzshop", headers=auth_headers)
assert response.status_code == 200
assert response.json()["total"] == 1
# Test search
response = client.get("/api/v1/products?search=Test", headers=auth_headers)
assert response.status_code == 200
assert response.json()["total"] == 1
def test_create_product(self, client, auth_headers):
"""Test creating a new product"""
product_data = {
"product_id": "NEW001",
"title": "New Product",
"description": "A new product",
"price": "15.99",
"brand": "NewBrand",
"gtin": "9876543210987",
"availability": "in stock",
"marketplace": "Amazon"
}
response = client.post("/api/v1/products", headers=auth_headers, json=product_data)
assert response.status_code == 200
data = response.json()
assert data["product_id"] == "NEW001"
assert data["title"] == "New Product"
assert data["marketplace"] == "Amazon"
def test_create_product_duplicate_id(self, client, auth_headers, test_product):
"""Test creating product with duplicate ID"""
product_data = {
"product_id": "TEST001", # Same as test_product
"title": "Another Product",
"price": "20.00"
}
response = client.post("/api/v1/products", headers=auth_headers, json=product_data)
assert response.status_code == 400
assert "already exists" in response.json()["detail"]
def test_get_product_by_id(self, client, auth_headers, test_product):
"""Test getting specific product"""
response = client.get(f"/api/v1/products/{test_product.product_id}", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["product"]["product_id"] == test_product.product_id
assert data["product"]["title"] == test_product.title
def test_get_nonexistent_product(self, client, auth_headers):
"""Test getting nonexistent product"""
response = client.get("/api/v1/products/NONEXISTENT", headers=auth_headers)
assert response.status_code == 404
def test_update_product(self, client, auth_headers, test_product):
"""Test updating product"""
update_data = {
"title": "Updated Product Title",
"price": "25.99"
}
response = client.put(
f"/api/v1/products/{test_product.product_id}",
headers=auth_headers,
json=update_data
)
assert response.status_code == 200
data = response.json()
assert data["title"] == "Updated Product Title"
assert data["price"] == "25.99"
def test_delete_product(self, client, auth_headers, test_product):
"""Test deleting product"""
response = client.delete(
f"/api/v1/products/{test_product.product_id}",
headers=auth_headers
)
assert response.status_code == 200
assert "deleted successfully" in response.json()["message"]
def test_products_require_auth(self, client):
"""Test that product endpoints require authentication"""
response = client.get("/api/v1/products")
assert response.status_code == 403

61
tests/test_security.py Normal file
View File

@@ -0,0 +1,61 @@
# tests/test_security.py
import pytest
from fastapi import HTTPException
from unittest.mock import patch
class TestSecurity:
def test_protected_endpoint_without_auth(self, client):
"""Test that protected endpoints reject unauthenticated requests"""
protected_endpoints = [
"/api/v1/products",
"/api/v1/stock",
"/api/v1/shops",
"/api/v1/stats",
"/api/v1/admin/users"
]
for endpoint in protected_endpoints:
response = client.get(endpoint)
assert response.status_code == 403
def test_protected_endpoint_with_invalid_token(self, client):
"""Test protected endpoints with invalid token"""
headers = {"Authorization": "Bearer invalid_token_here"}
response = client.get("/api/v1/products", headers=headers)
assert response.status_code == 403
def test_admin_endpoint_requires_admin_role(self, client, auth_headers):
"""Test that admin endpoints require admin role"""
response = client.get("/api/v1/admin/users", headers=auth_headers)
assert response.status_code == 403 # Regular user should be denied
def test_sql_injection_prevention(self, client, auth_headers):
"""Test SQL injection prevention in search parameters"""
# Try SQL injection in search parameter
malicious_search = "'; DROP TABLE products; --"
response = client.get(f"/api/v1/products?search={malicious_search}", headers=auth_headers)
# Should not crash and should return normal response
assert response.status_code == 200
# Database should still be intact (no products dropped)
def test_input_validation(self, client, auth_headers):
"""Test input validation and sanitization"""
# Test XSS attempt in product creation
xss_payload = "<script>alert('xss')</script>"
product_data = {
"product_id": "XSS_TEST",
"title": xss_payload,
"description": xss_payload
}
response = client.post("/api/v1/products", headers=auth_headers, json=product_data)
if response.status_code == 200:
# If creation succeeds, content should be escaped/sanitized
data = response.json()
assert "<script>" not in data["title"]

60
tests/test_services.py Normal file
View File

@@ -0,0 +1,60 @@
# tests/test_services.py
import pytest
from app.services.product_service import ProductService
from models.api_models import ProductCreate
from models.database_models import Product
class TestProductService:
def setup_method(self):
self.service = ProductService()
def test_create_product_with_gtin_validation(self, db):
"""Test product creation with GTIN validation"""
product_data = ProductCreate(
product_id="SVC001",
title="Service Test Product",
gtin="1234567890123",
price="19.99",
marketplace="TestMarket"
)
product = self.service.create_product(db, product_data)
assert product.product_id == "SVC001"
assert product.gtin == "1234567890123"
assert product.marketplace == "TestMarket"
def test_create_product_invalid_gtin(self, db):
"""Test product creation with invalid GTIN"""
product_data = ProductCreate(
product_id="SVC002",
title="Service Test Product",
gtin="invalid_gtin",
price="19.99"
)
with pytest.raises(ValueError, match="Invalid GTIN format"):
self.service.create_product(db, product_data)
def test_get_products_with_filters(self, db, test_product):
"""Test getting products with various filters"""
products, total = self.service.get_products_with_filters(
db,
brand="TestBrand"
)
assert total == 1
assert len(products) == 1
assert products[0].brand == "TestBrand"
def test_get_products_with_search(self, db, test_product):
"""Test getting products with search"""
products, total = self.service.get_products_with_filters(
db,
search="Test Product"
)
assert total == 1
assert len(products) == 1

55
tests/test_shops.py Normal file
View File

@@ -0,0 +1,55 @@
# tests/test_shops.py
import pytest
class TestShopsAPI:
def test_create_shop(self, client, auth_headers):
"""Test creating a new shop"""
shop_data = {
"shop_code": "NEWSHOP",
"shop_name": "New Shop",
"description": "A new test shop"
}
response = client.post("/api/v1/shops", headers=auth_headers, json=shop_data)
assert response.status_code == 200
data = response.json()
assert data["shop_code"] == "NEWSHOP"
assert data["shop_name"] == "New Shop"
assert data["is_active"] == True
def test_create_shop_duplicate_code(self, client, auth_headers, test_shop):
"""Test creating shop with duplicate code"""
shop_data = {
"shop_code": "TESTSHOP", # Same as test_shop
"shop_name": "Another Shop"
}
response = client.post("/api/v1/shops", headers=auth_headers, json=shop_data)
assert response.status_code == 400
assert "already exists" in response.json()["detail"]
def test_get_shops(self, client, auth_headers, test_shop):
"""Test getting shops list"""
response = client.get("/api/v1/shops", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["total"] >= 1
assert len(data["shops"]) >= 1
def test_get_shop_by_code(self, client, auth_headers, test_shop):
"""Test getting specific shop"""
response = client.get(f"/api/v1/shops/{test_shop.shop_code}", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["shop_code"] == test_shop.shop_code
assert data["shop_name"] == test_shop.shop_name
def test_shops_require_auth(self, client):
"""Test that shop endpoints require authentication"""
response = client.get("/api/v1/shops")
assert response.status_code == 403

33
tests/test_stats.py Normal file
View File

@@ -0,0 +1,33 @@
# tests/test_stats.py
import pytest
class TestStatsAPI:
def test_get_basic_stats(self, client, auth_headers, test_product):
"""Test getting basic statistics"""
response = client.get("/api/v1/stats", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "total_products" in data
assert "unique_brands" in data
assert "unique_categories" in data
assert "unique_marketplaces" in data
assert "unique_shops" in data
assert data["total_products"] >= 1
def test_get_marketplace_stats(self, client, auth_headers, test_product):
"""Test getting marketplace statistics"""
response = client.get("/api/v1/stats/marketplace-stats", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
if len(data) > 0:
assert "marketplace" in data[0]
assert "total_products" in data[0]
def test_stats_require_auth(self, client):
"""Test that stats endpoints require authentication"""
response = client.get("/api/v1/stats")
assert response.status_code == 403

147
tests/test_stock.py Normal file
View File

@@ -0,0 +1,147 @@
# tests/test_stock.py
import pytest
from models.database_models import Stock
class TestStockAPI:
def test_set_stock_new(self, client, auth_headers):
"""Test setting stock for new GTIN"""
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 100
}
response = client.post("/api/v1/stock", headers=auth_headers, json=stock_data)
assert response.status_code == 200
data = response.json()
assert data["gtin"] == "1234567890123"
assert data["location"] == "WAREHOUSE_A"
assert data["quantity"] == 100
def test_set_stock_existing(self, client, auth_headers, db):
"""Test updating existing stock"""
# Create initial stock
stock = Stock(gtin="1234567890123", location="WAREHOUSE_A", quantity=50)
db.add(stock)
db.commit()
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 75
}
response = client.post("/api/v1/stock", headers=auth_headers, json=stock_data)
assert response.status_code == 200
data = response.json()
assert data["quantity"] == 75 # Should be replaced, not added
def test_add_stock(self, client, auth_headers, db):
"""Test adding to existing stock"""
# Create initial stock
stock = Stock(gtin="1234567890123", location="WAREHOUSE_A", quantity=50)
db.add(stock)
db.commit()
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 25
}
response = client.post("/api/v1/stock/add", headers=auth_headers, json=stock_data)
assert response.status_code == 200
data = response.json()
assert data["quantity"] == 75 # 50 + 25
def test_remove_stock(self, client, auth_headers, db):
"""Test removing from existing stock"""
# Create initial stock
stock = Stock(gtin="1234567890123", location="WAREHOUSE_A", quantity=50)
db.add(stock)
db.commit()
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 15
}
response = client.post("/api/v1/stock/remove", headers=auth_headers, json=stock_data)
assert response.status_code == 200
data = response.json()
assert data["quantity"] == 35 # 50 - 15
def test_remove_stock_insufficient(self, client, auth_headers, db):
"""Test removing more stock than available"""
# Create initial stock
stock = Stock(gtin="1234567890123", location="WAREHOUSE_A", quantity=10)
db.add(stock)
db.commit()
stock_data = {
"gtin": "1234567890123",
"location": "WAREHOUSE_A",
"quantity": 20
}
response = client.post("/api/v1/stock/remove", headers=auth_headers, json=stock_data)
assert response.status_code == 400
assert "Insufficient stock" in response.json()["detail"]
def test_get_stock_by_gtin(self, client, auth_headers, db):
"""Test getting stock summary for GTIN"""
# Create stock in multiple locations
stock1 = Stock(gtin="1234567890123", location="WAREHOUSE_A", quantity=50)
stock2 = Stock(gtin="1234567890123", location="WAREHOUSE_B", quantity=25)
db.add_all([stock1, stock2])
db.commit()
response = client.get("/api/v1/stock/1234567890123", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["gtin"] == "1234567890123"
assert data["total_quantity"] == 75
assert len(data["locations"]) == 2
def test_get_total_stock(self, client, auth_headers, db):
"""Test getting total stock for GTIN"""
# Create stock in multiple locations
stock1 = Stock(gtin="1234567890123", location="WAREHOUSE_A", quantity=50)
stock2 = Stock(gtin="1234567890123", location="WAREHOUSE_B", quantity=25)
db.add_all([stock1, stock2])
db.commit()
response = client.get("/api/v1/stock/1234567890123/total", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert data["gtin"] == "1234567890123"
assert data["total_quantity"] == 75
assert data["locations_count"] == 2
def test_get_all_stock(self, client, auth_headers, db):
"""Test getting all stock entries"""
# Create some stock entries
stock1 = Stock(gtin="1234567890123", location="WAREHOUSE_A", quantity=50)
stock2 = Stock(gtin="9876543210987", location="WAREHOUSE_B", quantity=25)
db.add_all([stock1, stock2])
db.commit()
response = client.get("/api/v1/stock", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert len(data) == 2
def test_stock_requires_auth(self, client):
"""Test that stock endpoints require authentication"""
response = client.get("/api/v1/stock")
assert response.status_code == 403

View File

@@ -1,4 +1,4 @@
# tests/test_utils.py
# tests/test_utils.py (Enhanced version of your existing file)
import pytest
from utils.data_processing import GTINProcessor, PriceProcessor
@@ -8,54 +8,111 @@ class TestGTINProcessor:
self.processor = GTINProcessor()
def test_normalize_valid_gtin(self):
"""Test GTIN normalization with valid inputs"""
# Test EAN-13
assert self.processor.normalize("1234567890123") == "1234567890123"
# Test UPC-A
# Test UPC-A (12 digits)
assert self.processor.normalize("123456789012") == "123456789012"
# Test with decimal
# Test with decimal point
assert self.processor.normalize("123456789012.0") == "123456789012"
# Test EAN-8
assert self.processor.normalize("12345678") == "12345678"
def test_normalize_invalid_gtin(self):
"""Test GTIN normalization with invalid inputs"""
assert self.processor.normalize("") is None
assert self.processor.normalize(None) is None
assert self.processor.normalize("abc") is None
assert self.processor.normalize("123") == "000000000123" # Padded to 12 digits
# Test short number (gets padded)
assert self.processor.normalize("123") == "000000000123"
def test_normalize_gtin_with_formatting(self):
"""Test GTIN normalization with various formatting"""
# Test with spaces
assert self.processor.normalize("123 456 789 012") == "123456789012"
# Test with dashes
assert self.processor.normalize("123-456-789-012") == "123456789012"
# Test with mixed formatting
assert self.processor.normalize("123 456-789 012") == "123456789012"
def test_validate_gtin(self):
"""Test GTIN validation"""
assert self.processor.validate("1234567890123") is True
assert self.processor.validate("123456789012") is True
assert self.processor.validate("12345678") is True
assert self.processor.validate("123") is False
assert self.processor.validate("") is False
assert self.processor.validate(None) is False
def test_gtin_checksum_validation(self):
"""Test GTIN checksum validation if implemented"""
# This test would verify checksum calculation if your GTINProcessor implements it
# For now, we'll test the structure validation
assert self.processor.validate("1234567890123") is True
assert self.processor.validate("12345678901234") is True # 14 digits
assert self.processor.validate("123456789012345") is False # 15 digits
class TestPriceProcessor:
def setup_method(self):
self.processor = PriceProcessor()
def test_parse_price_currency(self):
# Test EUR with symbol
def test_parse_price_currency_eur(self):
"""Test EUR price parsing"""
price, currency = self.processor.parse_price_currency("8.26 EUR")
assert price == "8.26"
assert currency == "EUR"
# Test USD with symbol
# Test with euro symbol
price, currency = self.processor.parse_price_currency("8.26 €")
assert price == "8.26"
assert currency == "EUR"
def test_parse_price_currency_usd(self):
"""Test USD price parsing"""
price, currency = self.processor.parse_price_currency("$12.50")
assert price == "12.50"
assert currency == "USD"
# Test with comma decimal separator
price, currency = self.processor.parse_price_currency("8,26 €")
price, currency = self.processor.parse_price_currency("12.50 USD")
assert price == "12.50"
assert currency == "USD"
def test_parse_price_currency_comma_decimal(self):
"""Test price parsing with comma as decimal separator"""
price, currency = self.processor.parse_price_currency("8,26 EUR")
assert price == "8.26"
assert currency == "EUR"
def test_parse_invalid_price(self):
"""Test invalid price parsing"""
price, currency = self.processor.parse_price_currency("")
assert price is None
assert currency is None
price, currency = self.processor.parse_price_currency(None)
assert price is None
assert currency is None
assert currency is None
def test_parse_price_edge_cases(self):
"""Test edge cases in price parsing"""
# Test price without currency
price, currency = self.processor.parse_price_currency("15.99")
assert price == "15.99"
# currency might be None or default value
# Test currency before price
price, currency = self.processor.parse_price_currency("EUR 25.50")
assert price == "25.50"
assert currency == "EUR"
# Test with multiple decimal places
price, currency = self.processor.parse_price_currency("12.999 USD")
assert price == "12.999"
assert currency == "USD"