# tests/test_marketplace_service.py
|
|
import pytest
|
|
import uuid
|
|
from app.services.marketplace_service import MarketplaceService
|
|
from models.api_models import MarketplaceImportRequest
|
|
from models.database_models import MarketplaceImportJob, Shop, User
|
|
from datetime import datetime
|
|
|
|
|
|
class TestMarketplaceService:
    """Exercise ``MarketplaceService`` against a database session.

    Covers shop access validation, import job creation, lookup, listing,
    status updates, statistics, response conversion, cancellation and
    deletion.

    The ``db``, ``test_user``, ``other_user``, ``test_admin``, ``test_shop``
    and ``test_marketplace_job`` arguments are expected to be pytest fixtures
    provided outside this module (e.g. a ``conftest.py``).
    """

    def setup_method(self):
        """Give every test its own ``MarketplaceService`` instance."""
        self.service = MarketplaceService()

    @staticmethod
    def _create_job(
        db,
        user,
        shop,
        *,
        status,
        marketplace="Amazon",
        shop_name=None,
        source_url="https://test.example.com/import",
    ):
        """Insert a ``MarketplaceImportJob`` with zeroed counters and return it.

        Not collected by pytest because the name lacks the ``test`` prefix.

        Args:
            db: Database session used to persist the job.
            user: Owner of the job; only ``user.id`` is read.
            shop: Shop the job belongs to; only ``shop.id`` is read.
            status: Initial job status, e.g. ``"pending"`` or ``"completed"``.
            marketplace: Marketplace name stored on the job.
            shop_name: Shop name stored on the job. When omitted, a name with
                a random 8-character suffix is generated so repeated runs do
                not collide.
            source_url: Import source URL stored on the job.

        Returns:
            The committed and refreshed job, so ``job.id`` is populated.
        """
        if shop_name is None:
            shop_name = f"TEST_SHOP_{str(uuid.uuid4())[:8]}"

        job = MarketplaceImportJob(
            status=status,
            marketplace=marketplace,
            shop_name=shop_name,
            user_id=user.id,
            shop_id=shop.id,  # jobs reference the shop by id, not shop_code
            source_url=source_url,
            imported_count=0,
            updated_count=0,
            total_processed=0,
            error_count=0,
        )
        db.add(job)
        db.commit()
        db.refresh(job)
        return job

    def test_validate_shop_access_success(self, db, test_shop, test_user):
        """Test successful shop access validation"""
        # Set the shop owner to the test user
        test_shop.owner_id = test_user.id
        db.commit()

        result = self.service.validate_shop_access(db, test_shop.shop_code, test_user)

        assert result.shop_code == test_shop.shop_code
        assert result.owner_id == test_user.id

    def test_validate_shop_access_admin_can_access_any_shop(self, db, test_shop, test_admin):
        """Test that admin users can access any shop"""
        result = self.service.validate_shop_access(db, test_shop.shop_code, test_admin)

        assert result.shop_code == test_shop.shop_code

    def test_validate_shop_access_shop_not_found(self, db, test_user):
        """Test shop access validation when shop doesn't exist"""
        with pytest.raises(ValueError, match="Shop not found"):
            self.service.validate_shop_access(db, "NONEXISTENT", test_user)

    def test_validate_shop_access_permission_denied(self, db, test_shop, test_user, other_user):
        """Test shop access validation when user doesn't own the shop"""
        # Set the shop owner to a different user
        test_shop.owner_id = other_user.id
        db.commit()

        with pytest.raises(PermissionError, match="Access denied to this shop"):
            self.service.validate_shop_access(db, test_shop.shop_code, test_user)

    def test_create_import_job_success(self, db, test_shop, test_user):
        """Test successful creation of import job"""
        # Set the shop owner to the test user
        test_shop.owner_id = test_user.id
        db.commit()

        request = MarketplaceImportRequest(
            url="https://example.com/products.csv",
            marketplace="Amazon",
            shop_code=test_shop.shop_code,
            batch_size=1000,
        )

        result = self.service.create_import_job(db, request, test_user)

        assert result.marketplace == "Amazon"
        # The job references the shop by primary key rather than by shop_code
        assert result.shop_id == test_shop.id
        # Asserted unconditionally: a hasattr() guard here would let the test
        # pass silently if the ownership column ever went missing.
        assert result.user_id == test_user.id
        assert result.status == "pending"
        assert result.source_url == "https://example.com/products.csv"

    def test_create_import_job_invalid_shop(self, db, test_user):
        """Test import job creation with invalid shop"""
        request = MarketplaceImportRequest(
            url="https://example.com/products.csv",
            marketplace="Amazon",
            shop_code="INVALID_SHOP",
            batch_size=1000,
        )

        with pytest.raises(ValueError, match="Shop not found"):
            self.service.create_import_job(db, request, test_user)

    def test_get_import_job_by_id_success(self, db, test_marketplace_job, test_user):
        """Test getting import job by ID for job owner"""
        result = self.service.get_import_job_by_id(db, test_marketplace_job.id, test_user)

        assert result.id == test_marketplace_job.id
        assert result.user_id == test_user.id

    def test_get_import_job_by_id_admin_access(self, db, test_marketplace_job, test_admin):
        """Test that admin can access any import job"""
        result = self.service.get_import_job_by_id(db, test_marketplace_job.id, test_admin)

        assert result.id == test_marketplace_job.id

    def test_get_import_job_by_id_not_found(self, db, test_user):
        """Test getting non-existent import job"""
        with pytest.raises(ValueError, match="Marketplace import job not found"):
            self.service.get_import_job_by_id(db, 99999, test_user)

    def test_get_import_job_by_id_access_denied(self, db, test_marketplace_job, other_user):
        """Test access denied when user doesn't own the job"""
        with pytest.raises(PermissionError, match="Access denied to this import job"):
            self.service.get_import_job_by_id(db, test_marketplace_job.id, other_user)

    def test_get_import_jobs_user_filter(self, db, test_marketplace_job, test_user):
        """Test getting import jobs filtered by user"""
        # Precondition: the fixture job belongs to the requesting user
        assert test_marketplace_job.user_id == test_user.id

        jobs = self.service.get_import_jobs(db, test_user)

        assert len(jobs) >= 1
        assert any(job.id == test_marketplace_job.id for job in jobs)
        # Check the returned rows themselves, not just the fixture, so the
        # per-user filter is actually exercised.
        assert all(job.user_id == test_user.id for job in jobs)

    def test_get_import_jobs_admin_sees_all(self, db, test_marketplace_job, test_admin):
        """Test that admin sees all import jobs"""
        jobs = self.service.get_import_jobs(db, test_admin)

        assert len(jobs) >= 1
        assert any(job.id == test_marketplace_job.id for job in jobs)

    def test_get_import_jobs_with_marketplace_filter(self, db, test_marketplace_job, test_user):
        """Test getting import jobs with marketplace filter"""
        jobs = self.service.get_import_jobs(
            db, test_user, marketplace=test_marketplace_job.marketplace
        )

        assert len(jobs) >= 1
        assert any(job.marketplace == test_marketplace_job.marketplace for job in jobs)

    def test_get_import_jobs_with_pagination(self, db, test_user, test_shop):
        """Test getting import jobs with pagination"""
        unique_id = str(uuid.uuid4())[:8]

        # Create multiple import jobs owned by the requesting user
        for i in range(5):
            self._create_job(
                db,
                test_user,
                test_shop,
                status="completed",
                marketplace=f"Marketplace_{unique_id}_{i}",
                shop_name=f"Test_Shop_{unique_id}_{i}",
                source_url=f"https://test-{i}.example.com/import",
            )

        jobs = self.service.get_import_jobs(db, test_user, skip=2, limit=2)

        # At least five jobs exist for this user, so skipping two and limiting
        # to two must yield a full page; "<= 2" would also accept no results.
        assert len(jobs) == 2

    def test_update_job_status_success(self, db, test_marketplace_job):
        """Test updating job status"""
        result = self.service.update_job_status(
            db,
            test_marketplace_job.id,
            "completed",
            imported_count=100,
            total_processed=100,
        )

        assert result.status == "completed"
        assert result.imported_count == 100
        assert result.total_processed == 100

    def test_update_job_status_not_found(self, db):
        """Test updating non-existent job status"""
        with pytest.raises(ValueError, match="Marketplace import job not found"):
            self.service.update_job_status(db, 99999, "completed")

    def test_get_job_stats_user(self, db, test_marketplace_job, test_user):
        """Test getting job statistics for user"""
        stats = self.service.get_job_stats(db, test_user)

        assert stats["total_jobs"] >= 1
        for key in ("pending_jobs", "running_jobs", "completed_jobs", "failed_jobs"):
            assert key in stats

    def test_get_job_stats_admin(self, db, test_marketplace_job, test_admin):
        """Test getting job statistics for admin"""
        stats = self.service.get_job_stats(db, test_admin)

        assert stats["total_jobs"] >= 1

    def test_convert_to_response_model(self, test_marketplace_job):
        """Test converting database model to response model"""
        response = self.service.convert_to_response_model(test_marketplace_job)

        assert response.job_id == test_marketplace_job.id
        assert response.status == test_marketplace_job.status
        assert response.marketplace == test_marketplace_job.marketplace
        # A missing imported_count on the model is compared as 0
        assert response.imported == (test_marketplace_job.imported_count or 0)

    def test_cancel_import_job_success(self, db, test_user, test_shop):
        """Test cancelling a pending import job"""
        job = self._create_job(db, test_user, test_shop, status="pending")

        result = self.service.cancel_import_job(db, job.id, test_user)

        assert result.status == "cancelled"
        assert result.completed_at is not None

    def test_cancel_import_job_invalid_status(self, db, test_marketplace_job, test_user):
        """Test cancelling a job that can't be cancelled"""
        # Set job status to completed
        test_marketplace_job.status = "completed"
        db.commit()

        with pytest.raises(ValueError, match="Cannot cancel job with status: completed"):
            self.service.cancel_import_job(db, test_marketplace_job.id, test_user)

    def test_delete_import_job_success(self, db, test_user, test_shop):
        """Test deleting a completed import job"""
        job = self._create_job(db, test_user, test_shop, status="completed")
        # Capture the id first: the instance is unusable once it is deleted
        job_id = job.id

        result = self.service.delete_import_job(db, job_id, test_user)

        assert result is True

        # Verify the job is actually deleted
        deleted_job = (
            db.query(MarketplaceImportJob)
            .filter(MarketplaceImportJob.id == job_id)
            .first()
        )
        assert deleted_job is None

    def test_delete_import_job_invalid_status(self, db, test_user, test_shop):
        """Test deleting a job that can't be deleted"""
        job = self._create_job(db, test_user, test_shop, status="pending")

        with pytest.raises(ValueError, match="Cannot delete job with status: pending"):
            self.service.delete_import_job(db, job.id, test_user)
|