refactor: enforce strict architecture rules and add Pydantic response models
- Update architecture rules to be stricter (API-003 now blocks ALL exception raising in endpoints, not just HTTPException) - Update get_current_vendor_api dependency to guarantee token_vendor_id presence - Remove redundant _get_vendor_from_token helpers from all vendor API files - Move vendor access validation to service layer methods - Add Pydantic response models for media, notification, and payment endpoints - Add get_active_vendor_by_code service method for public vendor lookup - Add get_import_job_for_vendor service method with vendor validation - Update validation script to detect exception raising patterns in endpoints 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -94,6 +94,50 @@ class MarketplaceImportJobService:
|
||||
logger.error(f"Error getting import job {job_id}: {str(e)}")
|
||||
raise ValidationException("Failed to retrieve import job")
|
||||
|
||||
def get_import_job_for_vendor(
|
||||
self, db: Session, job_id: int, vendor_id: int
|
||||
) -> MarketplaceImportJob:
|
||||
"""
|
||||
Get a marketplace import job by ID with vendor access control.
|
||||
|
||||
Validates that the job belongs to the specified vendor.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
job_id: Import job ID
|
||||
vendor_id: Vendor ID from token (to verify ownership)
|
||||
|
||||
Raises:
|
||||
ImportJobNotFoundException: If job not found
|
||||
UnauthorizedVendorAccessException: If job doesn't belong to vendor
|
||||
"""
|
||||
from app.exceptions import UnauthorizedVendorAccessException
|
||||
|
||||
try:
|
||||
job = (
|
||||
db.query(MarketplaceImportJob)
|
||||
.filter(MarketplaceImportJob.id == job_id)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not job:
|
||||
raise ImportJobNotFoundException(job_id)
|
||||
|
||||
# Verify job belongs to vendor (service layer validation)
|
||||
if job.vendor_id != vendor_id:
|
||||
raise UnauthorizedVendorAccessException(
|
||||
vendor_code=str(vendor_id),
|
||||
user_id=0, # Not user-specific, but vendor mismatch
|
||||
)
|
||||
|
||||
return job
|
||||
|
||||
except (ImportJobNotFoundException, UnauthorizedVendorAccessException):
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting import job {job_id} for vendor {vendor_id}: {str(e)}")
|
||||
raise ValidationException("Failed to retrieve import job")
|
||||
|
||||
def get_import_jobs(
|
||||
self,
|
||||
db: Session,
|
||||
|
||||
@@ -252,6 +252,44 @@ class VendorService:
|
||||
|
||||
return vendor
|
||||
|
||||
def get_active_vendor_by_code(self, db: Session, vendor_code: str) -> Vendor:
|
||||
"""
|
||||
Get active vendor by vendor_code for public access (no auth required).
|
||||
|
||||
This method is specifically designed for public endpoints where:
|
||||
- No authentication is required
|
||||
- Only active vendors should be returned
|
||||
- Inactive/disabled vendors are hidden
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_code: Vendor code (case-insensitive)
|
||||
|
||||
Returns:
|
||||
Vendor object with company and owner loaded
|
||||
|
||||
Raises:
|
||||
VendorNotFoundException: If vendor not found or inactive
|
||||
"""
|
||||
from sqlalchemy.orm import joinedload
|
||||
from models.database.company import Company
|
||||
|
||||
vendor = (
|
||||
db.query(Vendor)
|
||||
.options(joinedload(Vendor.company).joinedload(Company.owner))
|
||||
.filter(
|
||||
func.upper(Vendor.vendor_code) == vendor_code.upper(),
|
||||
Vendor.is_active == True,
|
||||
)
|
||||
.first()
|
||||
)
|
||||
|
||||
if not vendor:
|
||||
logger.warning(f"Vendor not found or inactive: {vendor_code}")
|
||||
raise VendorNotFoundException(vendor_code, identifier_type="code")
|
||||
|
||||
return vendor
|
||||
|
||||
def get_vendor_by_identifier(self, db: Session, identifier: str) -> Vendor:
|
||||
"""
|
||||
Get vendor by ID or vendor_code (admin use - no access control).
|
||||
@@ -544,6 +582,107 @@ class VendorService:
|
||||
"""Check if user is vendor owner (via company ownership)."""
|
||||
return vendor.company and vendor.company.owner_user_id == user.id
|
||||
|
||||
def can_update_vendor(self, vendor: Vendor, user: User) -> bool:
|
||||
"""
|
||||
Check if user has permission to update vendor settings.
|
||||
|
||||
Permission granted to:
|
||||
- Admins (always)
|
||||
- Vendor owners (company owner)
|
||||
- Team members with appropriate role (owner role in VendorUser)
|
||||
"""
|
||||
# Admins can always update
|
||||
if user.role == "admin":
|
||||
return True
|
||||
|
||||
# Check if user is vendor owner via company
|
||||
if self._is_vendor_owner(vendor, user):
|
||||
return True
|
||||
|
||||
# Check if user is owner via VendorUser relationship
|
||||
if user.is_owner_of(vendor.id):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def update_vendor(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
vendor_update,
|
||||
current_user: User,
|
||||
) -> "Vendor":
|
||||
"""
|
||||
Update vendor profile with permission checking.
|
||||
|
||||
Raises:
|
||||
VendorNotFoundException: If vendor not found
|
||||
InsufficientPermissionsException: If user lacks permission
|
||||
"""
|
||||
from app.exceptions import InsufficientPermissionsException
|
||||
|
||||
vendor = self.get_vendor_by_id(db, vendor_id)
|
||||
|
||||
# Check permissions in service layer
|
||||
if not self.can_update_vendor(vendor, current_user):
|
||||
raise InsufficientPermissionsException(
|
||||
required_permission="vendor:profile:update"
|
||||
)
|
||||
|
||||
# Apply updates
|
||||
update_data = vendor_update.model_dump(exclude_unset=True)
|
||||
for field, value in update_data.items():
|
||||
if hasattr(vendor, field):
|
||||
setattr(vendor, field, value)
|
||||
|
||||
db.add(vendor)
|
||||
db.flush()
|
||||
db.refresh(vendor)
|
||||
return vendor
|
||||
|
||||
def update_marketplace_settings(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
marketplace_config: dict,
|
||||
current_user: User,
|
||||
) -> dict:
|
||||
"""
|
||||
Update marketplace integration settings with permission checking.
|
||||
|
||||
Raises:
|
||||
VendorNotFoundException: If vendor not found
|
||||
InsufficientPermissionsException: If user lacks permission
|
||||
"""
|
||||
from app.exceptions import InsufficientPermissionsException
|
||||
|
||||
vendor = self.get_vendor_by_id(db, vendor_id)
|
||||
|
||||
# Check permissions in service layer
|
||||
if not self.can_update_vendor(vendor, current_user):
|
||||
raise InsufficientPermissionsException(
|
||||
required_permission="vendor:settings:update"
|
||||
)
|
||||
|
||||
# Update Letzshop URLs
|
||||
if "letzshop_csv_url_fr" in marketplace_config:
|
||||
vendor.letzshop_csv_url_fr = marketplace_config["letzshop_csv_url_fr"]
|
||||
if "letzshop_csv_url_en" in marketplace_config:
|
||||
vendor.letzshop_csv_url_en = marketplace_config["letzshop_csv_url_en"]
|
||||
if "letzshop_csv_url_de" in marketplace_config:
|
||||
vendor.letzshop_csv_url_de = marketplace_config["letzshop_csv_url_de"]
|
||||
|
||||
db.add(vendor)
|
||||
db.flush()
|
||||
db.refresh(vendor)
|
||||
|
||||
return {
|
||||
"message": "Marketplace settings updated successfully",
|
||||
"letzshop_csv_url_fr": vendor.letzshop_csv_url_fr,
|
||||
"letzshop_csv_url_en": vendor.letzshop_csv_url_en,
|
||||
"letzshop_csv_url_de": vendor.letzshop_csv_url_de,
|
||||
}
|
||||
|
||||
|
||||
# Create service instance following the same pattern as other services
|
||||
vendor_service = VendorService()
|
||||
|
||||
Reference in New Issue
Block a user