fix: admin import jobs API response format and background processing

Issues fixed:
1. GET endpoint now returns {items, total, page, limit} format
   instead of plain list (frontend expects this format)
2. Add GET /{job_id} endpoint for viewing single job details
3. Add 'id' field alongside 'job_id' for frontend compatibility
4. Add 'error_details' and 'created_by_name' fields to response
5. Trigger background processing when creating import job
   (jobs were created but never processed)

The import jobs will now actually process the CSV files and
import products into the database.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-12-05 21:55:53 +01:00
parent 962cc8dcef
commit c040d62d3f

View File

@@ -5,7 +5,7 @@ Marketplace import job monitoring endpoints for admin.
import logging
from fastapi import APIRouter, Depends, Query
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
@@ -25,30 +25,113 @@ router = APIRouter(prefix="/marketplace-import-jobs")
logger = logging.getLogger(__name__)
@router.get("", response_model=list[MarketplaceImportJobResponse])
@router.get("")
def get_all_marketplace_import_jobs(
marketplace: str | None = Query(None),
vendor_name: str | None = Query(None),
status: str | None = Query(None),
skip: int = Query(0, ge=0),
page: int = Query(1, ge=1),
limit: int = Query(100, ge=1, le=100),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Get all marketplace import jobs (Admin only)."""
return admin_service.get_marketplace_import_jobs(
db=db,
marketplace=marketplace,
vendor_name=vendor_name,
status=status,
skip=skip,
limit=limit,
from models.database.marketplace_import_job import MarketplaceImportJob
# Build base query
query = db.query(MarketplaceImportJob)
if marketplace:
query = query.filter(
MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")
)
if status:
query = query.filter(MarketplaceImportJob.status == status)
# Get total count
total = query.count()
# Get paginated results
skip = (page - 1) * limit
jobs = (
query.order_by(MarketplaceImportJob.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
# Convert to response format with 'id' field for frontend compatibility
items = []
for job in jobs:
items.append({
"id": job.id,
"job_id": job.id,
"status": job.status,
"marketplace": job.marketplace,
"source_url": job.source_url,
"vendor_id": job.vendor.id if job.vendor else None,
"vendor_code": job.vendor.vendor_code if job.vendor else None,
"vendor_name": job.vendor.name if job.vendor else None,
"imported": job.imported_count or 0,
"updated": job.updated_count or 0,
"total_processed": job.total_processed or 0,
"error_count": job.error_count or 0,
"error_message": job.error_message,
"error_details": [], # Placeholder for future use
"created_at": job.created_at.isoformat() if job.created_at else None,
"started_at": job.started_at.isoformat() if job.started_at else None,
"completed_at": job.completed_at.isoformat() if job.completed_at else None,
"created_by_name": job.user.username if job.user else None,
})
return {
"items": items,
"total": total,
"page": page,
"limit": limit,
}
@router.get("/{job_id}")
def get_marketplace_import_job(
job_id: int,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Get a single marketplace import job by ID (Admin only)."""
from app.exceptions import ImportJobNotFoundException
from models.database.marketplace_import_job import MarketplaceImportJob
job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first()
if not job:
raise ImportJobNotFoundException(job_id)
return {
"id": job.id,
"job_id": job.id,
"status": job.status,
"marketplace": job.marketplace,
"source_url": job.source_url,
"vendor_id": job.vendor.id if job.vendor else None,
"vendor_code": job.vendor.vendor_code if job.vendor else None,
"vendor_name": job.vendor.name if job.vendor else None,
"imported": job.imported_count or 0,
"updated": job.updated_count or 0,
"total_processed": job.total_processed or 0,
"error_count": job.error_count or 0,
"error_message": job.error_message,
"error_details": [], # Placeholder for future use
"created_at": job.created_at.isoformat() if job.created_at else None,
"started_at": job.started_at.isoformat() if job.started_at else None,
"completed_at": job.completed_at.isoformat() if job.completed_at else None,
"created_by_name": job.user.username if job.user else None,
}
@router.post("", response_model=MarketplaceImportJobResponse)
def create_marketplace_import_job(
request: AdminMarketplaceImportJobRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
@@ -56,6 +139,7 @@ def create_marketplace_import_job(
Create a new marketplace import job (Admin only).
Admins can trigger imports for any vendor by specifying vendor_id.
The import is processed asynchronously in the background.
"""
# Look up the vendor
vendor = db.query(Vendor).filter(Vendor.id == request.vendor_id).first()
@@ -83,6 +167,20 @@ def create_marketplace_import_job(
f"for vendor {vendor.vendor_code}"
)
# Start background processing
from app.tasks.background_tasks import process_marketplace_import
background_tasks.add_task(
process_marketplace_import,
job_id=job.id,
url=request.source_url,
marketplace=request.marketplace,
vendor_id=vendor.id,
batch_size=request.batch_size or 1000,
)
logger.info(f"Background task queued for import job {job.id}")
return marketplace_import_job_service.convert_to_response_model(job)