refactor: fix architecture violations in admin marketplace endpoint

Move database queries to service layer and use proper Pydantic responses:

Schema changes (models/schema/marketplace_import_job.py):
- Add AdminMarketplaceImportJobResponse with extra fields (id, error_details,
  created_by_name)
- Add AdminMarketplaceImportJobListResponse with items/total/page/limit format

Service changes (app/services/marketplace_import_job_service.py):
- Add convert_to_admin_response_model() for admin-specific response
- Add get_all_import_jobs_paginated() for admin listing
- Add get_import_job_by_id_admin() without access control

API changes (app/api/v1/admin/marketplace.py):
- Use service methods instead of direct DB queries
- Use proper Pydantic response models instead of dicts
- All business logic now in service layer

Cleanup:
- Remove duplicate methods from admin_service.py

Architecture validation now passes with 0 violations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-12-05 22:14:25 +01:00
parent 1fcbaa922e
commit 5d40551d98
3 changed files with 114 additions and 68 deletions

View File

@@ -10,15 +10,15 @@ from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.exceptions import ImportJobNotFoundException, VendorNotFoundException
from app.services.marketplace_import_job_service import marketplace_import_job_service
from app.services.stats_service import stats_service
from app.services.vendor_service import vendor_service
from app.tasks.background_tasks import process_marketplace_import
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.user import User
from models.database.vendor import Vendor
from models.schema.marketplace_import_job import (
AdminMarketplaceImportJobListResponse,
AdminMarketplaceImportJobRequest,
AdminMarketplaceImportJobResponse,
MarketplaceImportJobRequest,
MarketplaceImportJobResponse,
)
@@ -27,31 +27,7 @@ router = APIRouter(prefix="/marketplace-import-jobs")
logger = logging.getLogger(__name__)
def _job_to_dict(job: MarketplaceImportJob) -> dict:
"""Convert a MarketplaceImportJob to a response dict with frontend-compatible fields."""
return {
"id": job.id,
"job_id": job.id,
"status": job.status,
"marketplace": job.marketplace,
"source_url": job.source_url,
"vendor_id": job.vendor.id if job.vendor else None,
"vendor_code": job.vendor.vendor_code if job.vendor else None,
"vendor_name": job.vendor.name if job.vendor else None,
"imported": job.imported_count or 0,
"updated": job.updated_count or 0,
"total_processed": job.total_processed or 0,
"error_count": job.error_count or 0,
"error_message": job.error_message,
"error_details": [], # Placeholder for future use
"created_at": job.created_at.isoformat() if job.created_at else None,
"started_at": job.started_at.isoformat() if job.started_at else None,
"completed_at": job.completed_at.isoformat() if job.completed_at else None,
"created_by_name": job.user.username if job.user else None,
}
@router.get("")
@router.get("", response_model=AdminMarketplaceImportJobListResponse)
def get_all_marketplace_import_jobs(
marketplace: str | None = Query(None),
status: str | None = Query(None),
@@ -61,30 +37,23 @@ def get_all_marketplace_import_jobs(
current_admin: User = Depends(get_current_admin_api),
):
"""Get all marketplace import jobs with pagination (Admin only)."""
query = db.query(MarketplaceImportJob)
if marketplace:
query = query.filter(
MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")
)
if status:
query = query.filter(MarketplaceImportJob.status == status)
total = query.count()
skip = (page - 1) * limit
jobs = (
query.order_by(MarketplaceImportJob.created_at.desc())
.offset(skip)
.limit(limit)
.all()
jobs, total = marketplace_import_job_service.get_all_import_jobs_paginated(
db=db,
marketplace=marketplace,
status=status,
page=page,
limit=limit,
)
return {
"items": [_job_to_dict(job) for job in jobs],
"total": total,
"page": page,
"limit": limit,
}
return AdminMarketplaceImportJobListResponse(
items=[
marketplace_import_job_service.convert_to_admin_response_model(job)
for job in jobs
],
total=total,
page=page,
limit=limit,
)
@router.post("", response_model=MarketplaceImportJobResponse)
@@ -100,9 +69,7 @@ async def create_marketplace_import_job(
Admins can trigger imports for any vendor by specifying vendor_id.
The import is processed asynchronously in the background.
"""
vendor = db.query(Vendor).filter(Vendor.id == request.vendor_id).first()
if not vendor:
raise VendorNotFoundException(str(request.vendor_id), identifier_type="id")
vendor = vendor_service.get_vendor_by_id(db, request.vendor_id)
job_request = MarketplaceImportJobRequest(
source_url=request.source_url,
@@ -144,19 +111,12 @@ def get_import_statistics(
return stats_service.get_import_statistics(db)
@router.get("/{job_id}")
@router.get("/{job_id}", response_model=AdminMarketplaceImportJobResponse)
def get_marketplace_import_job(
job_id: int,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Get a single marketplace import job by ID (Admin only)."""
job = (
db.query(MarketplaceImportJob)
.filter(MarketplaceImportJob.id == job_id)
.first()
)
if not job:
raise ImportJobNotFoundException(job_id)
return _job_to_dict(job)
job = marketplace_import_job_service.get_import_job_by_id_admin(db, job_id)
return marketplace_import_job_service.convert_to_admin_response_model(job)