diff --git a/app/api/v1/admin/marketplace.py b/app/api/v1/admin/marketplace.py index 25fe6d40..c85f0738 100644 --- a/app/api/v1/admin/marketplace.py +++ b/app/api/v1/admin/marketplace.py @@ -10,15 +10,15 @@ from sqlalchemy.orm import Session from app.api.deps import get_current_admin_api from app.core.database import get_db -from app.exceptions import ImportJobNotFoundException, VendorNotFoundException from app.services.marketplace_import_job_service import marketplace_import_job_service from app.services.stats_service import stats_service +from app.services.vendor_service import vendor_service from app.tasks.background_tasks import process_marketplace_import -from models.database.marketplace_import_job import MarketplaceImportJob from models.database.user import User -from models.database.vendor import Vendor from models.schema.marketplace_import_job import ( + AdminMarketplaceImportJobListResponse, AdminMarketplaceImportJobRequest, + AdminMarketplaceImportJobResponse, MarketplaceImportJobRequest, MarketplaceImportJobResponse, ) @@ -27,31 +27,7 @@ router = APIRouter(prefix="/marketplace-import-jobs") logger = logging.getLogger(__name__) -def _job_to_dict(job: MarketplaceImportJob) -> dict: - """Convert a MarketplaceImportJob to a response dict with frontend-compatible fields.""" - return { - "id": job.id, - "job_id": job.id, - "status": job.status, - "marketplace": job.marketplace, - "source_url": job.source_url, - "vendor_id": job.vendor.id if job.vendor else None, - "vendor_code": job.vendor.vendor_code if job.vendor else None, - "vendor_name": job.vendor.name if job.vendor else None, - "imported": job.imported_count or 0, - "updated": job.updated_count or 0, - "total_processed": job.total_processed or 0, - "error_count": job.error_count or 0, - "error_message": job.error_message, - "error_details": [], # Placeholder for future use - "created_at": job.created_at.isoformat() if job.created_at else None, - "started_at": job.started_at.isoformat() if job.started_at else None, - "completed_at": job.completed_at.isoformat() if job.completed_at else None, - "created_by_name": job.user.username if job.user else None, - } - - -@router.get("") +@router.get("", response_model=AdminMarketplaceImportJobListResponse) def get_all_marketplace_import_jobs( marketplace: str | None = Query(None), status: str | None = Query(None), @@ -61,30 +37,23 @@ def get_all_marketplace_import_jobs( current_admin: User = Depends(get_current_admin_api), ): """Get all marketplace import jobs with pagination (Admin only).""" - query = db.query(MarketplaceImportJob) - - if marketplace: - query = query.filter( - MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%") - ) - if status: - query = query.filter(MarketplaceImportJob.status == status) - - total = query.count() - skip = (page - 1) * limit - jobs = ( - query.order_by(MarketplaceImportJob.created_at.desc()) - .offset(skip) - .limit(limit) - .all() + jobs, total = marketplace_import_job_service.get_all_import_jobs_paginated( + db=db, + marketplace=marketplace, + status=status, + page=page, + limit=limit, ) - return { - "items": [_job_to_dict(job) for job in jobs], - "total": total, - "page": page, - "limit": limit, - } + return AdminMarketplaceImportJobListResponse( + items=[ + marketplace_import_job_service.convert_to_admin_response_model(job) + for job in jobs + ], + total=total, + page=page, + limit=limit, + ) @router.post("", response_model=MarketplaceImportJobResponse) @@ -100,9 +69,7 @@ async def create_marketplace_import_job( Admins can trigger imports for any vendor by specifying vendor_id. The import is processed asynchronously in the background. """ - vendor = db.query(Vendor).filter(Vendor.id == request.vendor_id).first() - if not vendor: - raise VendorNotFoundException(str(request.vendor_id), identifier_type="id") + vendor = vendor_service.get_vendor_by_id(db, request.vendor_id) job_request = MarketplaceImportJobRequest( source_url=request.source_url, @@ -144,19 +111,12 @@ def get_import_statistics( return stats_service.get_import_statistics(db) -@router.get("/{job_id}") +@router.get("/{job_id}", response_model=AdminMarketplaceImportJobResponse) def get_marketplace_import_job( job_id: int, db: Session = Depends(get_db), current_admin: User = Depends(get_current_admin_api), ): """Get a single marketplace import job by ID (Admin only).""" - job = ( - db.query(MarketplaceImportJob) - .filter(MarketplaceImportJob.id == job_id) - .first() - ) - if not job: - raise ImportJobNotFoundException(job_id) - - return _job_to_dict(job) + job = marketplace_import_job_service.get_import_job_by_id_admin(db, job_id) + return marketplace_import_job_service.convert_to_admin_response_model(job) diff --git a/app/services/marketplace_import_job_service.py b/app/services/marketplace_import_job_service.py index f52b9a77..339cf894 100644 --- a/app/services/marketplace_import_job_service.py +++ b/app/services/marketplace_import_job_service.py @@ -12,6 +12,7 @@ from models.database.marketplace_import_job import MarketplaceImportJob from models.database.user import User from models.database.vendor import Vendor from models.schema.marketplace_import_job import ( + AdminMarketplaceImportJobResponse, MarketplaceImportJobRequest, MarketplaceImportJobResponse, ) @@ -186,10 +187,8 @@ class MarketplaceImportJobService: status=job.status, marketplace=job.marketplace, vendor_id=job.vendor_id, - vendor_code=job.vendor.vendor_code if job.vendor else None, # FIXED - vendor_name=( - job.vendor.name if job.vendor else None - ), # FIXED: from relationship + vendor_code=job.vendor.vendor_code if job.vendor else None, + vendor_name=job.vendor.name if job.vendor else None, source_url=job.source_url, imported=job.imported_count or 0, updated=job.updated_count or 0, @@ -201,7 +200,77 @@ class MarketplaceImportJobService: completed_at=job.completed_at, ) - # ... other methods (cancel, delete, etc.) remain similar ... + def convert_to_admin_response_model( + self, job: MarketplaceImportJob + ) -> AdminMarketplaceImportJobResponse: + """Convert database model to admin API response model with extra fields.""" + return AdminMarketplaceImportJobResponse( + id=job.id, + job_id=job.id, + status=job.status, + marketplace=job.marketplace, + vendor_id=job.vendor_id, + vendor_code=job.vendor.vendor_code if job.vendor else None, + vendor_name=job.vendor.name if job.vendor else None, + source_url=job.source_url, + imported=job.imported_count or 0, + updated=job.updated_count or 0, + total_processed=job.total_processed or 0, + error_count=job.error_count or 0, + error_message=job.error_message, + error_details=[], + created_at=job.created_at, + started_at=job.started_at, + completed_at=job.completed_at, + created_by_name=job.user.username if job.user else None, + ) + + def get_all_import_jobs_paginated( + self, + db: Session, + marketplace: str | None = None, + status: str | None = None, + page: int = 1, + limit: int = 100, + ) -> tuple[list[MarketplaceImportJob], int]: + """Get all marketplace import jobs with pagination (for admin).""" + try: + query = db.query(MarketplaceImportJob) + + if marketplace: + query = query.filter( + MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%") + ) + if status: + query = query.filter(MarketplaceImportJob.status == status) + + total = query.count() + skip = (page - 1) * limit + jobs = ( + query.order_by(MarketplaceImportJob.created_at.desc()) + .offset(skip) + .limit(limit) + .all() + ) + + return jobs, total + + except Exception as e: + logger.error(f"Error getting all import jobs: {str(e)}") + raise ValidationException("Failed to retrieve import jobs") + + def get_import_job_by_id_admin( + self, db: Session, job_id: int + ) -> MarketplaceImportJob: + """Get a marketplace import job by ID (admin - no access control).""" + job = ( + db.query(MarketplaceImportJob) + .filter(MarketplaceImportJob.id == job_id) + .first() + ) + if not job: + raise ImportJobNotFoundException(job_id) + return job marketplace_import_job_service = MarketplaceImportJobService() diff --git a/models/schema/marketplace_import_job.py b/models/schema/marketplace_import_job.py index d99fef60..85def47b 100644 --- a/models/schema/marketplace_import_job.py +++ b/models/schema/marketplace_import_job.py @@ -93,6 +93,23 @@ class MarketplaceImportJobListResponse(BaseModel): limit: int +class AdminMarketplaceImportJobResponse(MarketplaceImportJobResponse): + """Extended response schema for admin with additional fields.""" + + id: int # Alias for job_id (frontend compatibility) + error_details: list = [] # Placeholder for future error details + created_by_name: str | None = None # Username of who created the job + + +class AdminMarketplaceImportJobListResponse(BaseModel): + """Response schema for paginated list of import jobs (admin).""" + + items: list[AdminMarketplaceImportJobResponse] + total: int + page: int + limit: int + + class MarketplaceImportJobStatusUpdate(BaseModel): """Schema for updating import job status (internal use)."""