Move database queries to service layer and use proper Pydantic responses:

Schema changes (models/schema/marketplace_import_job.py):
- Add AdminMarketplaceImportJobResponse with extra fields (id, error_details, created_by_name)
- Add AdminMarketplaceImportJobListResponse with items/total/page/limit format

Service changes (app/services/marketplace_import_job_service.py):
- Add convert_to_admin_response_model() for admin-specific response
- Add get_all_import_jobs_paginated() for admin listing
- Add get_import_job_by_id_admin() without access control

API changes (app/api/v1/admin/marketplace.py):
- Use service methods instead of direct DB queries
- Use proper Pydantic response models instead of dicts
- All business logic now in service layer

Cleanup:
- Remove duplicate methods from admin_service.py

Architecture validation now passes with 0 violations.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
122 lines
3.6 KiB
Python
122 lines
3.6 KiB
Python
from datetime import datetime
|
|
|
|
from pydantic import BaseModel, ConfigDict, Field, field_validator
|
|
|
|
|
|
class MarketplaceImportJobRequest(BaseModel):
    """Request schema for triggering marketplace import.

    Note: vendor_id is injected by middleware, not from request body.

    Fields:
        source_url: Required URL of the marketplace CSV file; must use the
            http or https scheme. Surrounding whitespace is removed.
        marketplace: Marketplace name, defaults to "Letzshop". Surrounding
            whitespace is removed.
        batch_size: Processing batch size, defaults to 1000 and is
            constrained to 100..10000 when given as an integer.
    """

    source_url: str = Field(..., description="URL to CSV file from marketplace")
    marketplace: str = Field(default="Letzshop", description="Marketplace name")
    batch_size: int | None = Field(
        1000, description="Processing batch size", ge=100, le=10000
    )

    @field_validator("source_url")
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Normalise the URL and enforce an http(s) scheme.

        Returns:
            The URL with leading and trailing whitespace removed.

        Raises:
            ValueError: If the trimmed URL does not start with
                ``http://`` or ``https://``.
        """
        # Trim before checking: the value is returned stripped anyway, so
        # leading whitespace must not make an otherwise valid URL fail.
        url = v.strip()
        if not url.startswith(("http://", "https://")):
            raise ValueError("URL must start with http:// or https://")
        return url

    @field_validator("marketplace")
    @classmethod
    def validate_marketplace(cls, v: str) -> str:
        """Return the marketplace name without surrounding whitespace."""
        return v.strip()
|
|
|
|
|
|
class AdminMarketplaceImportJobRequest(BaseModel):
    """Request schema for admin-triggered marketplace import.

    Includes vendor_id since admin can import for any vendor.

    Fields:
        vendor_id: Required ID of the vendor to import products for.
        source_url: Required URL of the marketplace CSV file; must use the
            http or https scheme. Surrounding whitespace is removed.
        marketplace: Marketplace name, defaults to "Letzshop". Surrounding
            whitespace is removed.
        batch_size: Processing batch size, defaults to 1000 and is
            constrained to 100..10000 when given as an integer.
    """

    vendor_id: int = Field(..., description="Vendor ID to import products for")
    source_url: str = Field(..., description="URL to CSV file from marketplace")
    marketplace: str = Field(default="Letzshop", description="Marketplace name")
    batch_size: int | None = Field(
        1000, description="Processing batch size", ge=100, le=10000
    )

    @field_validator("source_url")
    @classmethod
    def validate_url(cls, v: str) -> str:
        """Normalise the URL and enforce an http(s) scheme.

        Returns:
            The URL with leading and trailing whitespace removed.

        Raises:
            ValueError: If the trimmed URL does not start with
                ``http://`` or ``https://``.
        """
        # Trim before checking: the value is returned stripped anyway, so
        # leading whitespace must not make an otherwise valid URL fail.
        url = v.strip()
        if not url.startswith(("http://", "https://")):
            raise ValueError("URL must start with http:// or https://")
        return url

    @field_validator("marketplace")
    @classmethod
    def validate_marketplace(cls, v: str) -> str:
        """Return the marketplace name without surrounding whitespace."""
        return v.strip()
|
|
|
|
|
|
class MarketplaceImportJobResponse(BaseModel):
    """Serialized view of a single marketplace import job.

    ``from_attributes=True`` lets the model be built from an object's
    attributes rather than only from a dict.
    """

    model_config = ConfigDict(from_attributes=True)

    # --- Identity and origin -------------------------------------------
    job_id: int
    vendor_id: int
    vendor_code: str | None = None  # filled in from the vendor relationship
    vendor_name: str | None = None  # filled in from the vendor relationship
    marketplace: str
    source_url: str
    status: str

    # --- Progress counters (all default to zero) -----------------------
    imported: int = 0
    updated: int = 0
    total_processed: int = 0
    error_count: int = 0

    # --- Failure information -------------------------------------------
    error_message: str | None = None

    # --- Lifecycle timestamps; only created_at is required -------------
    created_at: datetime
    started_at: datetime | None = None
    completed_at: datetime | None = None
|
|
|
|
|
|
class MarketplaceImportJobListResponse(BaseModel):
    """Envelope for a list of import jobs plus skip/limit paging values."""

    jobs: list[MarketplaceImportJobResponse]  # the job entries themselves
    total: int
    skip: int
    limit: int
|
|
|
|
|
|
class AdminMarketplaceImportJobResponse(MarketplaceImportJobResponse):
    """Admin variant of the import job response with extra fields.

    Inherits every field of ``MarketplaceImportJobResponse`` and adds the
    three below.
    """

    # Mirrors job_id for frontend compatibility. It is declared as its own
    # required field (not a Pydantic alias), so it must be supplied.
    id: int
    # Placeholder for future error details; currently defaults to empty.
    error_details: list = []
    # Username of whoever created the job, when known.
    created_by_name: str | None = None
|
|
|
|
|
|
class AdminMarketplaceImportJobListResponse(BaseModel):
    """Admin envelope for a paginated list of import jobs.

    Uses an items/total/page/limit layout.
    """

    items: list[AdminMarketplaceImportJobResponse]  # admin job entries
    total: int
    page: int
    limit: int
|
|
|
|
|
|
class MarketplaceImportJobStatusUpdate(BaseModel):
    """Payload for updating an import job's status (internal use).

    Only ``status`` is required; every other field defaults to ``None``.
    """

    status: str

    # Optional counters
    imported_count: int | None = None
    updated_count: int | None = None
    error_count: int | None = None
    total_processed: int | None = None

    # Optional failure description
    error_message: str | None = None
|