diff --git a/app/api/v1/admin/marketplace.py b/app/api/v1/admin/marketplace.py index 8fc3aa86..0d635b11 100644 --- a/app/api/v1/admin/marketplace.py +++ b/app/api/v1/admin/marketplace.py @@ -5,7 +5,7 @@ Marketplace import job monitoring endpoints for admin. import logging -from fastapi import APIRouter, Depends, Query +from fastapi import APIRouter, BackgroundTasks, Depends, Query from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api @@ -25,30 +25,113 @@ router = APIRouter(prefix="/marketplace-import-jobs") logger = logging.getLogger(__name__) -@router.get("", response_model=list[MarketplaceImportJobResponse]) +@router.get("") def get_all_marketplace_import_jobs( marketplace: str | None = Query(None), vendor_name: str | None = Query(None), status: str | None = Query(None), - skip: int = Query(0, ge=0), + page: int = Query(1, ge=1), limit: int = Query(100, ge=1, le=100), db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ): """Get all marketplace import jobs (Admin only).""" - return admin_service.get_marketplace_import_jobs( - db=db, - marketplace=marketplace, - vendor_name=vendor_name, - status=status, - skip=skip, - limit=limit, + from models.database.marketplace_import_job import MarketplaceImportJob + + # Build base query + query = db.query(MarketplaceImportJob) + + if marketplace: + query = query.filter( + MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%") + ) + if status: + query = query.filter(MarketplaceImportJob.status == status) + + # Get total count + total = query.count() + + # Get paginated results + skip = (page - 1) * limit + jobs = ( + query.order_by(MarketplaceImportJob.created_at.desc()) + .offset(skip) + .limit(limit) + .all() ) + # Convert to response format with 'id' field for frontend compatibility + items = [] + for job in jobs: + items.append({ + "id": job.id, + "job_id": job.id, + "status": job.status, + "marketplace": job.marketplace, + "source_url": job.source_url, + "vendor_id": job.vendor.id if job.vendor else None, + "vendor_code": job.vendor.vendor_code if job.vendor else None, + "vendor_name": job.vendor.name if job.vendor else None, + "imported": job.imported_count or 0, + "updated": job.updated_count or 0, + "total_processed": job.total_processed or 0, + "error_count": job.error_count or 0, + "error_message": job.error_message, + "error_details": [], # Placeholder for future use + "created_at": job.created_at.isoformat() if job.created_at else None, + "started_at": job.started_at.isoformat() if job.started_at else None, + "completed_at": job.completed_at.isoformat() if job.completed_at else None, + "created_by_name": job.user.username if job.user else None, + }) + + return { + "items": items, + "total": total, + "page": page, + "limit": limit, + } + + +@router.get("/{job_id}") +def get_marketplace_import_job( + job_id: int, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +): + """Get a single marketplace import job by ID (Admin only).""" + from app.exceptions import ImportJobNotFoundException + from models.database.marketplace_import_job import MarketplaceImportJob + + job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first() + if not job: + raise ImportJobNotFoundException(job_id) + + return { + "id": job.id, + "job_id": job.id, + "status": job.status, + "marketplace": job.marketplace, + "source_url": job.source_url, + "vendor_id": job.vendor.id if job.vendor else None, + "vendor_code": job.vendor.vendor_code if job.vendor else None, + "vendor_name": job.vendor.name if job.vendor else None, + "imported": job.imported_count or 0, + "updated": job.updated_count or 0, + "total_processed": job.total_processed or 0, + "error_count": job.error_count or 0, + "error_message": job.error_message, + "error_details": [], # Placeholder for future use + "created_at": job.created_at.isoformat() if job.created_at else None, + "started_at": job.started_at.isoformat() if job.started_at else None, + "completed_at": job.completed_at.isoformat() if job.completed_at else None, + "created_by_name": job.user.username if job.user else None, + } + @router.post("", response_model=MarketplaceImportJobResponse) def create_marketplace_import_job( request: AdminMarketplaceImportJobRequest, + background_tasks: BackgroundTasks, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ): @@ -56,6 +139,7 @@ def create_marketplace_import_job( Create a new marketplace import job (Admin only). Admins can trigger imports for any vendor by specifying vendor_id. + The import is processed asynchronously in the background. """ # Look up the vendor vendor = db.query(Vendor).filter(Vendor.id == request.vendor_id).first() @@ -83,6 +167,20 @@ def create_marketplace_import_job( f"for vendor {vendor.vendor_code}" ) + # Start background processing + from app.tasks.background_tasks import process_marketplace_import + + background_tasks.add_task( + process_marketplace_import, + job_id=job.id, + url=request.source_url, + marketplace=request.marketplace, + vendor_id=vendor.id, + batch_size=request.batch_size or 1000, + ) + + logger.info(f"Background task queued for import job {job.id}") + return marketplace_import_job_service.convert_to_response_model(job)