Refactoring code for modular approach
This commit is contained in:
@@ -12,6 +12,7 @@ from app.core.database import get_db, Base
|
||||
# Import all models to ensure they're registered with Base metadata
|
||||
from models.database_models import User, Product, Stock, Shop, MarketplaceImportJob, ShopProduct
|
||||
from middleware.auth import AuthManager
|
||||
import uuid
|
||||
|
||||
# Use in-memory SQLite database for tests
|
||||
SQLALCHEMY_TEST_DATABASE_URL = "sqlite:///:memory:"
|
||||
@@ -81,11 +82,12 @@ def auth_manager():
|
||||
|
||||
@pytest.fixture
|
||||
def test_user(db, auth_manager):
|
||||
"""Create a test user"""
|
||||
"""Create a test user with unique username"""
|
||||
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
|
||||
hashed_password = auth_manager.hash_password("testpass123")
|
||||
user = User(
|
||||
email="test@example.com",
|
||||
username="testuser",
|
||||
email=f"test_{unique_id}@example.com",
|
||||
username=f"testuser_{unique_id}",
|
||||
hashed_password=hashed_password,
|
||||
role="user",
|
||||
is_active=True
|
||||
@@ -98,11 +100,12 @@ def test_user(db, auth_manager):
|
||||
|
||||
@pytest.fixture
|
||||
def test_admin(db, auth_manager):
|
||||
"""Create a test admin user"""
|
||||
"""Create a test admin user with unique username"""
|
||||
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
|
||||
hashed_password = auth_manager.hash_password("adminpass123")
|
||||
admin = User(
|
||||
email="admin@example.com",
|
||||
username="admin",
|
||||
email=f"admin_{unique_id}@example.com",
|
||||
username=f"admin_{unique_id}",
|
||||
hashed_password=hashed_password,
|
||||
role="admin",
|
||||
is_active=True
|
||||
@@ -117,7 +120,7 @@ def test_admin(db, auth_manager):
|
||||
def auth_headers(client, test_user):
|
||||
"""Get authentication headers for test user"""
|
||||
response = client.post("/api/v1/auth/login", json={
|
||||
"username": "testuser",
|
||||
"username": test_user.username,
|
||||
"password": "testpass123"
|
||||
})
|
||||
assert response.status_code == 200, f"Login failed: {response.text}"
|
||||
@@ -129,7 +132,7 @@ def auth_headers(client, test_user):
|
||||
def admin_headers(client, test_admin):
|
||||
"""Get authentication headers for admin user"""
|
||||
response = client.post("/api/v1/auth/login", json={
|
||||
"username": "admin",
|
||||
"username": test_admin.username,
|
||||
"password": "adminpass123"
|
||||
})
|
||||
assert response.status_code == 200, f"Admin login failed: {response.text}"
|
||||
@@ -160,10 +163,11 @@ def test_product(db):
|
||||
|
||||
@pytest.fixture
|
||||
def test_shop(db, test_user):
|
||||
"""Create a test shop"""
|
||||
"""Create a test shop with unique shop code"""
|
||||
unique_id = str(uuid.uuid4())[:8] # Short unique identifier
|
||||
shop = Shop(
|
||||
shop_code="TESTSHOP",
|
||||
shop_name="Test Shop",
|
||||
shop_code=f"TESTSHOP_{unique_id}",
|
||||
shop_name=f"Test Shop {unique_id}",
|
||||
owner_id=test_user.id,
|
||||
is_active=True,
|
||||
is_verified=True
|
||||
@@ -190,6 +194,50 @@ def test_stock(db, test_product, test_shop):
|
||||
return stock
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def test_marketplace_job(db, test_shop): # Add test_shop dependency
|
||||
"""Create a test marketplace import job"""
|
||||
job = MarketplaceImportJob(
|
||||
marketplace="amazon",
|
||||
shop_name="Test Import Shop",
|
||||
status="completed",
|
||||
source_url="https://test-marketplace.example.com/import",
|
||||
shop_id=test_shop.id, # Add required shop_id
|
||||
imported_count=5,
|
||||
updated_count=3,
|
||||
total_processed=8,
|
||||
error_count=0,
|
||||
error_message=None
|
||||
)
|
||||
db.add(job)
|
||||
db.commit()
|
||||
db.refresh(job)
|
||||
return job
|
||||
|
||||
|
||||
def create_test_import_job(db, shop_id, **kwargs): # Add shop_id parameter
|
||||
"""Helper function to create MarketplaceImportJob with defaults"""
|
||||
defaults = {
|
||||
'marketplace': 'test',
|
||||
'shop_name': 'Test Shop',
|
||||
'status': 'pending',
|
||||
'source_url': 'https://test.example.com/import',
|
||||
'shop_id': shop_id, # Add required shop_id
|
||||
'imported_count': 0,
|
||||
'updated_count': 0,
|
||||
'total_processed': 0,
|
||||
'error_count': 0,
|
||||
'error_message': None
|
||||
}
|
||||
defaults.update(kwargs)
|
||||
|
||||
job = MarketplaceImportJob(**defaults)
|
||||
db.add(job)
|
||||
db.commit()
|
||||
db.refresh(job)
|
||||
return job
|
||||
|
||||
|
||||
# Cleanup fixture to ensure clean state
|
||||
@pytest.fixture(autouse=True)
|
||||
def cleanup():
|
||||
|
||||
@@ -11,6 +11,10 @@ class TestAdminAPI:
|
||||
data = response.json()
|
||||
assert len(data) >= 2 # test_user + admin user
|
||||
|
||||
# Check that test_user is in the response
|
||||
user_ids = [user["id"] for user in data if "id" in user]
|
||||
assert test_user.id in user_ids
|
||||
|
||||
def test_get_all_users_non_admin(self, client, auth_headers):
|
||||
"""Test non-admin trying to access admin endpoint"""
|
||||
response = client.get("/api/v1/admin/users", headers=auth_headers)
|
||||
@@ -23,7 +27,24 @@ class TestAdminAPI:
|
||||
response = client.put(f"/api/v1/admin/users/{test_user.id}/status", headers=admin_headers)
|
||||
|
||||
assert response.status_code == 200
|
||||
assert "deactivated" in response.json()["message"] or "activated" in response.json()["message"]
|
||||
message = response.json()["message"]
|
||||
assert "deactivated" in message or "activated" in message
|
||||
# Verify the username is in the message
|
||||
assert test_user.username in message
|
||||
|
||||
def test_toggle_user_status_user_not_found(self, client, admin_headers):
|
||||
"""Test admin toggling status for non-existent user"""
|
||||
response = client.put("/api/v1/admin/users/99999/status", headers=admin_headers)
|
||||
|
||||
assert response.status_code == 404
|
||||
assert "User not found" in response.json()["detail"]
|
||||
|
||||
def test_toggle_user_status_cannot_deactivate_self(self, client, admin_headers, test_admin):
|
||||
"""Test that admin cannot deactivate their own account"""
|
||||
response = client.put(f"/api/v1/admin/users/{test_admin.id}/status", headers=admin_headers)
|
||||
|
||||
assert response.status_code == 400
|
||||
assert "Cannot deactivate your own account" in response.json()["detail"]
|
||||
|
||||
def test_get_all_shops_admin(self, client, admin_headers, test_shop):
|
||||
"""Test admin getting all shops"""
|
||||
@@ -32,3 +53,115 @@ class TestAdminAPI:
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["total"] >= 1
|
||||
assert len(data["shops"]) >= 1
|
||||
|
||||
# Check that test_shop is in the response
|
||||
shop_codes = [shop["shop_code"] for shop in data["shops"] if "shop_code" in shop]
|
||||
assert test_shop.shop_code in shop_codes
|
||||
|
||||
def test_get_all_shops_non_admin(self, client, auth_headers):
|
||||
"""Test non-admin trying to access admin shop endpoint"""
|
||||
response = client.get("/api/v1/admin/shops", headers=auth_headers)
|
||||
|
||||
assert response.status_code == 403
|
||||
assert "Access denied" in response.json()["detail"] or "admin" in response.json()["detail"].lower()
|
||||
|
||||
def test_verify_shop_admin(self, client, admin_headers, test_shop):
|
||||
"""Test admin verifying/unverifying shop"""
|
||||
response = client.put(f"/api/v1/admin/shops/{test_shop.id}/verify", headers=admin_headers)
|
||||
|
||||
assert response.status_code == 200
|
||||
message = response.json()["message"]
|
||||
assert "verified" in message or "unverified" in message
|
||||
assert test_shop.shop_code in message
|
||||
|
||||
def test_verify_shop_not_found(self, client, admin_headers):
|
||||
"""Test admin verifying non-existent shop"""
|
||||
response = client.put("/api/v1/admin/shops/99999/verify", headers=admin_headers)
|
||||
|
||||
assert response.status_code == 404
|
||||
assert "Shop not found" in response.json()["detail"]
|
||||
|
||||
def test_toggle_shop_status_admin(self, client, admin_headers, test_shop):
|
||||
"""Test admin toggling shop status"""
|
||||
response = client.put(f"/api/v1/admin/shops/{test_shop.id}/status", headers=admin_headers)
|
||||
|
||||
assert response.status_code == 200
|
||||
message = response.json()["message"]
|
||||
assert "activated" in message or "deactivated" in message
|
||||
assert test_shop.shop_code in message
|
||||
|
||||
def test_toggle_shop_status_not_found(self, client, admin_headers):
|
||||
"""Test admin toggling status for non-existent shop"""
|
||||
response = client.put("/api/v1/admin/shops/99999/status", headers=admin_headers)
|
||||
|
||||
assert response.status_code == 404
|
||||
assert "Shop not found" in response.json()["detail"]
|
||||
|
||||
def test_get_marketplace_import_jobs_admin(self, client, admin_headers, test_marketplace_job):
|
||||
"""Test admin getting marketplace import jobs"""
|
||||
response = client.get("/api/v1/admin/marketplace-import-jobs", headers=admin_headers)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert len(data) >= 1
|
||||
|
||||
# Check that test_marketplace_job is in the response
|
||||
job_ids = [job["job_id"] for job in data if "job_id" in job]
|
||||
assert test_marketplace_job.id in job_ids
|
||||
|
||||
def test_get_marketplace_import_jobs_with_filters(self, client, admin_headers, test_marketplace_job):
|
||||
"""Test admin getting marketplace import jobs with filters"""
|
||||
response = client.get(
|
||||
"/api/v1/admin/marketplace-import-jobs",
|
||||
params={"marketplace": test_marketplace_job.marketplace},
|
||||
headers=admin_headers
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert len(data) >= 1
|
||||
assert all(job["marketplace"] == test_marketplace_job.marketplace for job in data)
|
||||
|
||||
def test_get_marketplace_import_jobs_non_admin(self, client, auth_headers):
|
||||
"""Test non-admin trying to access marketplace import jobs"""
|
||||
response = client.get("/api/v1/admin/marketplace-import-jobs", headers=auth_headers)
|
||||
|
||||
assert response.status_code == 403
|
||||
assert "Access denied" in response.json()["detail"] or "admin" in response.json()["detail"].lower()
|
||||
|
||||
def test_admin_endpoints_require_authentication(self, client):
|
||||
"""Test that admin endpoints require authentication"""
|
||||
endpoints = [
|
||||
"/api/v1/admin/users",
|
||||
"/api/v1/admin/shops",
|
||||
"/api/v1/admin/marketplace-import-jobs"
|
||||
]
|
||||
|
||||
for endpoint in endpoints:
|
||||
response = client.get(endpoint)
|
||||
assert response.status_code == 401 # Unauthorized
|
||||
|
||||
def test_admin_pagination_users(self, client, admin_headers, test_user, test_admin):
|
||||
"""Test user pagination works correctly"""
|
||||
# Test first page
|
||||
response = client.get("/api/v1/admin/users?skip=0&limit=1", headers=admin_headers)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert len(data) == 1
|
||||
|
||||
# Test second page
|
||||
response = client.get("/api/v1/admin/users?skip=1&limit=1", headers=admin_headers)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert len(data) >= 0 # Could be 1 or 0 depending on total users
|
||||
|
||||
def test_admin_pagination_shops(self, client, admin_headers, test_shop):
|
||||
"""Test shop pagination works correctly"""
|
||||
response = client.get("/api/v1/admin/shops?skip=0&limit=1", headers=admin_headers)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["total"] >= 1
|
||||
assert len(data["shops"]) >= 0
|
||||
assert "skip" in data
|
||||
assert "limit" in data
|
||||
|
||||
359
tests/test_admin_service.py
Normal file
359
tests/test_admin_service.py
Normal file
@@ -0,0 +1,359 @@
|
||||
# tests/test_admin_service.py
|
||||
import pytest
|
||||
from datetime import datetime
|
||||
from fastapi import HTTPException
|
||||
|
||||
from app.services.admin_service import AdminService
|
||||
from models.database_models import User, Shop, MarketplaceImportJob
|
||||
|
||||
|
||||
class TestAdminService:
|
||||
"""Test suite for AdminService following the application's testing patterns"""
|
||||
|
||||
def setup_method(self):
|
||||
"""Setup method following the same pattern as product service tests"""
|
||||
self.service = AdminService()
|
||||
|
||||
def test_get_all_users(self, db, test_user, test_admin):
|
||||
"""Test getting all users with pagination"""
|
||||
users = self.service.get_all_users(db, skip=0, limit=10)
|
||||
|
||||
assert len(users) >= 2 # test_user + test_admin
|
||||
user_ids = [user.id for user in users]
|
||||
assert test_user.id in user_ids
|
||||
assert test_admin.id in user_ids
|
||||
|
||||
def test_get_all_users_with_pagination(self, db, test_user, test_admin):
|
||||
"""Test user pagination works correctly"""
|
||||
users = self.service.get_all_users(db, skip=0, limit=1)
|
||||
|
||||
assert len(users) == 1
|
||||
|
||||
users_second_page = self.service.get_all_users(db, skip=1, limit=1)
|
||||
assert len(users_second_page) == 1
|
||||
assert users[0].id != users_second_page[0].id
|
||||
|
||||
def test_toggle_user_status_deactivate(self, db, test_user, test_admin):
|
||||
"""Test deactivating a user"""
|
||||
assert test_user.is_active is True
|
||||
|
||||
user, message = self.service.toggle_user_status(db, test_user.id, test_admin.id)
|
||||
|
||||
assert user.id == test_user.id
|
||||
assert user.is_active is False
|
||||
assert f"{user.username} has been deactivated" in message
|
||||
|
||||
def test_toggle_user_status_activate(self, db, test_user, test_admin):
|
||||
"""Test activating a user"""
|
||||
# First deactivate the user
|
||||
test_user.is_active = False
|
||||
db.commit()
|
||||
|
||||
user, message = self.service.toggle_user_status(db, test_user.id, test_admin.id)
|
||||
|
||||
assert user.id == test_user.id
|
||||
assert user.is_active is True
|
||||
assert f"{user.username} has been activated" in message
|
||||
|
||||
def test_toggle_user_status_user_not_found(self, db, test_admin):
|
||||
"""Test toggle user status when user not found"""
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
self.service.toggle_user_status(db, 99999, test_admin.id)
|
||||
|
||||
assert exc_info.value.status_code == 404
|
||||
assert "User not found" in str(exc_info.value.detail)
|
||||
|
||||
def test_toggle_user_status_cannot_deactivate_self(self, db, test_admin):
|
||||
"""Test that admin cannot deactivate their own account"""
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
self.service.toggle_user_status(db, test_admin.id, test_admin.id)
|
||||
|
||||
assert exc_info.value.status_code == 400
|
||||
assert "Cannot deactivate your own account" in str(exc_info.value.detail)
|
||||
|
||||
def test_get_all_shops(self, db, test_shop):
|
||||
"""Test getting all shops with total count"""
|
||||
shops, total = self.service.get_all_shops(db, skip=0, limit=10)
|
||||
|
||||
assert total >= 1
|
||||
assert len(shops) >= 1
|
||||
shop_codes = [shop.shop_code for shop in shops]
|
||||
assert test_shop.shop_code in shop_codes
|
||||
|
||||
def test_get_all_shops_with_pagination(self, db, test_shop):
|
||||
"""Test shop pagination works correctly"""
|
||||
# Create additional shop for pagination test using the helper function
|
||||
from conftest import create_test_import_job # If you added the helper function
|
||||
|
||||
# Or create directly with proper fields
|
||||
additional_shop = Shop(
|
||||
shop_code=f"{test_shop.shop_code}_2",
|
||||
shop_name="Test Shop 2",
|
||||
owner_id=test_shop.owner_id,
|
||||
is_active=True,
|
||||
is_verified=False
|
||||
)
|
||||
db.add(additional_shop)
|
||||
db.commit()
|
||||
|
||||
shops_page_1 = self.service.get_all_shops(db, skip=0, limit=1)
|
||||
assert len(shops_page_1[0]) == 1
|
||||
|
||||
shops_page_2 = self.service.get_all_shops(db, skip=1, limit=1)
|
||||
assert len(shops_page_2[0]) == 1
|
||||
|
||||
# Ensure different shops on different pages
|
||||
assert shops_page_1[0][0].id != shops_page_2[0][0].id
|
||||
|
||||
def test_verify_shop_mark_verified(self, db, test_shop):
|
||||
"""Test marking shop as verified"""
|
||||
# Ensure shop starts unverified
|
||||
test_shop.is_verified = False
|
||||
db.commit()
|
||||
|
||||
shop, message = self.service.verify_shop(db, test_shop.id)
|
||||
|
||||
assert shop.id == test_shop.id
|
||||
assert shop.is_verified is True
|
||||
assert f"{shop.shop_code} has been verified" in message
|
||||
|
||||
def test_verify_shop_mark_unverified(self, db, test_shop):
|
||||
"""Test marking shop as unverified"""
|
||||
# Ensure shop starts verified
|
||||
test_shop.is_verified = True
|
||||
db.commit()
|
||||
|
||||
shop, message = self.service.verify_shop(db, test_shop.id)
|
||||
|
||||
assert shop.id == test_shop.id
|
||||
assert shop.is_verified is False
|
||||
assert f"{shop.shop_code} has been unverified" in message
|
||||
|
||||
def test_verify_shop_not_found(self, db):
|
||||
"""Test verify shop when shop not found"""
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
self.service.verify_shop(db, 99999)
|
||||
|
||||
assert exc_info.value.status_code == 404
|
||||
assert "Shop not found" in str(exc_info.value.detail)
|
||||
|
||||
def test_toggle_shop_status_deactivate(self, db, test_shop):
|
||||
"""Test deactivating a shop"""
|
||||
assert test_shop.is_active is True
|
||||
|
||||
shop, message = self.service.toggle_shop_status(db, test_shop.id)
|
||||
|
||||
assert shop.id == test_shop.id
|
||||
assert shop.is_active is False
|
||||
assert f"{shop.shop_code} has been deactivated" in message
|
||||
|
||||
def test_toggle_shop_status_activate(self, db, test_shop):
|
||||
"""Test activating a shop"""
|
||||
# First deactivate the shop
|
||||
test_shop.is_active = False
|
||||
db.commit()
|
||||
|
||||
shop, message = self.service.toggle_shop_status(db, test_shop.id)
|
||||
|
||||
assert shop.id == test_shop.id
|
||||
assert shop.is_active is True
|
||||
assert f"{shop.shop_code} has been activated" in message
|
||||
|
||||
def test_toggle_shop_status_not_found(self, db):
|
||||
"""Test toggle shop status when shop not found"""
|
||||
with pytest.raises(HTTPException) as exc_info:
|
||||
self.service.toggle_shop_status(db, 99999)
|
||||
|
||||
assert exc_info.value.status_code == 404
|
||||
assert "Shop not found" in str(exc_info.value.detail)
|
||||
|
||||
def test_get_marketplace_import_jobs_no_filters(self, db, test_marketplace_job):
|
||||
"""Test getting marketplace import jobs without filters using fixture"""
|
||||
result = self.service.get_marketplace_import_jobs(db, skip=0, limit=10)
|
||||
|
||||
assert len(result) >= 1
|
||||
# Find our test job in the results
|
||||
test_job = next((job for job in result if job.job_id == test_marketplace_job.id), None)
|
||||
assert test_job is not None
|
||||
assert test_job.marketplace == test_marketplace_job.marketplace
|
||||
assert test_job.shop_name == test_marketplace_job.shop_name
|
||||
assert test_job.status == test_marketplace_job.status
|
||||
|
||||
def test_get_marketplace_import_jobs_with_marketplace_filter(self, db, test_marketplace_job):
|
||||
"""Test getting marketplace import jobs filtered by marketplace"""
|
||||
# Create additional job with different marketplace
|
||||
other_job = MarketplaceImportJob(
|
||||
marketplace="ebay",
|
||||
shop_name="eBay Shop",
|
||||
status="completed",
|
||||
source_url="https://ebay.example.com/import"
|
||||
)
|
||||
db.add(other_job)
|
||||
db.commit()
|
||||
|
||||
# Filter by the test marketplace job's marketplace
|
||||
result = self.service.get_marketplace_import_jobs(db, marketplace=test_marketplace_job.marketplace)
|
||||
|
||||
assert len(result) >= 1
|
||||
# All results should match the marketplace filter
|
||||
for job in result:
|
||||
assert test_marketplace_job.marketplace.lower() in job.marketplace.lower()
|
||||
|
||||
def test_get_marketplace_import_jobs_with_shop_filter(self, db, test_marketplace_job):
|
||||
"""Test getting marketplace import jobs filtered by shop name"""
|
||||
# Create additional job with different shop name
|
||||
other_job = MarketplaceImportJob(
|
||||
marketplace="amazon",
|
||||
shop_name="Different Shop Name",
|
||||
status="completed",
|
||||
source_url="https://different.example.com/import"
|
||||
)
|
||||
db.add(other_job)
|
||||
db.commit()
|
||||
|
||||
# Filter by the test marketplace job's shop name
|
||||
result = self.service.get_marketplace_import_jobs(db, shop_name=test_marketplace_job.shop_name)
|
||||
|
||||
assert len(result) >= 1
|
||||
# All results should match the shop name filter
|
||||
for job in result:
|
||||
assert test_marketplace_job.shop_name.lower() in job.shop_name.lower()
|
||||
|
||||
def test_get_marketplace_import_jobs_with_status_filter(self, db, test_marketplace_job):
|
||||
"""Test getting marketplace import jobs filtered by status"""
|
||||
# Create additional job with different status
|
||||
other_job = MarketplaceImportJob(
|
||||
marketplace="amazon",
|
||||
shop_name="Test Shop",
|
||||
status="pending",
|
||||
source_url="https://pending.example.com/import"
|
||||
)
|
||||
db.add(other_job)
|
||||
db.commit()
|
||||
|
||||
# Filter by the test marketplace job's status
|
||||
result = self.service.get_marketplace_import_jobs(db, status=test_marketplace_job.status)
|
||||
|
||||
assert len(result) >= 1
|
||||
# All results should match the status filter
|
||||
for job in result:
|
||||
assert job.status == test_marketplace_job.status
|
||||
|
||||
def test_get_marketplace_import_jobs_with_multiple_filters(self, db, test_marketplace_job, test_shop):
|
||||
"""Test getting marketplace import jobs with multiple filters"""
|
||||
# Create jobs that don't match all filters
|
||||
non_matching_job1 = MarketplaceImportJob(
|
||||
marketplace="ebay", # Different marketplace
|
||||
shop_name=test_marketplace_job.shop_name,
|
||||
status=test_marketplace_job.status,
|
||||
source_url="https://non-matching1.example.com/import",
|
||||
shop_id=test_shop.id # Add required shop_id
|
||||
)
|
||||
non_matching_job2 = MarketplaceImportJob(
|
||||
marketplace=test_marketplace_job.marketplace,
|
||||
shop_name="Different Shop", # Different shop name
|
||||
status=test_marketplace_job.status,
|
||||
source_url="https://non-matching2.example.com/import",
|
||||
shop_id=test_shop.id # Add required shop_id
|
||||
)
|
||||
db.add_all([non_matching_job1, non_matching_job2])
|
||||
db.commit()
|
||||
|
||||
# Apply all three filters matching the test job
|
||||
result = self.service.get_marketplace_import_jobs(
|
||||
db,
|
||||
marketplace=test_marketplace_job.marketplace,
|
||||
shop_name=test_marketplace_job.shop_name,
|
||||
status=test_marketplace_job.status
|
||||
)
|
||||
|
||||
assert len(result) >= 1
|
||||
# Find our test job in the results
|
||||
test_job = next((job for job in result if job.job_id == test_marketplace_job.id), None)
|
||||
assert test_job is not None
|
||||
assert test_job.marketplace == test_marketplace_job.marketplace
|
||||
assert test_job.shop_name == test_marketplace_job.shop_name
|
||||
assert test_job.status == test_marketplace_job.status
|
||||
|
||||
def test_get_marketplace_import_jobs_null_values(self, db):
|
||||
"""Test that marketplace import jobs handle null values correctly"""
|
||||
# Create job with null values but required fields
|
||||
job = MarketplaceImportJob(
|
||||
marketplace="test",
|
||||
shop_name="Test Shop",
|
||||
status="pending",
|
||||
source_url="https://test.example.com/import",
|
||||
imported_count=None,
|
||||
updated_count=None,
|
||||
total_processed=None,
|
||||
error_count=None,
|
||||
error_message=None
|
||||
)
|
||||
db.add(job)
|
||||
db.commit()
|
||||
|
||||
result = self.service.get_marketplace_import_jobs(db)
|
||||
|
||||
assert len(result) >= 1
|
||||
# Find the job with null values
|
||||
null_job = next((j for j in result if j.job_id == job.id), None)
|
||||
assert null_job is not None
|
||||
assert null_job.imported == 0 # None converted to 0
|
||||
assert null_job.updated == 0
|
||||
assert null_job.total_processed == 0
|
||||
assert null_job.error_count == 0
|
||||
assert null_job.error_message is None
|
||||
|
||||
def test_get_user_by_id(self, db, test_user):
|
||||
"""Test getting user by ID using fixture"""
|
||||
user = self.service.get_user_by_id(db, test_user.id)
|
||||
|
||||
assert user is not None
|
||||
assert user.id == test_user.id
|
||||
assert user.email == test_user.email
|
||||
assert user.username == test_user.username
|
||||
|
||||
def test_get_user_by_id_not_found(self, db):
|
||||
"""Test getting user by ID when user doesn't exist"""
|
||||
user = self.service.get_user_by_id(db, 99999)
|
||||
|
||||
assert user is None
|
||||
|
||||
def test_get_shop_by_id(self, db, test_shop):
|
||||
"""Test getting shop by ID using fixture"""
|
||||
shop = self.service.get_shop_by_id(db, test_shop.id)
|
||||
|
||||
assert shop is not None
|
||||
assert shop.id == test_shop.id
|
||||
assert shop.shop_code == test_shop.shop_code
|
||||
assert shop.shop_name == test_shop.shop_name
|
||||
|
||||
def test_get_shop_by_id_not_found(self, db):
|
||||
"""Test getting shop by ID when shop doesn't exist"""
|
||||
shop = self.service.get_shop_by_id(db, 99999)
|
||||
|
||||
assert shop is None
|
||||
|
||||
def test_user_exists_true(self, db, test_user):
|
||||
"""Test user_exists returns True when user exists"""
|
||||
exists = self.service.user_exists(db, test_user.id)
|
||||
|
||||
assert exists is True
|
||||
|
||||
def test_user_exists_false(self, db):
|
||||
"""Test user_exists returns False when user doesn't exist"""
|
||||
exists = self.service.user_exists(db, 99999)
|
||||
|
||||
assert exists is False
|
||||
|
||||
def test_shop_exists_true(self, db, test_shop):
|
||||
"""Test shop_exists returns True when shop exists"""
|
||||
exists = self.service.shop_exists(db, test_shop.id)
|
||||
|
||||
assert exists is True
|
||||
|
||||
def test_shop_exists_false(self, db):
|
||||
"""Test shop_exists returns False when shop doesn't exist"""
|
||||
exists = self.service.shop_exists(db, 99999)
|
||||
|
||||
assert exists is False
|
||||
@@ -23,7 +23,7 @@ class TestAuthenticationAPI:
|
||||
def test_register_user_duplicate_email(self, client, test_user):
|
||||
"""Test registration with duplicate email"""
|
||||
response = client.post("/api/v1/auth/register", json={
|
||||
"email": "test@example.com", # Same as test_user
|
||||
"email": test_user.email, # Same as test_user
|
||||
"username": "newuser",
|
||||
"password": "securepass123"
|
||||
})
|
||||
@@ -35,7 +35,7 @@ class TestAuthenticationAPI:
|
||||
"""Test registration with duplicate username"""
|
||||
response = client.post("/api/v1/auth/register", json={
|
||||
"email": "new@example.com",
|
||||
"username": "testuser", # Same as test_user
|
||||
"username": test_user.username, # Same as test_user
|
||||
"password": "securepass123"
|
||||
})
|
||||
|
||||
@@ -45,7 +45,7 @@ class TestAuthenticationAPI:
|
||||
def test_login_success(self, client, test_user):
|
||||
"""Test successful login"""
|
||||
response = client.post("/api/v1/auth/login", json={
|
||||
"username": "testuser",
|
||||
"username": test_user.username,
|
||||
"password": "testpass123"
|
||||
})
|
||||
|
||||
@@ -54,7 +54,7 @@ class TestAuthenticationAPI:
|
||||
assert "access_token" in data
|
||||
assert data["token_type"] == "bearer"
|
||||
assert "expires_in" in data
|
||||
assert data["user"]["username"] == "testuser"
|
||||
assert data["user"]["username"] == test_user.username
|
||||
|
||||
def test_login_wrong_password(self, client, test_user):
|
||||
"""Test login with wrong password"""
|
||||
@@ -75,14 +75,14 @@ class TestAuthenticationAPI:
|
||||
|
||||
assert response.status_code == 401
|
||||
|
||||
def test_get_current_user_info(self, client, auth_headers):
|
||||
def test_get_current_user_info(self, client, auth_headers, test_user):
|
||||
"""Test getting current user info"""
|
||||
response = client.get("/api/v1/auth/me", headers=auth_headers)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["username"] == "testuser"
|
||||
assert data["email"] == "test@example.com"
|
||||
assert data["username"] == test_user.username
|
||||
assert data["email"] == test_user.email
|
||||
|
||||
def test_get_current_user_no_auth(self, client):
|
||||
"""Test getting current user without authentication"""
|
||||
|
||||
@@ -2,6 +2,8 @@
|
||||
import pytest
|
||||
import time
|
||||
|
||||
from models.database_models import Product
|
||||
|
||||
|
||||
class TestPerformance:
|
||||
def test_product_list_performance(self, client, auth_headers, db):
|
||||
@@ -22,7 +24,7 @@ class TestPerformance:
|
||||
|
||||
# Time the request
|
||||
start_time = time.time()
|
||||
response = client.get("/api/v1/products?limit=100", headers=auth_headers)
|
||||
response = client.get("/api/v1/product?limit=100", headers=auth_headers)
|
||||
end_time = time.time()
|
||||
|
||||
assert response.status_code == 200
|
||||
@@ -48,7 +50,7 @@ class TestPerformance:
|
||||
|
||||
# Time search request
|
||||
start_time = time.time()
|
||||
response = client.get("/api/v1/products?search=Searchable", headers=auth_headers)
|
||||
response = client.get("/api/v1/product?search=Searchable", headers=auth_headers)
|
||||
end_time = time.time()
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
# tests/test_products.py
|
||||
# tests/test_product.py
|
||||
import pytest
|
||||
|
||||
|
||||
class TestProductsAPI:
|
||||
def test_get_products_empty(self, client, auth_headers):
|
||||
"""Test getting products when none exist"""
|
||||
response = client.get("/api/v1/products", headers=auth_headers)
|
||||
response = client.get("/api/v1/product", headers=auth_headers)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
@@ -1,4 +1,4 @@
|
||||
# tests/test_shops.py
|
||||
# tests/test_shop.py
|
||||
import pytest
|
||||
|
||||
|
||||
Reference in New Issue
Block a user