Files
orion/app/api/v1/admin/marketplace.py
Samir Boulahtit c040d62d3f fix: admin import jobs API response format and background processing
Issues fixed:
1. GET endpoint now returns {items, total, page, limit} format
   instead of plain list (frontend expects this format)
2. Add GET /{job_id} endpoint for viewing single job details
3. Add 'id' field alongside 'job_id' for frontend compatibility
4. Add 'error_details' and 'created_by_name' fields to response
5. Trigger background processing when creating import job
   (jobs were created but never processed)

The import jobs will now actually process the CSV files and
import products into the database.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-05 21:55:53 +01:00

194 lines
6.5 KiB
Python

# app/api/v1/admin/marketplace.py
"""
Marketplace import job monitoring endpoints for admin.
"""
import logging
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.exceptions import VendorNotFoundException
from app.services.admin_service import admin_service
from app.services.marketplace_import_job_service import marketplace_import_job_service
from app.services.stats_service import stats_service
from models.database.user import User
from models.database.vendor import Vendor
from models.schema.marketplace_import_job import (
AdminMarketplaceImportJobRequest,
MarketplaceImportJobResponse,
)
# Router collecting the admin marketplace import job endpoints defined in this
# module; every route path below is relative to "/marketplace-import-jobs".
router = APIRouter(prefix="/marketplace-import-jobs")
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@router.get("")
def get_all_marketplace_import_jobs(
    marketplace: str | None = Query(None),
    vendor_name: str | None = Query(None),
    status: str | None = Query(None),
    page: int = Query(1, ge=1),
    limit: int = Query(100, ge=1, le=100),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """
    Get all marketplace import jobs (Admin only).

    Args:
        marketplace: Optional case-insensitive substring filter on the job's
            marketplace.
        vendor_name: Optional case-insensitive substring filter on the name of
            the job's vendor.
        status: Optional exact-match filter on the job status.
        page: 1-based page number.
        limit: Page size (1-100).
        db: Database session.
        current_admin: Authenticated admin user (access control only).

    Returns:
        A dict ``{"items": [...], "total": int, "page": int, "limit": int}``
        where ``total`` is the number of jobs matching the filters before
        pagination and ``items`` holds the requested page, newest first.
    """
    from models.database.marketplace_import_job import MarketplaceImportJob

    # Build base query and apply the optional filters.
    query = db.query(MarketplaceImportJob)
    if marketplace:
        query = query.filter(
            MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")
        )
    if vendor_name:
        # This parameter used to be accepted but never applied. Filter through
        # the job -> vendor relationship so no foreign-key column name has to
        # be assumed here.
        query = query.filter(
            MarketplaceImportJob.vendor.has(Vendor.name.ilike(f"%{vendor_name}%"))
        )
    if status:
        query = query.filter(MarketplaceImportJob.status == status)

    # Count before pagination so the client can compute the number of pages.
    total = query.count()

    skip = (page - 1) * limit
    jobs = (
        query.order_by(MarketplaceImportJob.created_at.desc())
        .offset(skip)
        .limit(limit)
        .all()
    )

    # Both 'id' and 'job_id' are emitted for frontend compatibility.
    items = [
        {
            "id": job.id,
            "job_id": job.id,
            "status": job.status,
            "marketplace": job.marketplace,
            "source_url": job.source_url,
            "vendor_id": job.vendor.id if job.vendor else None,
            "vendor_code": job.vendor.vendor_code if job.vendor else None,
            "vendor_name": job.vendor.name if job.vendor else None,
            "imported": job.imported_count or 0,
            "updated": job.updated_count or 0,
            "total_processed": job.total_processed or 0,
            "error_count": job.error_count or 0,
            "error_message": job.error_message,
            "error_details": [],  # Placeholder for future use
            "created_at": job.created_at.isoformat() if job.created_at else None,
            "started_at": job.started_at.isoformat() if job.started_at else None,
            "completed_at": (
                job.completed_at.isoformat() if job.completed_at else None
            ),
            "created_by_name": job.user.username if job.user else None,
        }
        for job in jobs
    ]

    return {
        "items": items,
        "total": total,
        "page": page,
        "limit": limit,
    }
# The ":int" path convertor restricts this route to purely numeric segments.
# Without it, this dynamic route also matches static sibling paths such as
# "/stats" (routes are tried in registration order) and rejects them with a
# validation error instead of letting the static route handle the request.
@router.get("/{job_id:int}")
def get_marketplace_import_job(
    job_id: int,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """
    Get a single marketplace import job by ID (Admin only).

    Args:
        job_id: Primary key of the import job.
        db: Database session.
        current_admin: Authenticated admin user (access control only).

    Returns:
        A dict describing the job. Both ``id`` and ``job_id`` carry the job's
        primary key for frontend compatibility; counters default to 0 and
        timestamps are ISO-8601 strings or ``None``.

    Raises:
        ImportJobNotFoundException: If no job with ``job_id`` exists.
    """
    from app.exceptions import ImportJobNotFoundException
    from models.database.marketplace_import_job import MarketplaceImportJob

    job = (
        db.query(MarketplaceImportJob)
        .filter(MarketplaceImportJob.id == job_id)
        .first()
    )
    if not job:
        raise ImportJobNotFoundException(job_id)

    return {
        "id": job.id,
        "job_id": job.id,
        "status": job.status,
        "marketplace": job.marketplace,
        "source_url": job.source_url,
        "vendor_id": job.vendor.id if job.vendor else None,
        "vendor_code": job.vendor.vendor_code if job.vendor else None,
        "vendor_name": job.vendor.name if job.vendor else None,
        "imported": job.imported_count or 0,
        "updated": job.updated_count or 0,
        "total_processed": job.total_processed or 0,
        "error_count": job.error_count or 0,
        "error_message": job.error_message,
        "error_details": [],  # Placeholder for future use
        "created_at": job.created_at.isoformat() if job.created_at else None,
        "started_at": job.started_at.isoformat() if job.started_at else None,
        "completed_at": job.completed_at.isoformat() if job.completed_at else None,
        "created_by_name": job.user.username if job.user else None,
    }
@router.post("", response_model=MarketplaceImportJobResponse)
def create_marketplace_import_job(
    request: AdminMarketplaceImportJobRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """
    Create a marketplace import job on behalf of a vendor (Admin only).

    The vendor is selected via ``request.vendor_id``. Once the job record has
    been created through the import job service, the actual import is queued
    as a background task.

    Raises:
        VendorNotFoundException: If no vendor with ``request.vendor_id`` exists.
    """
    # Resolve the target vendor; bail out early when it does not exist.
    vendor_lookup = db.query(Vendor).filter(Vendor.id == request.vendor_id)
    target_vendor = vendor_lookup.first()
    if not target_vendor:
        raise VendorNotFoundException(str(request.vendor_id), identifier_type="id")

    # Re-wrap the admin payload in the request schema the service expects.
    from models.schema.marketplace_import_job import MarketplaceImportJobRequest

    import_request = MarketplaceImportJobRequest(
        source_url=request.source_url,
        marketplace=request.marketplace,
        batch_size=request.batch_size,
    )
    import_job = marketplace_import_job_service.create_import_job(
        db=db,
        request=import_request,
        vendor=target_vendor,
        user=current_admin,
    )
    logger.info(
        f"Admin {current_admin.username} created import job {import_job.id} "
        f"for vendor {target_vendor.vendor_code}"
    )

    # Hand the job over to the background worker function.
    from app.tasks.background_tasks import process_marketplace_import

    task_kwargs = {
        "job_id": import_job.id,
        "url": request.source_url,
        "marketplace": request.marketplace,
        "vendor_id": target_vendor.id,
        # The task falls back to a batch size of 1000 when none was supplied.
        "batch_size": request.batch_size or 1000,
    }
    background_tasks.add_task(process_marketplace_import, **task_kwargs)
    logger.info(f"Background task queued for import job {import_job.id}")

    return marketplace_import_job_service.convert_to_response_model(import_job)
@router.get("/stats")
def get_import_statistics(
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """
    Return marketplace import statistics (Admin only).

    The result is whatever ``stats_service.get_import_statistics`` produces
    for the given database session; it is passed through unchanged.
    """
    # NOTE: this static path is registered after the dynamic job-detail route,
    # so it is only reachable if that route does not also match "/stats".
    import_stats = stats_service.get_import_statistics(db)
    return import_stats