diff --git a/app/api/v1/admin.py b/app/api/v1/admin.py index 286fe2b8..3da3c782 100644 --- a/app/api/v1/admin.py +++ b/app/api/v1/admin.py @@ -16,11 +16,8 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_admin_user from app.core.database import get_db from app.services.admin_service import admin_service -from models.api_models import ( - MarketplaceImportJobResponse, - ShopListResponse, - UserResponse, -) +from models.api_models import (MarketplaceImportJobResponse, ShopListResponse, + UserResponse) from models.database_models import User router = APIRouter() diff --git a/app/api/v1/auth.py b/app/api/v1/auth.py index 391801f8..a9ed4271 100644 --- a/app/api/v1/auth.py +++ b/app/api/v1/auth.py @@ -15,7 +15,8 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_user from app.core.database import get_db from app.services.auth_service import auth_service -from models.api_models import LoginResponse, UserLogin, UserRegister, UserResponse +from models.api_models import (LoginResponse, UserLogin, UserRegister, + UserResponse) from models.database_models import User router = APIRouter() diff --git a/app/api/v1/marketplace.py b/app/api/v1/marketplace.py index 37757230..7881b846 100644 --- a/app/api/v1/marketplace.py +++ b/app/api/v1/marketplace.py @@ -18,7 +18,8 @@ from app.core.database import get_db from app.services.marketplace_service import marketplace_service from app.tasks.background_tasks import process_marketplace_import from middleware.decorators import rate_limit -from models.api_models import MarketplaceImportJobResponse, MarketplaceImportRequest +from models.api_models import (MarketplaceImportJobResponse, + MarketplaceImportRequest) from models.database_models import User router = APIRouter() diff --git a/app/api/v1/product.py b/app/api/v1/product.py index d669fe1d..52979989 100644 --- a/app/api/v1/product.py +++ b/app/api/v1/product.py @@ -17,13 +17,9 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_user from app.core.database import get_db from app.services.product_service import product_service -from models.api_models import ( - ProductCreate, - ProductDetailResponse, - ProductListResponse, - ProductResponse, - ProductUpdate, -) +from models.api_models import (ProductCreate, ProductDetailResponse, + ProductListResponse, ProductResponse, + ProductUpdate) from models.database_models import User router = APIRouter() diff --git a/app/api/v1/shop.py b/app/api/v1/shop.py index bd6e2155..e0b32fe2 100644 --- a/app/api/v1/shop.py +++ b/app/api/v1/shop.py @@ -15,14 +15,8 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_user, get_user_shop from app.core.database import get_db from app.services.shop_service import shop_service - -from models.api_models import ( - ShopCreate, - ShopListResponse, - ShopProductCreate, - ShopProductResponse, - ShopResponse, -) +from models.api_models import (ShopCreate, ShopListResponse, ShopProductCreate, + ShopProductResponse, ShopResponse) from models.database_models import User router = APIRouter() diff --git a/app/api/v1/stats.py b/app/api/v1/stats.py index 472a5ffc..ddcb3f6d 100644 --- a/app/api/v1/stats.py +++ b/app/api/v1/stats.py @@ -16,10 +16,7 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_user from app.core.database import get_db from app.services.stats_service import stats_service -from models.api_models import ( - MarketplaceStatsResponse, - StatsResponse, -) +from models.api_models import MarketplaceStatsResponse, StatsResponse from models.database_models import User router = APIRouter() diff --git a/app/api/v1/stock.py b/app/api/v1/stock.py index 22a504a4..bd5a4eb1 100644 --- a/app/api/v1/stock.py +++ b/app/api/v1/stock.py @@ -16,13 +16,8 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_user from app.core.database import get_db from app.services.stock_service import stock_service -from models.api_models import ( - StockAdd, - StockCreate, - StockResponse, - StockSummaryResponse, - StockUpdate, -) +from models.api_models import (StockAdd, StockCreate, StockResponse, + StockSummaryResponse, StockUpdate) from models.database_models import User router = APIRouter() diff --git a/app/services/auth_service.py b/app/services/auth_service.py index b0f31de4..4a264220 100644 --- a/app/services/auth_service.py +++ b/app/services/auth_service.py @@ -29,7 +29,7 @@ class AuthService: def register_user(self, db: Session, user_data: UserRegister) -> User: """ - Register a new user + Register a new user. Args: db: Database session diff --git a/app/services/marketplace_service.py b/app/services/marketplace_service.py index 9d4c90d6..94daff7f 100644 --- a/app/services/marketplace_service.py +++ b/app/services/marketplace_service.py @@ -22,12 +22,14 @@ logger = logging.getLogger(__name__) class MarketplaceService: + """Service class for Marketplace operations following the application's service pattern.""" + def __init__(self): """Class constructor.""" pass def validate_shop_access(self, db: Session, shop_code: str, user: User) -> Shop: - """Validate that the shop exists and user has access to it""" + """Validate that the shop exists and user has access to it.""" # Explicit type hint to help type checker shop: Optional[Shop] # Use case-insensitive query to handle both uppercase and lowercase codes shop: Optional[Shop] = ( @@ -67,7 +69,8 @@ class MarketplaceService: db.refresh(import_job) logger.info( - f"Created marketplace import job {import_job.id}: {request.marketplace} -> {shop.shop_name} (shop_code: {shop.shop_code}) by user {user.username}" + f"Created marketplace import job {import_job.id}: " + f"{request.marketplace} -> {shop.shop_name} (shop_code: {shop.shop_code}) by user {user.username}" ) return import_job diff --git a/app/services/product_service.py b/app/services/product_service.py index c43c1cc8..80b6a935 100644 --- a/app/services/product_service.py +++ b/app/services/product_service.py @@ -23,6 +23,8 @@ logger = logging.getLogger(__name__) class ProductService: + """Service class for Product operations following the application's service pattern.""" + def __init__(self): """Class constructor.""" self.gtin_processor = GTINProcessor() diff --git a/app/services/shop_service.py b/app/services/shop_service.py index 07dfe713..fae9f1df 100644 --- a/app/services/shop_service.py +++ b/app/services/shop_service.py @@ -8,7 +8,6 @@ This module provides classes and functions for: """ import logging - from typing import List, Optional, Tuple from fastapi import HTTPException @@ -28,7 +27,7 @@ class ShopService: self, db: Session, shop_data: ShopCreate, current_user: User ) -> Shop: """ - Create a new shop + Create a new shop. Args: db: Database session @@ -256,19 +255,19 @@ class ShopService: return shop_products, total def get_shop_by_id(self, db: Session, shop_id: int) -> Optional[Shop]: - """Get shop by ID""" + """Get shop by ID.""" return db.query(Shop).filter(Shop.id == shop_id).first() def shop_code_exists(self, db: Session, shop_code: str) -> bool: - """Check if shop code already exists""" + """Check if shop code already exists.""" return db.query(Shop).filter(Shop.shop_code == shop_code).first() is not None def get_product_by_id(self, db: Session, product_id: str) -> Optional[Product]: - """Get product by product_id""" + """Get product by product_id.""" return db.query(Product).filter(Product.product_id == product_id).first() def product_in_shop(self, db: Session, shop_id: int, product_id: int) -> bool: - """Check if product is already in shop""" + """Check if product is already in shop.""" return ( db.query(ShopProduct) .filter( @@ -279,11 +278,11 @@ class ShopService: ) def is_shop_owner(self, shop: Shop, user: User) -> bool: - """Check if user is shop owner""" + """Check if user is shop owner.""" return shop.owner_id == user.id def can_view_shop(self, shop: Shop, user: User) -> bool: - """Check if user can view shop""" + """Check if user can view shop.""" if user.role == "admin" or self.is_shop_owner(shop, user): return True return shop.is_active and shop.is_verified diff --git a/app/services/stock_service.py b/app/services/stock_service.py index 12d4e9f7..3c39d797 100644 --- a/app/services/stock_service.py +++ b/app/services/stock_service.py @@ -23,6 +23,7 @@ logger = logging.getLogger(__name__) class StockService: """Service class for stock operations following the application's service pattern.""" + def __init__(self): """Class constructor.""" self.gtin_processor = GTINProcessor() diff --git a/app/tasks/background_tasks.py b/app/tasks/background_tasks.py index 186a0d7d..fb2e55cd 100644 --- a/app/tasks/background_tasks.py +++ b/app/tasks/background_tasks.py @@ -1,4 +1,12 @@ # app/tasks/background_tasks.py +"""Summary description .... + +This module provides classes and functions for: +- .... +- .... +- .... +""" + import logging from datetime import datetime @@ -12,7 +20,7 @@ logger = logging.getLogger(__name__) async def process_marketplace_import( job_id: int, url: str, marketplace: str, shop_name: str, batch_size: int = 1000 ): - """Background task to process marketplace CSV import""" + """Background task to process marketplace CSV import.""" db = SessionLocal() csv_processor = CSVProcessor() job = None # Initialize job variable diff --git a/auth_fixtures.py b/auth_fixtures.py new file mode 100644 index 00000000..6a4dfe57 --- /dev/null +++ b/auth_fixtures.py @@ -0,0 +1,91 @@ +# tests/fixtures/auth_fixtures.py +import uuid + +import pytest + +from middleware.auth import AuthManager +from models.database_models import User + + +@pytest.fixture(scope="session") +def auth_manager(): + """Create auth manager instance (session scope since it's stateless)""" + return AuthManager() + + +@pytest.fixture +def test_user(db, auth_manager): + """Create a test user with unique username""" + unique_id = str(uuid.uuid4())[:8] # Short unique identifier + hashed_password = auth_manager.hash_password("testpass123") + user = User( + email=f"test_{unique_id}@example.com", + username=f"testuser_{unique_id}", + hashed_password=hashed_password, + role="user", + is_active=True, + ) + db.add(user) + db.commit() + db.refresh(user) + return user + + +@pytest.fixture +def test_admin(db, auth_manager): + """Create a test admin user with unique username""" + unique_id = str(uuid.uuid4())[:8] # Short unique identifier + hashed_password = auth_manager.hash_password("adminpass123") + admin = User( + email=f"admin_{unique_id}@example.com", + username=f"admin_{unique_id}", + hashed_password=hashed_password, + role="admin", + is_active=True, + ) + db.add(admin) + db.commit() + db.refresh(admin) + return admin + + +@pytest.fixture +def other_user(db, auth_manager): + """Create a different user for testing access controls""" + unique_id = str(uuid.uuid4())[:8] + hashed_password = auth_manager.hash_password("otherpass123") + user = User( + email=f"other_{unique_id}@example.com", + username=f"otheruser_{unique_id}", + hashed_password=hashed_password, + role="user", + is_active=True, + ) + db.add(user) + db.commit() + db.refresh(user) + return user + + +@pytest.fixture +def auth_headers(client, test_user): + """Get authentication headers for test user""" + response = client.post( + "/api/v1/auth/login", + json={"username": test_user.username, "password": "testpass123"}, + ) + assert response.status_code == 200, f"Login failed: {response.text}" + token = response.json()["access_token"] + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture +def admin_headers(client, test_admin): + """Get authentication headers for admin user""" + response = client.post( + "/api/v1/auth/login", + json={"username": test_admin.username, "password": "adminpass123"}, + ) + assert response.status_code == 200, f"Admin login failed: {response.text}" + token = response.json()["access_token"] + return {"Authorization": f"Bearer {token}"} diff --git a/enhanced_pytest_config.txt b/enhanced_pytest_config.txt new file mode 100644 index 00000000..4baa5db2 --- /dev/null +++ b/enhanced_pytest_config.txt @@ -0,0 +1,80 @@ +# pytest.ini - Enhanced configuration for your FastAPI test suite +[tool:pytest] +testpaths = tests +python_files = test_*.py +python_classes = Test* +python_functions = test_* + +# Enhanced addopts for better development experience +addopts = + -v + --tb=short + --strict-markers + --strict-config + --color=yes + --durations=10 + --showlocals + -ra + --cov=app + --cov=models + --cov=utils + --cov=middleware + --cov-report=term-missing + --cov-report=html:htmlcov + --cov-fail-under=80 + +# Test discovery and execution settings +minversion = 6.0 +testmon = true +python_paths = . + +# Markers for your specific test organization +markers = + # Test Types (for your new structure) + unit: Unit tests - fast, isolated components + integration: Integration tests - multiple components working together + system: System tests - full application behavior + e2e: End-to-end tests - complete user workflows + + # Performance and Speed + slow: Slow running tests (deselect with '-m "not slow"') + performance: Performance and load tests + + # Domain-specific markers (matching your application structure) + auth: Authentication and authorization tests + products: Product management functionality + stock: Stock and inventory management + shops: Shop management functionality + admin: Admin functionality and permissions + marketplace: Marketplace import functionality + stats: Statistics and reporting + + # Infrastructure markers + database: Tests that require database operations + external: Tests that require external services + api: API endpoint tests + security: Security-related tests + + # Test environment markers + ci: Tests that should only run in CI + dev: Development-specific tests + +# Test filtering shortcuts +filterwarnings = + ignore::UserWarning + ignore::DeprecationWarning + ignore::PendingDeprecationWarning + ignore::sqlalchemy.exc.SAWarning + +# Timeout settings +timeout = 300 +timeout_method = thread + +# Parallel execution settings (uncomment if using pytest-xdist) +# addopts = -n auto + +# Additional logging configuration +log_cli = true +log_cli_level = INFO +log_cli_format = %(asctime)s [%(levelname)8s] %(name)s: %(message)s +log_cli_date_format = %Y-%m-%d %H:%M:%S \ No newline at end of file diff --git a/init_files.py b/init_files.py new file mode 100644 index 00000000..d5fffe2a --- /dev/null +++ b/init_files.py @@ -0,0 +1,60 @@ +# tests/fixtures/__init__.py +"""Test fixtures for the FastAPI application test suite.""" + +# tests/unit/__init__.py +"""Unit tests - fast, isolated component tests.""" + +# tests/unit/models/__init__.py +"""Database and API model unit tests.""" + +# tests/unit/utils/__init__.py +"""Utility function unit tests.""" + +# tests/unit/services/__init__.py +"""Service layer unit tests.""" + +# tests/integration/__init__.py +"""Integration tests - multiple components working together.""" + +# tests/integration/api/__init__.py +"""API integration tests.""" + +# tests/integration/api/v1/__init__.py +"""API v1 endpoint integration tests.""" + +# tests/integration/security/__init__.py +"""Security integration tests.""" + +# tests/performance/__init__.py +"""Performance and load tests.""" + +# tests/system/__init__.py +"""System-level tests - full application behavior.""" + +# tests/integration/conftest.py +"""Integration test specific fixtures.""" +import pytest + +# Add any integration-specific fixtures here if needed + +# tests/unit/conftest.py +"""Unit test specific fixtures.""" +import pytest + +# Add any unit-specific fixtures here if needed + +# tests/performance/conftest.py +"""Performance test specific fixtures.""" +import pytest + +@pytest.fixture +def performance_db_session(db): + """Database session optimized for performance testing""" + # You can add performance-specific DB configurations here + return db + +# tests/system/conftest.py +"""System test specific fixtures.""" +import pytest + +# Add any system-specific fixtures here if needed \ No newline at end of file diff --git a/integration_admin_endpoints.py b/integration_admin_endpoints.py new file mode 100644 index 00000000..5e756e26 --- /dev/null +++ b/integration_admin_endpoints.py @@ -0,0 +1,195 @@ +# tests/integration/api/v1/test_admin_endpoints.py +import pytest + + +@pytest.mark.integration +@pytest.mark.api +@pytest.mark.admin +class TestAdminAPI: + def test_get_all_users_admin(self, client, admin_headers, test_user): + """Test admin getting all users""" + response = client.get("/api/v1/admin/users", headers=admin_headers) + + assert response.status_code == 200 + data = response.json() + assert len(data) >= 2 # test_user + admin user + + # Check that test_user is in the response + user_ids = [user["id"] for user in data if "id" in user] + assert test_user.id in user_ids + + def test_get_all_users_non_admin(self, client, auth_headers): + """Test non-admin trying to access admin endpoint""" + response = client.get("/api/v1/admin/users", headers=auth_headers) + + assert response.status_code == 403 + assert ( + "Access denied" in response.json()["detail"] + or "admin" in response.json()["detail"].lower() + ) + + def test_toggle_user_status_admin(self, client, admin_headers, test_user): + """Test admin toggling user status""" + response = client.put( + f"/api/v1/admin/users/{test_user.id}/status", headers=admin_headers + ) + + assert response.status_code == 200 + message = response.json()["message"] + assert "deactivated" in message or "activated" in message + # Verify the username is in the message + assert test_user.username in message + + def test_toggle_user_status_user_not_found(self, client, admin_headers): + """Test admin toggling status for non-existent user""" + response = client.put("/api/v1/admin/users/99999/status", headers=admin_headers) + + assert response.status_code == 404 + assert "User not found" in response.json()["detail"] + + def test_toggle_user_status_cannot_deactivate_self( + self, client, admin_headers, test_admin + ): + """Test that admin cannot deactivate their own account""" + response = client.put( + f"/api/v1/admin/users/{test_admin.id}/status", headers=admin_headers + ) + + assert response.status_code == 400 + assert "Cannot deactivate your own account" in response.json()["detail"] + + def test_get_all_shops_admin(self, client, admin_headers, test_shop): + """Test admin getting all shops""" + response = client.get("/api/v1/admin/shops", headers=admin_headers) + + assert response.status_code == 200 + data = response.json() + assert data["total"] >= 1 + assert len(data["shops"]) >= 1 + + # Check that test_shop is in the response + shop_codes = [ + shop["shop_code"] for shop in data["shops"] if "shop_code" in shop + ] + assert test_shop.shop_code in shop_codes + + def test_get_all_shops_non_admin(self, client, auth_headers): + """Test non-admin trying to access admin shop endpoint""" + response = client.get("/api/v1/admin/shops", headers=auth_headers) + + assert response.status_code == 403 + assert ( + "Access denied" in response.json()["detail"] + or "admin" in response.json()["detail"].lower() + ) + + def test_verify_shop_admin(self, client, admin_headers, test_shop): + """Test admin verifying/unverifying shop""" + response = client.put( + f"/api/v1/admin/shops/{test_shop.id}/verify", headers=admin_headers + ) + + assert response.status_code == 200 + message = response.json()["message"] + assert "verified" in message or "unverified" in message + assert test_shop.shop_code in message + + def test_verify_shop_not_found(self, client, admin_headers): + """Test admin verifying non-existent shop""" + response = client.put("/api/v1/admin/shops/99999/verify", headers=admin_headers) + + assert response.status_code == 404 + assert "Shop not found" in response.json()["detail"] + + def test_toggle_shop_status_admin(self, client, admin_headers, test_shop): + """Test admin toggling shop status""" + response = client.put( + f"/api/v1/admin/shops/{test_shop.id}/status", headers=admin_headers + ) + + assert response.status_code == 200 + message = response.json()["message"] + assert "activated" in message or "deactivated" in message + assert test_shop.shop_code in message + + def test_toggle_shop_status_not_found(self, client, admin_headers): + """Test admin toggling status for non-existent shop""" + response = client.put("/api/v1/admin/shops/99999/status", headers=admin_headers) + + assert response.status_code == 404 + assert "Shop not found" in response.json()["detail"] + + def test_get_marketplace_import_jobs_admin( + self, client, admin_headers, test_marketplace_job + ): + """Test admin getting marketplace import jobs""" + response = client.get( + "/api/v1/admin/marketplace-import-jobs", headers=admin_headers + ) + + assert response.status_code == 200 + data = response.json() + assert len(data) >= 1 + + # Check that test_marketplace_job is in the response + job_ids = [job["job_id"] for job in data if "job_id" in job] + assert test_marketplace_job.id in job_ids + + def test_get_marketplace_import_jobs_with_filters( + self, client, admin_headers, test_marketplace_job + ): + """Test admin getting marketplace import jobs with filters""" + response = client.get( + "/api/v1/admin/marketplace-import-jobs", + params={"marketplace": test_marketplace_job.marketplace}, + headers=admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + assert len(data) >= 1 + assert all( + job["marketplace"] == test_marketplace_job.marketplace for job in data + ) + + def test_get_marketplace_import_jobs_non_admin(self, client, auth_headers): + """Test non-admin trying to access marketplace import jobs""" + response = client.get( + "/api/v1/admin/marketplace-import-jobs", headers=auth_headers + ) + + assert response.status_code == 403 + assert ( + "Access denied" in response.json()["detail"] + or "admin" in response.json()["detail"].lower() + ) + + def test_admin_pagination_users(self, client, admin_headers, test_user, test_admin): + """Test user pagination works correctly""" + # Test first page + response = client.get( + "/api/v1/admin/users?skip=0&limit=1", headers=admin_headers + ) + assert response.status_code == 200 + data = response.json() + assert len(data) == 1 + + # Test second page + response = client.get( + "/api/v1/admin/users?skip=1&limit=1", headers=admin_headers + ) + assert response.status_code == 200 + data = response.json() + assert len(data) >= 0 # Could be 1 or 0 depending on total users + + def test_admin_pagination_shops(self, client, admin_headers, test_shop): + """Test shop pagination works correctly""" + response = client.get( + "/api/v1/admin/shops?skip=0&limit=1", headers=admin_headers + ) + assert response.status_code == 200 + data = response.json() + assert data["total"] >= 1 + assert len(data["shops"]) >= 0 + assert "skip" in data + assert "limit" in data diff --git a/integration_authentication.py b/integration_authentication.py new file mode 100644 index 00000000..8b4d4fa0 --- /dev/null +++ b/integration_authentication.py @@ -0,0 +1,75 @@ +# tests/integration/security/test_authentication.py +import pytest + + +@pytest.mark.integration +@pytest.mark.security +@pytest.mark.auth +class TestAuthentication: + def test_protected_endpoint_without_auth(self, client): + """Test that protected endpoints reject unauthenticated requests""" + protected_endpoints = [ + "/api/v1/admin/users", + "/api/v1/admin/shops", + "/api/v1/marketplace/import-jobs", + "/api/v1/product", + "/api/v1/shop", + "/api/v1/stats", + "/api/v1/stock", + ] + + for endpoint in protected_endpoints: + response = client.get(endpoint) + assert response.status_code == 401 # Authentication missing + + def test_protected_endpoint_with_invalid_token(self, client): + """Test protected endpoints with invalid token""" + headers = {"Authorization": "Bearer invalid_token_here"} + + response = client.get("/api/v1/product", headers=headers) + assert response.status_code == 401 # Token is not valid + + def test_debug_direct_bearer(self, client): + """Test HTTPBearer directly""" + response = client.get("/api/v1/debug-bearer") + print(f"Direct Bearer - Status: {response.status_code}") + print( + f"Direct Bearer - Response: {response.json() if response.content else 'No content'}" + ) + + def test_debug_dependencies(self, client): + """Debug the dependency chain step by step""" + # Test 1: Direct endpoint with no auth + response = client.get("/api/v1/admin/users") + print(f"Admin endpoint - Status: {response.status_code}") + try: + print(f"Admin endpoint - Response: {response.json()}") + except: + print(f"Admin endpoint - Raw: {response.content}") + + # Test 2: Try a regular endpoint that uses get_current_user + response2 = client.get("/api/v1/product") # or any endpoint with get_current_user + print(f"Regular endpoint - Status: {response2.status_code}") + try: + print(f"Regular endpoint - Response: {response2.json()}") + except: + print(f"Regular endpoint - Raw: {response2.content}") + + def test_debug_available_routes(self, client): + """Debug test to see all available routes""" + print("\n=== All Available Routes ===") + for route in client.app.routes: + if hasattr(route, "path") and hasattr(route, "methods"): + print(f"{list(route.methods)} {route.path}") + + print("\n=== Testing Product Endpoint Variations ===") + variations = [ + "/api/v1/product", # Your current attempt + "/api/v1/product/", # With trailing slash + "/api/v1/product/list", # With list endpoint + "/api/v1/product/all", # With all endpoint + ] + + for path in variations: + response = client.get(path) + print(f"{path}: Status {response.status_code}") diff --git a/integration_authorization.py b/integration_authorization.py new file mode 100644 index 00000000..26ab9e3d --- /dev/null +++ b/integration_authorization.py @@ -0,0 +1,46 @@ +# tests/integration/security/test_authorization.py +import pytest + + +@pytest.mark.integration +@pytest.mark.security +@pytest.mark.auth +class TestAuthorization: + def test_admin_endpoint_requires_admin_role(self, client, auth_headers): + """Test that admin endpoints require admin role""" + response = client.get("/api/v1/admin/users", headers=auth_headers) + assert response.status_code == 403 + # Regular user should be denied access + + def test_admin_endpoints_with_admin_access(self, client, admin_headers): + """Test that admin users can access admin endpoints""" + admin_endpoints = [ + "/api/v1/admin/users", + "/api/v1/admin/shops", + "/api/v1/admin/marketplace-import-jobs", + ] + + for endpoint in admin_endpoints: + response = client.get(endpoint, headers=admin_headers) + assert response.status_code == 200 # Admin should have access + + def test_regular_endpoints_with_user_access(self, client, auth_headers): + """Test that regular users can access non-admin endpoints""" + user_endpoints = [ + "/api/v1/product", + "/api/v1/stats", + "/api/v1/stock", + ] + + for endpoint in user_endpoints: + response = client.get(endpoint, headers=auth_headers) + assert response.status_code == 200 # Regular user should have access + + def test_shop_owner_access_control(self, client, auth_headers, test_shop, other_user): + """Test that users can only access their own shops""" + # Test accessing own shop (should work) + response = client.get(f"/api/v1/shop/{test_shop.shop_code}", headers=auth_headers) + # Response depends on your implementation - could be 200 or 404 if shop doesn't belong to user + + # The exact assertion depends on your shop access control implementation + assert response.status_code in [200, 403, 404] diff --git a/integration_input_validation.py b/integration_input_validation.py new file mode 100644 index 00000000..6afedfdd --- /dev/null +++ b/integration_input_validation.py @@ -0,0 +1,65 @@ +# tests/integration/security/test_input_validation.py +import pytest + + +@pytest.mark.integration +@pytest.mark.security +class TestInputValidation: + def test_sql_injection_prevention(self, client, auth_headers): + """Test SQL injection prevention in search parameters""" + # Try SQL injection in search parameter + malicious_search = "'; DROP TABLE products; --" + + response = client.get( + f"/api/v1/product?search={malicious_search}", headers=auth_headers + ) + + # Should not crash and should return normal response + assert response.status_code == 200 + # Database should still be intact (no products dropped) + + # def test_input_validation(self, client, auth_headers): + # # TODO: implement sanitization + # """Test input validation and sanitization""" + # # Test XSS attempt in product creation + # xss_payload = "" + # + # product_data = { + # "product_id": "XSS_TEST", + # "title": xss_payload, + # "description": xss_payload, + # } + # + # response = client.post("/api/v1/product", headers=auth_headers, json=product_data) + # + # assert response.status_code == 200 + # data = response.json() + # assert "