Marketplace tests update

This commit is contained in:
2025-09-24 22:28:44 +02:00
parent f9879126c8
commit cea88a46c5
16 changed files with 613 additions and 73 deletions

View File

@@ -1,7 +1,8 @@
# tests/integration/api/v1/test_auth_endpoints.py
import pytest
from jose import jwt
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
@pytest.mark.integration
@pytest.mark.api
@@ -195,8 +196,8 @@ class TestAuthenticationAPI:
"username": test_user.username,
"email": test_user.email,
"role": test_user.role,
"exp": datetime.utcnow() - timedelta(hours=1),
"iat": datetime.utcnow() - timedelta(hours=2),
"exp": datetime.now(timezone.utc) - timedelta(hours=1),
"iat": datetime.now(timezone.utc) - timedelta(hours=2),
}
expired_token = jwt.encode(

View File

@@ -13,7 +13,7 @@ from models.database.product import Product
class TestExportFunctionality:
def test_csv_export_basic(self, client, auth_headers, test_product):
"""Test basic CSV export functionality"""
response = client.get("/api/v1/export-csv", headers=auth_headers)
response = client.get("/api/v1/product/export-csv", headers=auth_headers)
assert response.status_code == 200
assert response.headers["content-type"] == "text/csv; charset=utf-8"
@@ -40,7 +40,7 @@ class TestExportFunctionality:
db.commit()
response = client.get(
"/api/v1/export-csv?marketplace=Amazon", headers=auth_headers
"/api/v1/product/export-csv?marketplace=Amazon", headers=auth_headers
)
assert response.status_code == 200
@@ -66,7 +66,7 @@ class TestExportFunctionality:
import time
start_time = time.time()
response = client.get("/api/v1/export-csv", headers=auth_headers)
response = client.get("/api/v1/product/export-csv", headers=auth_headers)
end_time = time.time()
assert response.status_code == 200

View File

@@ -4,9 +4,14 @@ from unittest.mock import AsyncMock, patch
import pytest
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.marketplace
class TestMarketplaceAPI:
def test_import_from_marketplace(self, client, auth_headers, test_shop):
def test_import_from_marketplace(self, client, auth_headers, test_shop, test_user):
"""Test marketplace import endpoint - just test job creation"""
# Ensure user owns the shop
test_shop.owner_id = test_user.id
import_data = {
"url": "https://example.com/products.csv",
@@ -23,8 +28,8 @@ class TestMarketplaceAPI:
assert data["status"] == "pending"
assert data["marketplace"] == "TestMarket"
assert "job_id" in data
# Don't test the background task here - test it separately
assert data["shop_code"] == test_shop.shop_code
assert data["shop_id"] == test_shop.id
def test_import_from_marketplace_invalid_shop(self, client, auth_headers):
"""Test marketplace import with invalid shop"""
@@ -39,16 +44,383 @@ class TestMarketplaceAPI:
)
assert response.status_code == 404
assert "Shop not found" in response.json()["detail"]
data = response.json()
assert data["error_code"] == "SHOP_NOT_FOUND"
assert "NONEXISTENT" in data["message"]
def test_get_marketplace_import_jobs(self, client, auth_headers):
def test_import_from_marketplace_unauthorized_shop(self, client, auth_headers, test_shop, other_user):
"""Test marketplace import with unauthorized shop access"""
# Set shop owner to different user
test_shop.owner_id = other_user.id
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": test_shop.shop_code,
}
response = client.post(
"/api/v1/marketplace/import-product", headers=auth_headers, json=import_data
)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "UNAUTHORIZED_SHOP_ACCESS"
assert test_shop.shop_code in data["message"]
def test_import_from_marketplace_validation_error(self, client, auth_headers):
"""Test marketplace import with invalid request data"""
import_data = {
"url": "", # Empty URL
"marketplace": "", # Empty marketplace
# Missing shop_code
}
response = client.post(
"/api/v1/marketplace/import-product", headers=auth_headers, json=import_data
)
assert response.status_code == 422
data = response.json()
assert data["error_code"] == "VALIDATION_ERROR"
assert "Request validation failed" in data["message"]
def test_import_from_marketplace_admin_access(self, client, admin_headers, test_shop):
"""Test that admin can import for any shop"""
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "AdminMarket",
"shop_code": test_shop.shop_code,
}
response = client.post(
"/api/v1/marketplace/import-product", headers=admin_headers, json=import_data
)
assert response.status_code == 200
data = response.json()
assert data["marketplace"] == "AdminMarket"
assert data["shop_code"] == test_shop.shop_code
def test_get_marketplace_import_status(self, client, auth_headers, test_marketplace_job):
"""Test getting marketplace import status"""
response = client.get(
f"/api/v1/marketplace/import-status/{test_marketplace_job.id}",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["job_id"] == test_marketplace_job.id
assert data["status"] == test_marketplace_job.status
assert data["marketplace"] == test_marketplace_job.marketplace
def test_get_marketplace_import_status_not_found(self, client, auth_headers):
"""Test getting status of non-existent import job"""
response = client.get(
"/api/v1/marketplace/import-status/99999",
headers=auth_headers
)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "IMPORT_JOB_NOT_FOUND"
assert "99999" in data["message"]
def test_get_marketplace_import_status_unauthorized(self, client, auth_headers, test_marketplace_job, other_user):
"""Test getting status of unauthorized import job"""
# Change job owner to other user
test_marketplace_job.user_id = other_user.id
response = client.get(
f"/api/v1/marketplace/import-status/{test_marketplace_job.id}",
headers=auth_headers
)
assert response.status_code == 403
data = response.json()
assert data["error_code"] == "IMPORT_JOB_NOT_OWNED"
def test_get_marketplace_import_jobs(self, client, auth_headers, test_marketplace_job):
"""Test getting marketplace import jobs"""
response = client.get("/api/v1/marketplace/import-jobs", headers=auth_headers)
assert response.status_code == 200
assert isinstance(response.json(), list)
data = response.json()
assert isinstance(data, list)
assert len(data) >= 1
# Find our test job in the results
job_ids = [job["job_id"] for job in data]
assert test_marketplace_job.id in job_ids
def test_get_marketplace_import_jobs_with_filters(self, client, auth_headers, test_marketplace_job):
"""Test getting import jobs with filters"""
response = client.get(
f"/api/v1/marketplace/import-jobs?marketplace={test_marketplace_job.marketplace}",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) >= 1
for job in data:
assert test_marketplace_job.marketplace.lower() in job["marketplace"].lower()
def test_get_marketplace_import_jobs_pagination(self, client, auth_headers):
"""Test import jobs pagination"""
response = client.get(
"/api/v1/marketplace/import-jobs?skip=0&limit=5",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
assert len(data) <= 5
def test_get_marketplace_import_stats(self, client, auth_headers, test_marketplace_job):
"""Test getting marketplace import statistics"""
response = client.get("/api/v1/marketplace/marketplace-import-stats", headers=auth_headers)
assert response.status_code == 200
data = response.json()
assert "total_jobs" in data
assert "pending_jobs" in data
assert "running_jobs" in data
assert "completed_jobs" in data
assert "failed_jobs" in data
assert isinstance(data["total_jobs"], int)
assert data["total_jobs"] >= 1
def test_cancel_marketplace_import_job(self, client, auth_headers, test_user, test_shop, db):
"""Test cancelling a marketplace import job"""
# Create a pending job that can be cancelled
from models.database.marketplace import MarketplaceImportJob
import uuid
unique_id = str(uuid.uuid4())[:8]
job = MarketplaceImportJob(
status="pending",
marketplace="TestMarket",
shop_name=f"Test_Shop_{unique_id}",
user_id=test_user.id,
shop_id=test_shop.id,
source_url="https://test.example.com/import",
imported_count=0,
updated_count=0,
total_processed=0,
error_count=0,
)
db.add(job)
db.commit()
db.refresh(job)
response = client.put(
f"/api/v1/marketplace/import-jobs/{job.id}/cancel",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert data["job_id"] == job.id
assert data["status"] == "cancelled"
assert data["completed_at"] is not None
def test_cancel_marketplace_import_job_not_found(self, client, auth_headers):
"""Test cancelling non-existent import job"""
response = client.put(
"/api/v1/marketplace/import-jobs/99999/cancel",
headers=auth_headers
)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "IMPORT_JOB_NOT_FOUND"
def test_cancel_marketplace_import_job_cannot_cancel(self, client, auth_headers, test_marketplace_job, db):
"""Test cancelling a job that cannot be cancelled"""
# Set job to completed status
test_marketplace_job.status = "completed"
db.commit()
response = client.put(
f"/api/v1/marketplace/import-jobs/{test_marketplace_job.id}/cancel",
headers=auth_headers
)
assert response.status_code == 400
data = response.json()
assert data["error_code"] == "IMPORT_JOB_CANNOT_BE_CANCELLED"
assert "completed" in data["message"]
def test_delete_marketplace_import_job(self, client, auth_headers, test_user, test_shop, db):
"""Test deleting a marketplace import job"""
# Create a completed job that can be deleted
from models.database.marketplace import MarketplaceImportJob
import uuid
unique_id = str(uuid.uuid4())[:8]
job = MarketplaceImportJob(
status="completed",
marketplace="TestMarket",
shop_name=f"Test_Shop_{unique_id}",
user_id=test_user.id,
shop_id=test_shop.id,
source_url="https://test.example.com/import",
imported_count=0,
updated_count=0,
total_processed=0,
error_count=0,
)
db.add(job)
db.commit()
db.refresh(job)
response = client.delete(
f"/api/v1/marketplace/import-jobs/{job.id}",
headers=auth_headers
)
assert response.status_code == 200
data = response.json()
assert "deleted successfully" in data["message"]
def test_delete_marketplace_import_job_not_found(self, client, auth_headers):
"""Test deleting non-existent import job"""
response = client.delete(
"/api/v1/marketplace/import-jobs/99999",
headers=auth_headers
)
assert response.status_code == 404
data = response.json()
assert data["error_code"] == "IMPORT_JOB_NOT_FOUND"
def test_delete_marketplace_import_job_cannot_delete(self, client, auth_headers, test_user, test_shop, db):
"""Test deleting a job that cannot be deleted"""
# Create a pending job that cannot be deleted
from models.database.marketplace import MarketplaceImportJob
import uuid
unique_id = str(uuid.uuid4())[:8]
job = MarketplaceImportJob(
status="pending",
marketplace="TestMarket",
shop_name=f"Test_Shop_{unique_id}",
user_id=test_user.id,
shop_id=test_shop.id,
source_url="https://test.example.com/import",
imported_count=0,
updated_count=0,
total_processed=0,
error_count=0,
)
db.add(job)
db.commit()
db.refresh(job)
response = client.delete(
f"/api/v1/marketplace/import-jobs/{job.id}",
headers=auth_headers
)
assert response.status_code == 400
data = response.json()
assert data["error_code"] == "IMPORT_JOB_CANNOT_BE_DELETED"
assert "pending" in data["message"]
def test_get_marketplace_without_auth(self, client):
"""Test that marketplace endpoints require authentication"""
response = client.get("/api/v1/marketplace/import-jobs")
assert response.status_code == 401 # No authorization header
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
def test_import_without_auth(self, client):
"""Test marketplace import without authentication"""
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": "TEST_SHOP",
}
response = client.post("/api/v1/marketplace/import-product", json=import_data)
assert response.status_code == 401
data = response.json()
assert data["error_code"] == "INVALID_TOKEN"
def test_admin_can_access_all_jobs(self, client, admin_headers, test_marketplace_job):
"""Test that admin can access all import jobs"""
response = client.get("/api/v1/marketplace/import-jobs", headers=admin_headers)
assert response.status_code == 200
data = response.json()
assert isinstance(data, list)
# Admin should see all jobs, including the test job
job_ids = [job["job_id"] for job in data]
assert test_marketplace_job.id in job_ids
def test_admin_can_view_any_job_status(self, client, admin_headers, test_marketplace_job):
"""Test that admin can view any job status"""
response = client.get(
f"/api/v1/marketplace/import-status/{test_marketplace_job.id}",
headers=admin_headers
)
assert response.status_code == 200
data = response.json()
assert data["job_id"] == test_marketplace_job.id
def test_admin_can_cancel_any_job(self, client, admin_headers, test_user, test_shop, db):
"""Test that admin can cancel any job"""
# Create a pending job owned by different user
from models.database.marketplace import MarketplaceImportJob
import uuid
unique_id = str(uuid.uuid4())[:8]
job = MarketplaceImportJob(
status="pending",
marketplace="TestMarket",
shop_name=f"Test_Shop_{unique_id}",
user_id=test_user.id, # Different user
shop_id=test_shop.id,
source_url="https://test.example.com/import",
imported_count=0,
updated_count=0,
total_processed=0,
error_count=0,
)
db.add(job)
db.commit()
db.refresh(job)
response = client.put(
f"/api/v1/marketplace/import-jobs/{job.id}/cancel",
headers=admin_headers
)
assert response.status_code == 200
data = response.json()
assert data["status"] == "cancelled"
def test_rate_limiting_applied(self, client, auth_headers, test_shop, test_user):
"""Test that rate limiting is applied to import endpoint"""
# This test verifies that the rate_limit decorator is present
# Actual rate limiting testing would require multiple requests
test_shop.owner_id = test_user.id
import_data = {
"url": "https://example.com/products.csv",
"marketplace": "TestMarket",
"shop_code": test_shop.shop_code,
}
response = client.post(
"/api/v1/marketplace/import-product", headers=auth_headers, json=import_data
)
# Should succeed on first request
assert response.status_code == 200