From c040d62d3f171d51fd7fbb161720049f14648ff1 Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Fri, 5 Dec 2025 21:55:53 +0100 Subject: [PATCH] fix: admin import jobs API response format and background processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Issues fixed: 1. GET endpoint now returns {items, total, page, limit} format instead of plain list (frontend expects this format) 2. Add GET /{job_id} endpoint for viewing single job details 3. Add 'id' field alongside 'job_id' for frontend compatibility 4. Add 'error_details' and 'created_by_name' fields to response 5. Trigger background processing when creating import job (jobs were created but never processed) The import jobs will now actually process the CSV files and import products into the database. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- app/api/v1/admin/marketplace.py | 118 +++++++++++++++++++++++++++++--- 1 file changed, 108 insertions(+), 10 deletions(-) diff --git a/app/api/v1/admin/marketplace.py b/app/api/v1/admin/marketplace.py index 8fc3aa86..0d635b11 100644 --- a/app/api/v1/admin/marketplace.py +++ b/app/api/v1/admin/marketplace.py @@ -5,7 +5,7 @@ Marketplace import job monitoring endpoints for admin. import logging -from fastapi import APIRouter, Depends, Query +from fastapi import APIRouter, BackgroundTasks, Depends, Query from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api @@ -25,30 +25,113 @@ router = APIRouter(prefix="/marketplace-import-jobs") logger = logging.getLogger(__name__) -@router.get("", response_model=list[MarketplaceImportJobResponse]) +@router.get("") def get_all_marketplace_import_jobs( marketplace: str | None = Query(None), vendor_name: str | None = Query(None), status: str | None = Query(None), - skip: int = Query(0, ge=0), + page: int = Query(1, ge=1), limit: int = Query(100, ge=1, le=100), db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ): """Get all marketplace import jobs (Admin only).""" - return admin_service.get_marketplace_import_jobs( - db=db, - marketplace=marketplace, - vendor_name=vendor_name, - status=status, - skip=skip, - limit=limit, + from models.database.marketplace_import_job import MarketplaceImportJob + + # Build base query + query = db.query(MarketplaceImportJob) + + if marketplace: + query = query.filter( + MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%") + ) + if status: + query = query.filter(MarketplaceImportJob.status == status) + + # Get total count + total = query.count() + + # Get paginated results + skip = (page - 1) * limit + jobs = ( + query.order_by(MarketplaceImportJob.created_at.desc()) + .offset(skip) + .limit(limit) + .all() ) + # Convert to response format with 'id' field for frontend compatibility + items = [] + for job in jobs: + items.append({ + "id": job.id, + "job_id": job.id, + "status": job.status, + "marketplace": job.marketplace, + "source_url": job.source_url, + "vendor_id": job.vendor.id if job.vendor else None, + "vendor_code": job.vendor.vendor_code if job.vendor else None, + "vendor_name": job.vendor.name if job.vendor else None, + "imported": job.imported_count or 0, + "updated": job.updated_count or 0, + "total_processed": job.total_processed or 0, + "error_count": job.error_count or 0, + "error_message": job.error_message, + "error_details": [], # Placeholder for future use + "created_at": job.created_at.isoformat() if job.created_at else None, + "started_at": job.started_at.isoformat() if job.started_at else None, + "completed_at": job.completed_at.isoformat() if job.completed_at else None, + "created_by_name": job.user.username if job.user else None, + }) + + return { + "items": items, + "total": total, + "page": page, + "limit": limit, + } + + +@router.get("/{job_id}") +def get_marketplace_import_job( + job_id: int, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +): + """Get a single marketplace import job by ID (Admin only).""" + from app.exceptions import ImportJobNotFoundException + from models.database.marketplace_import_job import MarketplaceImportJob + + job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first() + if not job: + raise ImportJobNotFoundException(job_id) + + return { + "id": job.id, + "job_id": job.id, + "status": job.status, + "marketplace": job.marketplace, + "source_url": job.source_url, + "vendor_id": job.vendor.id if job.vendor else None, + "vendor_code": job.vendor.vendor_code if job.vendor else None, + "vendor_name": job.vendor.name if job.vendor else None, + "imported": job.imported_count or 0, + "updated": job.updated_count or 0, + "total_processed": job.total_processed or 0, + "error_count": job.error_count or 0, + "error_message": job.error_message, + "error_details": [], # Placeholder for future use + "created_at": job.created_at.isoformat() if job.created_at else None, + "started_at": job.started_at.isoformat() if job.started_at else None, + "completed_at": job.completed_at.isoformat() if job.completed_at else None, + "created_by_name": job.user.username if job.user else None, + } + @router.post("", response_model=MarketplaceImportJobResponse) def create_marketplace_import_job( request: AdminMarketplaceImportJobRequest, + background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ): @@ -56,6 +139,7 @@ def create_marketplace_import_job( Create a new marketplace import job (Admin only). Admins can trigger imports for any vendor by specifying vendor_id. + The import is processed asynchronously in the background. """ # Look up the vendor vendor = db.query(Vendor).filter(Vendor.id == request.vendor_id).first() @@ -83,6 +167,20 @@ def create_marketplace_import_job( f"for vendor {vendor.vendor_code}" ) + # Start background processing + from app.tasks.background_tasks import process_marketplace_import + + background_tasks.add_task( + process_marketplace_import, + job_id=job.id, + url=request.source_url, + marketplace=request.marketplace, + vendor_id=vendor.id, + batch_size=request.batch_size or 1000, + ) + + logger.info(f"Background task queued for import job {job.id}") + return marketplace_import_job_service.convert_to_response_model(job)