Files
orion/app/api/v1/admin/marketplace.py
Samir Boulahtit 962cc8dcef feat: add POST endpoint for admin marketplace import jobs
Add POST /api/v1/admin/marketplace-import-jobs endpoint to allow
admins to create import jobs for any vendor.

Changes:
- Add AdminMarketplaceImportJobRequest schema with vendor_id field
- Add create_marketplace_import_job endpoint in admin/marketplace.py
- Make vendor_code and vendor_name optional in response model
  to handle edge cases where vendor relationship may not be loaded

This fixes the 405 Method Not Allowed error when trying to import
products from the admin marketplace page.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-05 21:48:13 +01:00

96 lines
2.9 KiB
Python

# app/api/v1/admin/marketplace.py
"""
Marketplace import job endpoints for admin: list jobs, create jobs, and view import statistics.
"""
import logging

from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session

from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.exceptions import VendorNotFoundException
from app.services.admin_service import admin_service
from app.services.marketplace_import_job_service import marketplace_import_job_service
from app.services.stats_service import stats_service
from models.database.user import User
from models.database.vendor import Vendor
from models.schema.marketplace_import_job import (
    AdminMarketplaceImportJobRequest,
    MarketplaceImportJobRequest,
    MarketplaceImportJobResponse,
)
# Router for the admin marketplace import job routes; every route path
# declared below is relative to this prefix.
router = APIRouter(prefix="/marketplace-import-jobs")
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
@router.get("", response_model=list[MarketplaceImportJobResponse])
def get_all_marketplace_import_jobs(
    marketplace: str | None = Query(None),
    vendor_name: str | None = Query(None),
    status: str | None = Query(None),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """
    List all marketplace import jobs (Admin only).

    The optional ``marketplace``, ``vendor_name`` and ``status`` query
    values are forwarded unchanged to the admin service, together with the
    ``skip`` / ``limit`` paging values (``skip`` >= 0, ``limit`` between 1
    and 100, default 100).

    ``current_admin`` is resolved through ``get_current_admin_api`` and is
    not otherwise used in this function.
    """
    # Filter values are passed through as-is; None means "not supplied".
    filters = {
        "marketplace": marketplace,
        "vendor_name": vendor_name,
        "status": status,
    }
    jobs = admin_service.get_marketplace_import_jobs(
        db=db,
        **filters,
        skip=skip,
        limit=limit,
    )
    return jobs
@router.post("", response_model=MarketplaceImportJobResponse)
def create_marketplace_import_job(
    request: AdminMarketplaceImportJobRequest,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """
    Create a new marketplace import job (Admin only).

    Admins can trigger imports for any vendor by specifying vendor_id.

    The admin request is translated into a plain
    ``MarketplaceImportJobRequest`` (source_url, marketplace, batch_size)
    and handed to the import job service together with the resolved vendor
    and the acting admin user.

    Raises:
        VendorNotFoundException: if no vendor row matches
            ``request.vendor_id``.
    """
    # Look up the vendor the admin wants to import for.
    vendor = db.query(Vendor).filter(Vendor.id == request.vendor_id).first()
    if not vendor:
        raise VendorNotFoundException(str(request.vendor_id), identifier_type="id")

    # The service takes the vendor as a separate argument, so vendor_id is
    # dropped when building the service-level request.
    job_request = MarketplaceImportJobRequest(
        source_url=request.source_url,
        marketplace=request.marketplace,
        batch_size=request.batch_size,
    )

    job = marketplace_import_job_service.create_import_job(
        db=db,
        request=job_request,
        vendor=vendor,
        user=current_admin,
    )

    # Lazy %-style arguments: the message is only formatted if INFO is enabled.
    logger.info(
        "Admin %s created import job %s for vendor %s",
        current_admin.username,
        job.id,
        vendor.vendor_code,
    )

    return marketplace_import_job_service.convert_to_response_model(job)
@router.get("/stats")
def get_import_statistics(
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """
    Return marketplace import statistics (Admin only).

    The response is whatever ``stats_service.get_import_statistics``
    produces for the injected database session. ``current_admin`` is
    resolved through ``get_current_admin_api`` and is not otherwise used
    in this function.
    """
    statistics = stats_service.get_import_statistics(db)
    return statistics