diff --git a/app/api/v1/admin/marketplace.py b/app/api/v1/admin/marketplace.py index fc39f896..25fe6d40 100644 --- a/app/api/v1/admin/marketplace.py +++ b/app/api/v1/admin/marketplace.py @@ -10,14 +10,16 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api from app.core.database import get_db -from app.exceptions import VendorNotFoundException +from app.exceptions import ImportJobNotFoundException, VendorNotFoundException from app.services.marketplace_import_job_service import marketplace_import_job_service from app.services.stats_service import stats_service from app.tasks.background_tasks import process_marketplace_import +from models.database.marketplace_import_job import MarketplaceImportJob from models.database.user import User from models.database.vendor import Vendor from models.schema.marketplace_import_job import ( AdminMarketplaceImportJobRequest, + MarketplaceImportJobRequest, MarketplaceImportJobResponse, ) @@ -25,20 +27,40 @@ router = APIRouter(prefix="/marketplace-import-jobs") logger = logging.getLogger(__name__) +def _job_to_dict(job: MarketplaceImportJob) -> dict: + """Convert a MarketplaceImportJob to a response dict with frontend-compatible fields.""" + return { + "id": job.id, + "job_id": job.id, + "status": job.status, + "marketplace": job.marketplace, + "source_url": job.source_url, + "vendor_id": job.vendor.id if job.vendor else None, + "vendor_code": job.vendor.vendor_code if job.vendor else None, + "vendor_name": job.vendor.name if job.vendor else None, + "imported": job.imported_count or 0, + "updated": job.updated_count or 0, + "total_processed": job.total_processed or 0, + "error_count": job.error_count or 0, + "error_message": job.error_message, + "error_details": [], # Placeholder for future use + "created_at": job.created_at.isoformat() if job.created_at else None, + "started_at": job.started_at.isoformat() if job.started_at else None, + "completed_at": job.completed_at.isoformat() if job.completed_at else None, + "created_by_name": job.user.username if job.user else None, + } + + @router.get("") def get_all_marketplace_import_jobs( marketplace: str | None = Query(None), - vendor_name: str | None = Query(None), status: str | None = Query(None), page: int = Query(1, ge=1), limit: int = Query(100, ge=1, le=100), db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ): - """Get all marketplace import jobs (Admin only).""" - from models.database.marketplace_import_job import MarketplaceImportJob - - # Build base query + """Get all marketplace import jobs with pagination (Admin only).""" query = db.query(MarketplaceImportJob) if marketplace: @@ -48,10 +70,7 @@ def get_all_marketplace_import_jobs( if status: query = query.filter(MarketplaceImportJob.status == status) - # Get total count total = query.count() - - # Get paginated results skip = (page - 1) * limit jobs = ( query.order_by(MarketplaceImportJob.created_at.desc()) @@ -60,32 +79,8 @@ def get_all_marketplace_import_jobs( .all() ) - # Convert to response format with 'id' field for frontend compatibility - items = [] - for job in jobs: - items.append({ - "id": job.id, - "job_id": job.id, - "status": job.status, - "marketplace": job.marketplace, - "source_url": job.source_url, - "vendor_id": job.vendor.id if job.vendor else None, - "vendor_code": job.vendor.vendor_code if job.vendor else None, - "vendor_name": job.vendor.name if job.vendor else None, - "imported": job.imported_count or 0, - "updated": job.updated_count or 0, - "total_processed": job.total_processed or 0, - "error_count": job.error_count or 0, - "error_message": job.error_message, - "error_details": [], # Placeholder for future use - "created_at": job.created_at.isoformat() if job.created_at else None, - "started_at": job.started_at.isoformat() if job.started_at else None, - "completed_at": job.completed_at.isoformat() if job.completed_at else None, - "created_by_name": job.user.username if job.user else None, - }) - return { - "items": items, + "items": [_job_to_dict(job) for job in jobs], "total": total, "page": page, "limit": limit, @@ -105,14 +100,10 @@ async def create_marketplace_import_job( Admins can trigger imports for any vendor by specifying vendor_id. The import is processed asynchronously in the background. """ - # Look up the vendor vendor = db.query(Vendor).filter(Vendor.id == request.vendor_id).first() if not vendor: raise VendorNotFoundException(str(request.vendor_id), identifier_type="id") - # Create the import job using the service - from models.schema.marketplace_import_job import MarketplaceImportJobRequest - job_request = MarketplaceImportJobRequest( source_url=request.source_url, marketplace=request.marketplace, @@ -131,7 +122,6 @@ async def create_marketplace_import_job( f"for vendor {vendor.vendor_code}" ) - # Start background processing (positional args like vendor endpoint) background_tasks.add_task( process_marketplace_import, job.id, @@ -141,8 +131,6 @@ async def create_marketplace_import_job( request.batch_size or 1000, ) - logger.info(f"Background task queued for import job {job.id}") - return marketplace_import_job_service.convert_to_response_model(job) @@ -163,30 +151,12 @@ def get_marketplace_import_job( current_admin: User = Depends(get_current_admin_api), ): """Get a single marketplace import job by ID (Admin only).""" - from app.exceptions import ImportJobNotFoundException - from models.database.marketplace_import_job import MarketplaceImportJob - - job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first() + job = ( + db.query(MarketplaceImportJob) + .filter(MarketplaceImportJob.id == job_id) + .first() + ) if not job: raise ImportJobNotFoundException(job_id) - return { - "id": job.id, - "job_id": job.id, - "status": job.status, - "marketplace": job.marketplace, - "source_url": job.source_url, - "vendor_id": job.vendor.id if job.vendor else None, - "vendor_code": job.vendor.vendor_code if job.vendor else None, - "vendor_name": job.vendor.name if job.vendor else None, - "imported": job.imported_count or 0, - "updated": job.updated_count or 0, - "total_processed": job.total_processed or 0, - "error_count": job.error_count or 0, - "error_message": job.error_message, - "error_details": [], # Placeholder for future use - "created_at": job.created_at.isoformat() if job.created_at else None, - "started_at": job.started_at.isoformat() if job.started_at else None, - "completed_at": job.completed_at.isoformat() if job.completed_at else None, - "created_by_name": job.user.username if job.user else None, - } + return _job_to_dict(job)