test: add integration tests for vendor orders API and background tasks
Add comprehensive integration tests for: - Vendor orders API (list, detail, status updates, pagination, filtering) - Letzshop historical import background task (success, errors, progress) - Subscription background tasks (period reset, trial expiration, Stripe sync) These 39 new tests improve coverage for background task functionality and vendor order management. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
341
tests/integration/tasks/test_letzshop_tasks.py
Normal file
341
tests/integration/tasks/test_letzshop_tasks.py
Normal file
@@ -0,0 +1,341 @@
|
||||
# tests/integration/tasks/test_letzshop_tasks.py
|
||||
"""Integration tests for Letzshop background tasks."""
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from app.services.letzshop import LetzshopClientError
|
||||
from app.tasks.letzshop_tasks import process_historical_import
|
||||
from models.database.letzshop import LetzshopHistoricalImportJob
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def historical_import_job(db, test_vendor, test_user):
|
||||
"""Create a test historical import job."""
|
||||
job = LetzshopHistoricalImportJob(
|
||||
vendor_id=test_vendor.id,
|
||||
user_id=test_user.id,
|
||||
status="pending",
|
||||
)
|
||||
db.add(job)
|
||||
db.commit()
|
||||
db.refresh(job)
|
||||
return job
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.database
|
||||
@pytest.mark.letzshop
|
||||
class TestHistoricalImportTask:
|
||||
"""Test historical import background task."""
|
||||
|
||||
def test_job_not_found(self, db, test_vendor):
|
||||
"""Test handling when job doesn't exist."""
|
||||
with patch("app.tasks.letzshop_tasks.SessionLocal", return_value=db):
|
||||
# Should not raise an exception
|
||||
process_historical_import(job_id=99999, vendor_id=test_vendor.id)
|
||||
|
||||
def test_successful_import(self, db, test_vendor, historical_import_job):
|
||||
"""Test successful historical import with both phases."""
|
||||
job_id = historical_import_job.id
|
||||
|
||||
# Mock the services
|
||||
mock_client = MagicMock()
|
||||
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
||||
mock_client.__exit__ = MagicMock(return_value=False)
|
||||
mock_client.get_all_shipments_paginated.side_effect = [
|
||||
# First call: confirmed shipments
|
||||
[{"id": "1", "state": "confirmed"}, {"id": "2", "state": "confirmed"}],
|
||||
# Second call: unconfirmed shipments
|
||||
[{"id": "3", "state": "unconfirmed"}],
|
||||
]
|
||||
|
||||
mock_creds_service = MagicMock()
|
||||
mock_creds_service.create_client.return_value = mock_client
|
||||
mock_creds_service.update_sync_status = MagicMock()
|
||||
|
||||
mock_order_service = MagicMock()
|
||||
mock_order_service.import_historical_shipments.side_effect = [
|
||||
# First call: confirmed stats
|
||||
{
|
||||
"total": 2,
|
||||
"imported": 2,
|
||||
"updated": 0,
|
||||
"skipped": 0,
|
||||
"products_matched": 5,
|
||||
"products_not_found": 1,
|
||||
},
|
||||
# Second call: unconfirmed stats
|
||||
{
|
||||
"total": 1,
|
||||
"imported": 1,
|
||||
"updated": 0,
|
||||
"skipped": 0,
|
||||
"products_matched": 2,
|
||||
"products_not_found": 0,
|
||||
},
|
||||
]
|
||||
|
||||
with (
|
||||
patch("app.tasks.letzshop_tasks.SessionLocal", return_value=db),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_credentials_service",
|
||||
return_value=mock_creds_service,
|
||||
),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_order_service",
|
||||
return_value=mock_order_service,
|
||||
),
|
||||
):
|
||||
process_historical_import(job_id=job_id, vendor_id=test_vendor.id)
|
||||
|
||||
# Verify job was updated
|
||||
updated_job = (
|
||||
db.query(LetzshopHistoricalImportJob)
|
||||
.filter(LetzshopHistoricalImportJob.id == job_id)
|
||||
.first()
|
||||
)
|
||||
assert updated_job.status == "completed"
|
||||
assert updated_job.started_at is not None
|
||||
assert updated_job.completed_at is not None
|
||||
assert updated_job.confirmed_stats["imported"] == 2
|
||||
assert updated_job.declined_stats["imported"] == 1
|
||||
assert updated_job.products_matched == 7
|
||||
assert updated_job.products_not_found == 1
|
||||
|
||||
# Verify sync status was updated
|
||||
mock_creds_service.update_sync_status.assert_called_with(
|
||||
test_vendor.id, "success", None
|
||||
)
|
||||
|
||||
def test_letzshop_client_error(self, db, test_vendor, historical_import_job):
|
||||
"""Test handling Letzshop API errors."""
|
||||
job_id = historical_import_job.id
|
||||
|
||||
mock_creds_service = MagicMock()
|
||||
mock_creds_service.create_client.side_effect = LetzshopClientError(
|
||||
"API connection failed"
|
||||
)
|
||||
mock_creds_service.update_sync_status = MagicMock()
|
||||
|
||||
mock_order_service = MagicMock()
|
||||
mock_order_service.get_vendor.return_value = test_vendor
|
||||
|
||||
with (
|
||||
patch("app.tasks.letzshop_tasks.SessionLocal", return_value=db),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_credentials_service",
|
||||
return_value=mock_creds_service,
|
||||
),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_order_service",
|
||||
return_value=mock_order_service,
|
||||
),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks.admin_notification_service"
|
||||
) as mock_notify,
|
||||
):
|
||||
process_historical_import(job_id=job_id, vendor_id=test_vendor.id)
|
||||
|
||||
# Verify job failed
|
||||
updated_job = (
|
||||
db.query(LetzshopHistoricalImportJob)
|
||||
.filter(LetzshopHistoricalImportJob.id == job_id)
|
||||
.first()
|
||||
)
|
||||
assert updated_job.status == "failed"
|
||||
assert "API connection failed" in updated_job.error_message
|
||||
assert updated_job.completed_at is not None
|
||||
|
||||
# Verify sync status was updated to failed
|
||||
mock_creds_service.update_sync_status.assert_called_with(
|
||||
test_vendor.id, "failed", "API connection failed"
|
||||
)
|
||||
|
||||
def test_unexpected_error(self, db, test_vendor, historical_import_job):
|
||||
"""Test handling unexpected errors."""
|
||||
job_id = historical_import_job.id
|
||||
|
||||
mock_creds_service = MagicMock()
|
||||
mock_creds_service.create_client.side_effect = RuntimeError("Unexpected error")
|
||||
|
||||
mock_order_service = MagicMock()
|
||||
mock_order_service.get_vendor.return_value = test_vendor
|
||||
|
||||
with (
|
||||
patch("app.tasks.letzshop_tasks.SessionLocal", return_value=db),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_credentials_service",
|
||||
return_value=mock_creds_service,
|
||||
),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_order_service",
|
||||
return_value=mock_order_service,
|
||||
),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks.admin_notification_service"
|
||||
) as mock_notify,
|
||||
):
|
||||
process_historical_import(job_id=job_id, vendor_id=test_vendor.id)
|
||||
|
||||
# Verify job failed
|
||||
updated_job = (
|
||||
db.query(LetzshopHistoricalImportJob)
|
||||
.filter(LetzshopHistoricalImportJob.id == job_id)
|
||||
.first()
|
||||
)
|
||||
assert updated_job.status == "failed"
|
||||
assert "Unexpected error" in updated_job.error_message
|
||||
|
||||
# Verify critical error notification was sent
|
||||
mock_notify.notify_critical_error.assert_called_once()
|
||||
|
||||
def test_progress_tracking(self, db, test_vendor, historical_import_job):
|
||||
"""Test that progress is tracked correctly during import."""
|
||||
job_id = historical_import_job.id
|
||||
progress_updates = []
|
||||
|
||||
# Create a mock client that tracks progress calls
|
||||
mock_client = MagicMock()
|
||||
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
||||
mock_client.__exit__ = MagicMock(return_value=False)
|
||||
|
||||
def track_fetch_progress(*args, **kwargs):
|
||||
# Simulate fetching shipments and call progress callback
|
||||
progress_callback = kwargs.get("progress_callback")
|
||||
if progress_callback:
|
||||
progress_callback(1, 10)
|
||||
progress_callback(2, 20)
|
||||
return [{"id": str(i)} for i in range(20)]
|
||||
|
||||
mock_client.get_all_shipments_paginated.side_effect = [
|
||||
track_fetch_progress(state="confirmed", page_size=50, progress_callback=None),
|
||||
[], # Empty unconfirmed
|
||||
]
|
||||
|
||||
mock_creds_service = MagicMock()
|
||||
mock_creds_service.create_client.return_value = mock_client
|
||||
mock_creds_service.update_sync_status = MagicMock()
|
||||
|
||||
mock_order_service = MagicMock()
|
||||
mock_order_service.import_historical_shipments.return_value = {
|
||||
"total": 20,
|
||||
"imported": 18,
|
||||
"updated": 2,
|
||||
"skipped": 0,
|
||||
"products_matched": 50,
|
||||
"products_not_found": 5,
|
||||
}
|
||||
|
||||
with (
|
||||
patch("app.tasks.letzshop_tasks.SessionLocal", return_value=db),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_credentials_service",
|
||||
return_value=mock_creds_service,
|
||||
),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_order_service",
|
||||
return_value=mock_order_service,
|
||||
),
|
||||
):
|
||||
process_historical_import(job_id=job_id, vendor_id=test_vendor.id)
|
||||
|
||||
# Verify final job state
|
||||
updated_job = (
|
||||
db.query(LetzshopHistoricalImportJob)
|
||||
.filter(LetzshopHistoricalImportJob.id == job_id)
|
||||
.first()
|
||||
)
|
||||
assert updated_job.status == "completed"
|
||||
|
||||
def test_commit_error_in_exception_handler(
|
||||
self, db, test_vendor, historical_import_job
|
||||
):
|
||||
"""Test handling when commit fails during exception handling."""
|
||||
job_id = historical_import_job.id
|
||||
|
||||
# Create a mock session that fails on the second commit
|
||||
mock_session = MagicMock()
|
||||
mock_session.query.return_value.filter.return_value.first.return_value = (
|
||||
historical_import_job
|
||||
)
|
||||
mock_session.commit.side_effect = [
|
||||
None, # First commit (status update) succeeds
|
||||
Exception("Commit failed"), # Second commit fails
|
||||
]
|
||||
mock_session.rollback = MagicMock()
|
||||
mock_session.close = MagicMock()
|
||||
|
||||
mock_creds_service = MagicMock()
|
||||
mock_creds_service.create_client.side_effect = RuntimeError("Test error")
|
||||
|
||||
mock_order_service = MagicMock()
|
||||
mock_order_service.get_vendor.return_value = test_vendor
|
||||
|
||||
with (
|
||||
patch("app.tasks.letzshop_tasks.SessionLocal", return_value=mock_session),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_credentials_service",
|
||||
return_value=mock_creds_service,
|
||||
),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_order_service",
|
||||
return_value=mock_order_service,
|
||||
),
|
||||
patch("app.tasks.letzshop_tasks.admin_notification_service"),
|
||||
):
|
||||
# Should not raise
|
||||
process_historical_import(job_id=job_id, vendor_id=test_vendor.id)
|
||||
|
||||
# Verify rollback was called
|
||||
mock_session.rollback.assert_called()
|
||||
|
||||
def test_close_error_handling(self, db, test_vendor, historical_import_job):
|
||||
"""Test handling when session close fails."""
|
||||
job_id = historical_import_job.id
|
||||
|
||||
# Create a mock session that fails on close
|
||||
mock_session = MagicMock()
|
||||
mock_session.query.return_value.filter.return_value.first.return_value = (
|
||||
historical_import_job
|
||||
)
|
||||
mock_session.commit = MagicMock()
|
||||
mock_session.close.side_effect = Exception("Close failed")
|
||||
|
||||
mock_client = MagicMock()
|
||||
mock_client.__enter__ = MagicMock(return_value=mock_client)
|
||||
mock_client.__exit__ = MagicMock(return_value=False)
|
||||
mock_client.get_all_shipments_paginated.return_value = []
|
||||
|
||||
mock_creds_service = MagicMock()
|
||||
mock_creds_service.create_client.return_value = mock_client
|
||||
mock_creds_service.update_sync_status = MagicMock()
|
||||
|
||||
mock_order_service = MagicMock()
|
||||
mock_order_service.import_historical_shipments.return_value = {
|
||||
"total": 0,
|
||||
"imported": 0,
|
||||
"updated": 0,
|
||||
"skipped": 0,
|
||||
"products_matched": 0,
|
||||
"products_not_found": 0,
|
||||
}
|
||||
|
||||
with (
|
||||
patch("app.tasks.letzshop_tasks.SessionLocal", return_value=mock_session),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_credentials_service",
|
||||
return_value=mock_creds_service,
|
||||
),
|
||||
patch(
|
||||
"app.tasks.letzshop_tasks._get_order_service",
|
||||
return_value=mock_order_service,
|
||||
),
|
||||
):
|
||||
# Should not raise
|
||||
process_historical_import(job_id=job_id, vendor_id=test_vendor.id)
|
||||
|
||||
# Verify close was attempted
|
||||
mock_session.close.assert_called()
|
||||
Reference in New Issue
Block a user