Files
orion/app/services/admin_service.py
Samir Boulahtit 846f92e7e4 feat(vendor): add contact info inheritance from company
Vendors can now override company contact information for specific branding.
Fields are nullable - if null, value is inherited from parent company.

Database changes:
- Add vendor.contact_email, contact_phone, website, business_address, tax_number
- All nullable (null = inherit from company)
- Alembic migration: 28d44d503cac

Model changes:
- Add effective_* properties for resolved values
- Add get_contact_info_with_inheritance() helper

Schema changes:
- VendorCreate: Optional contact fields for override at creation
- VendorUpdate: Contact fields + reset_contact_to_company flag
- VendorDetailResponse: Resolved values + *_inherited flags

API changes:
- GET/PUT vendor endpoints return resolved contact info
- PUT accepts contact overrides (empty string = reset to inherit)
- _build_vendor_detail_response helper for consistent responses

Service changes:
- admin_service.update_vendor handles reset_contact_to_company flag
- Empty strings converted to None for inheritance

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 22:30:31 +01:00

622 lines
22 KiB
Python

# app/services/admin_service.py
"""
Admin service for managing users, vendors, and import jobs.
This module provides classes and functions for:
- User management and status control
- Vendor creation under an existing company (ownership is managed at the company level)
- Vendor verification and activation
- Marketplace import job monitoring
- Platform statistics
"""
import logging
import secrets
import string
from datetime import UTC, datetime
from sqlalchemy import func, or_
from sqlalchemy.orm import Session, joinedload
from app.exceptions import (
AdminOperationException,
CannotModifySelfException,
UserNotFoundException,
UserStatusChangeException,
ValidationException,
VendorAlreadyExistsException,
VendorNotFoundException,
VendorVerificationException,
)
from models.database.company import Company
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.user import User
from models.database.vendor import Role, Vendor, VendorUser
from models.schema.marketplace_import_job import MarketplaceImportJobResponse
from models.schema.vendor import VendorCreate
# Module-level logger named after this module; the service methods below log through it.
logger = logging.getLogger(__name__)
class AdminService:
"""Service class for admin operations following the application's service pattern."""
# ============================================================================
# USER MANAGEMENT
# ============================================================================
def get_all_users(self, db: Session, skip: int = 0, limit: int = 100) -> list[User]:
    """
    Get a paginated list of all users.

    Rows are ordered by primary key so that consecutive pages neither
    overlap nor skip users (OFFSET/LIMIT without ORDER BY has no
    guaranteed row order).

    Args:
        db: Database session
        skip: Number of users to skip (offset)
        limit: Maximum number of users to return

    Returns:
        List of User objects for the requested page

    Raises:
        AdminOperationException: If the database query fails
    """
    try:
        return db.query(User).order_by(User.id).offset(skip).limit(limit).all()
    except Exception as e:
        logger.error(f"Failed to retrieve users: {str(e)}")
        # Chain the original error so the root cause stays in the traceback
        raise AdminOperationException(
            operation="get_all_users", reason="Database query failed"
        ) from e
def toggle_user_status(
    self, db: Session, user_id: int, current_admin_id: int
) -> tuple[User, str]:
    """
    Toggle a user's active status.

    Args:
        db: Database session
        user_id: ID of the user whose status is flipped
        current_admin_id: ID of the admin performing the action

    Returns:
        Tuple of (updated user, human-readable status message)

    Raises:
        UserNotFoundException: If no user has the given ID
        CannotModifySelfException: If the admin targets their own account
        UserStatusChangeException: If the target is another admin, or the
            database update fails
    """
    user = self._get_user_by_id_or_raise(db, user_id)

    # Prevent self-modification
    if user.id == current_admin_id:
        raise CannotModifySelfException(user_id, "deactivate account")

    # Self-modification was rejected above, so any admin reaching this
    # point is necessarily a different admin.
    if user.role == "admin":
        raise UserStatusChangeException(
            user_id=user_id,
            current_status="admin",
            attempted_action="toggle status",
            reason="Cannot modify another admin user",
        )

    # Captured before the try block so the error handler can always report it
    original_status = user.is_active

    try:
        user.is_active = not original_status
        user.updated_at = datetime.now(UTC)
        db.commit()
        db.refresh(user)

        status_action = "activated" if user.is_active else "deactivated"
        message = f"User {user.username} has been {status_action}"
        logger.info(f"{message} by admin {current_admin_id}")
        return user, message
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to toggle user {user_id} status: {str(e)}")
        raise UserStatusChangeException(
            user_id=user_id,
            current_status="active" if original_status else "inactive",
            attempted_action="toggle status",
            reason="Database update failed",
        ) from e
# ============================================================================
# VENDOR MANAGEMENT
# ============================================================================
def create_vendor(self, db: Session, vendor_data: VendorCreate) -> Vendor:
    """
    Create a vendor (storefront/brand) under an existing company.

    The vendor inherits owner and contact information from its parent
    company. Contact fields present on ``vendor_data`` are stored as
    vendor-level overrides; missing or empty values are left unset so the
    company value keeps being inherited.

    Args:
        db: Database session
        vendor_data: Vendor creation data including company_id

    Returns:
        The created Vendor object, refreshed from the database after commit

    Raises:
        ValidationException: If company not found or subdomain already taken
        VendorAlreadyExistsException: If the vendor code already exists
        AdminOperationException: If creation fails for any other reason
    """
    try:
        # Validate company exists
        company = (
            db.query(Company).filter(Company.id == vendor_data.company_id).first()
        )
        if not company:
            raise ValidationException(
                f"Company with ID {vendor_data.company_id} not found"
            )

        # Check if vendor code already exists (case-insensitive)
        existing_vendor = (
            db.query(Vendor)
            .filter(func.upper(Vendor.vendor_code) == vendor_data.vendor_code.upper())
            .first()
        )
        if existing_vendor:
            raise VendorAlreadyExistsException(vendor_data.vendor_code)

        # Check if subdomain already exists (case-insensitive)
        existing_subdomain = (
            db.query(Vendor)
            .filter(func.lower(Vendor.subdomain) == vendor_data.subdomain.lower())
            .first()
        )
        if existing_subdomain:
            raise ValidationException(
                f"Subdomain '{vendor_data.subdomain}' is already taken"
            )

        # Optional contact overrides supplied at creation time. getattr keeps
        # this safe if the schema lacks a field; None/"" mean "inherit".
        contact_overrides = {}
        for field in (
            "contact_email",
            "contact_phone",
            "website",
            "business_address",
            "tax_number",
        ):
            value = getattr(vendor_data, field, None)
            if value is not None and value != "":
                contact_overrides[field] = value

        # Create vendor linked to company; code and subdomain are normalized
        vendor = Vendor(
            company_id=company.id,
            vendor_code=vendor_data.vendor_code.upper(),
            subdomain=vendor_data.subdomain.lower(),
            name=vendor_data.name,
            description=vendor_data.description,
            letzshop_csv_url_fr=vendor_data.letzshop_csv_url_fr,
            letzshop_csv_url_en=vendor_data.letzshop_csv_url_en,
            letzshop_csv_url_de=vendor_data.letzshop_csv_url_de,
            is_active=True,
            is_verified=False,  # Needs verification by admin
            **contact_overrides,
        )
        db.add(vendor)
        db.flush()  # Get vendor.id

        # Create default roles for vendor
        self._create_default_roles(db, vendor.id)

        db.commit()
        db.refresh(vendor)

        logger.info(
            f"Vendor {vendor.vendor_code} created under company {company.name} (ID: {company.id})"
        )
        return vendor
    except (VendorAlreadyExistsException, ValidationException):
        # Domain errors are passed through unchanged after undoing any work
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to create vendor: {str(e)}")
        raise AdminOperationException(
            operation="create_vendor",
            reason=f"Failed to create vendor: {str(e)}",
        ) from e
def get_all_vendors(
    self,
    db: Session,
    skip: int = 0,
    limit: int = 100,
    search: str | None = None,
    is_active: bool | None = None,
    is_verified: bool | None = None,
) -> tuple[list[Vendor], int]:
    """
    Get a paginated list of vendors with optional filtering.

    Args:
        db: Database session
        skip: Number of vendors to skip (offset)
        limit: Maximum number of vendors to return
        search: Case-insensitive substring matched against name,
            vendor_code and subdomain
        is_active: If not None, only vendors with this active flag
        is_verified: If not None, only vendors with this verified flag

    Returns:
        Tuple of (vendors on the requested page, total matching vendors)

    Raises:
        AdminOperationException: If the database query fails
    """
    try:
        # Build the filter conditions once so the page query and the count
        # query can never drift apart.
        conditions = []
        if search:
            search_term = f"%{search}%"
            conditions.append(
                or_(
                    Vendor.name.ilike(search_term),
                    Vendor.vendor_code.ilike(search_term),
                    Vendor.subdomain.ilike(search_term),
                )
            )
        if is_active is not None:
            conditions.append(Vendor.is_active == is_active)
        if is_verified is not None:
            conditions.append(Vendor.is_verified == is_verified)

        # Get total count (without joinedload for performance)
        total = db.query(Vendor).filter(*conditions).count()

        # Eagerly load company relationship to avoid N+1 queries. Ordering by
        # primary key keeps OFFSET/LIMIT pages stable between requests.
        vendors = (
            db.query(Vendor)
            .options(joinedload(Vendor.company))
            .filter(*conditions)
            .order_by(Vendor.id)
            .offset(skip)
            .limit(limit)
            .all()
        )
        return vendors, total
    except Exception as e:
        logger.error(f"Failed to retrieve vendors: {str(e)}")
        raise AdminOperationException(
            operation="get_all_vendors", reason="Database query failed"
        ) from e
def get_vendor_by_id(self, db: Session, vendor_id: int) -> Vendor:
    """
    Fetch a single vendor by its ID.

    Public wrapper that delegates to ``_get_vendor_by_id_or_raise``, so a
    missing vendor surfaces as the exception raised by that helper.
    """
    vendor = self._get_vendor_by_id_or_raise(db, vendor_id)
    return vendor
def verify_vendor(self, db: Session, vendor_id: int) -> tuple[Vendor, str]:
    """
    Toggle vendor verification status.

    When the vendor becomes verified, ``verified_at`` is stamped with the
    same timestamp as ``updated_at``. When it becomes unverified,
    ``verified_at`` is left as it was.

    Args:
        db: Database session
        vendor_id: ID of the vendor to toggle

    Returns:
        Tuple of (updated vendor, human-readable status message)

    Raises:
        VendorNotFoundException: If no vendor has the given ID
        VendorVerificationException: If the database update fails
    """
    vendor = self._get_vendor_by_id_or_raise(db, vendor_id)

    # Captured before the try block so the error handler can always report it
    original_status = vendor.is_verified

    try:
        # One timestamp for both columns so they agree exactly
        now = datetime.now(UTC)
        vendor.is_verified = not original_status
        vendor.updated_at = now
        if vendor.is_verified:
            vendor.verified_at = now

        db.commit()
        db.refresh(vendor)

        status_action = "verified" if vendor.is_verified else "unverified"
        message = f"Vendor {vendor.vendor_code} has been {status_action}"
        logger.info(message)
        return vendor, message
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to verify vendor {vendor_id}: {str(e)}")
        raise VendorVerificationException(
            vendor_id=vendor_id,
            reason="Database update failed",
            current_verification_status=original_status,
        ) from e
def toggle_vendor_status(self, db: Session, vendor_id: int) -> tuple[Vendor, str]:
    """
    Toggle vendor active status.

    Args:
        db: Database session
        vendor_id: ID of the vendor to toggle

    Returns:
        Tuple of (updated vendor, human-readable status message)

    Raises:
        VendorNotFoundException: If no vendor has the given ID
        AdminOperationException: If the database update fails
    """
    vendor = self._get_vendor_by_id_or_raise(db, vendor_id)

    try:
        vendor.is_active = not vendor.is_active
        vendor.updated_at = datetime.now(UTC)
        db.commit()
        db.refresh(vendor)

        status_action = "activated" if vendor.is_active else "deactivated"
        message = f"Vendor {vendor.vendor_code} has been {status_action}"
        logger.info(message)
        return vendor, message
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to toggle vendor {vendor_id} status: {str(e)}")
        raise AdminOperationException(
            operation="toggle_vendor_status",
            reason="Database update failed",
            target_type="vendor",
            target_id=str(vendor_id),
        ) from e
def delete_vendor(self, db: Session, vendor_id: int) -> str:
    """
    Delete a vendor and return a confirmation message.

    Associated data is not removed explicitly by this method (see the TODO
    below); whether it disappears with the vendor depends on cascade rules
    that are not visible here.

    Args:
        db: Database session
        vendor_id: ID of the vendor to delete

    Returns:
        Confirmation message containing the deleted vendor's code

    Raises:
        VendorNotFoundException: If no vendor has the given ID
        AdminOperationException: If the deletion fails
    """
    doomed = self._get_vendor_by_id_or_raise(db, vendor_id)

    try:
        # Remember the code up front; it is used in the messages after the delete
        code = doomed.vendor_code

        # TODO: Delete associated data in correct order
        # - Delete orders
        # - Delete customers
        # - Delete products
        # - Delete team members
        # - Delete roles
        # - Delete import jobs
        db.delete(doomed)
        db.commit()

        logger.warning(f"Vendor {code} and all associated data deleted")
        confirmation = f"Vendor {code} successfully deleted"
        return confirmation
    except Exception as exc:
        db.rollback()
        logger.error(f"Failed to delete vendor {vendor_id}: {str(exc)}")
        failure = AdminOperationException(
            operation="delete_vendor", reason="Database deletion failed"
        )
        raise failure
def update_vendor(
    self,
    db: Session,
    vendor_id: int,
    vendor_update,  # VendorUpdate schema
) -> Vendor:
    """
    Update vendor information (Admin only).

    Can update:
    - Vendor details (name, description, subdomain)
    - Business contact info (contact_email, phone, etc.)
    - Status (is_active, is_verified)

    Cannot update:
    - vendor_code (immutable)
    - company_id (vendor cannot be moved between companies)

    Contact fields set to an empty string, or all of them when
    ``reset_contact_to_company`` is true, are stored as None so the value
    is inherited from the parent company again.

    Subdomains are lowercased and checked for uniqueness
    case-insensitively, matching how create_vendor stores and checks them.

    Note: Ownership is managed at the Company level.
    Use company_service.transfer_ownership() for ownership changes.

    Args:
        db: Database session
        vendor_id: ID of vendor to update
        vendor_update: VendorUpdate schema with updated data

    Returns:
        Updated vendor object

    Raises:
        VendorNotFoundException: If vendor not found
        ValidationException: If subdomain already taken
        AdminOperationException: If the database update fails
    """
    vendor = self._get_vendor_by_id_or_raise(db, vendor_id)

    contact_fields = (
        "contact_email",
        "contact_phone",
        "website",
        "business_address",
        "tax_number",
    )

    try:
        # Only fields the caller explicitly sent are applied
        update_data = vendor_update.model_dump(exclude_unset=True)

        # Handle reset_contact_to_company flag: the flag itself is not a
        # column, so it is popped before the values are applied.
        if update_data.pop("reset_contact_to_company", False):
            # Reset all contact fields to None (inherit from company)
            update_data.update(dict.fromkeys(contact_fields))

        # Convert empty strings to None for contact fields (empty = inherit)
        for field in contact_fields:
            if update_data.get(field) == "":
                update_data[field] = None

        # Check subdomain uniqueness if changing
        if "subdomain" in update_data:
            subdomain = update_data["subdomain"]
            if isinstance(subdomain, str):
                # Store lowercase, as create_vendor does, so "Shop" and
                # "shop" cannot coexist.
                subdomain = subdomain.lower()
                update_data["subdomain"] = subdomain

            if subdomain != vendor.subdomain:
                existing = (
                    db.query(Vendor)
                    .filter(
                        func.lower(Vendor.subdomain) == subdomain,
                        Vendor.id != vendor_id,
                    )
                    .first()
                )
                if existing:
                    raise ValidationException(
                        f"Subdomain '{update_data['subdomain']}' is already taken"
                    )

        now = datetime.now(UTC)

        # Stamp verified_at when this update is what verifies the vendor,
        # mirroring verify_vendor.
        if update_data.get("is_verified") is True and not vendor.is_verified:
            vendor.verified_at = now

        # Update vendor fields
        for field, value in update_data.items():
            setattr(vendor, field, value)
        vendor.updated_at = now

        db.commit()
        db.refresh(vendor)

        logger.info(
            f"Vendor {vendor_id} ({vendor.vendor_code}) updated by admin. "
            f"Fields updated: {', '.join(update_data)}"
        )
        return vendor
    except ValidationException:
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"Failed to update vendor {vendor_id}: {str(e)}")
        raise AdminOperationException(
            operation="update_vendor", reason=f"Database update failed: {str(e)}"
        ) from e
# NOTE: Vendor ownership transfer is now handled at the Company level.
# Use company_service.transfer_ownership() instead.
# ============================================================================
# MARKETPLACE IMPORT JOBS
# ============================================================================
def get_marketplace_import_jobs(
    self,
    db: Session,
    marketplace: str | None = None,
    vendor_name: str | None = None,
    status: str | None = None,
    skip: int = 0,
    limit: int = 100,
) -> list[MarketplaceImportJobResponse]:
    """
    List marketplace import jobs, newest first, with optional filters.

    Args:
        db: Database session
        marketplace: Case-insensitive substring to match on the marketplace
        vendor_name: Case-insensitive substring to match on the vendor name
        status: Exact status value to match
        skip: Number of jobs to skip (offset)
        limit: Maximum number of jobs to return

    Returns:
        Response schemas for the jobs on the requested page

    Raises:
        AdminOperationException: If the query or the conversion fails
    """
    try:
        # Collect only the criteria whose filter argument was provided
        criteria = []
        if marketplace:
            criteria.append(
                MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")
            )
        if vendor_name:
            criteria.append(
                MarketplaceImportJob.vendor_name.ilike(f"%{vendor_name}%")
            )
        if status:
            criteria.append(MarketplaceImportJob.status == status)

        selection = db.query(MarketplaceImportJob)
        for criterion in criteria:
            selection = selection.filter(criterion)

        newest_first = selection.order_by(MarketplaceImportJob.created_at.desc())
        page = newest_first.offset(skip).limit(limit).all()

        responses = []
        for import_job in page:
            responses.append(self._convert_job_to_response(import_job))
        return responses
    except Exception as exc:
        logger.error(f"Failed to retrieve marketplace import jobs: {str(exc)}")
        raise AdminOperationException(
            operation="get_marketplace_import_jobs", reason="Database query failed"
        )
# ============================================================================
# STATISTICS
# ============================================================================
def get_recent_vendors(self, db: Session, limit: int = 5) -> list[dict]:
    """
    Summarize the most recently created vendors.

    Best-effort: any failure is logged and an empty list is returned
    instead of raising.

    Args:
        db: Database session
        limit: Maximum number of vendors to return

    Returns:
        One dict per vendor (id, vendor_code, name, subdomain, is_active,
        is_verified, created_at), newest first
    """
    try:
        newest_first = db.query(Vendor).order_by(Vendor.created_at.desc())
        summaries = []
        for vendor in newest_first.limit(limit).all():
            summaries.append(
                {
                    "id": vendor.id,
                    "vendor_code": vendor.vendor_code,
                    "name": vendor.name,
                    "subdomain": vendor.subdomain,
                    "is_active": vendor.is_active,
                    "is_verified": vendor.is_verified,
                    "created_at": vendor.created_at,
                }
            )
        return summaries
    except Exception as exc:
        logger.error(f"Failed to get recent vendors: {str(exc)}")
        return []
def get_recent_import_jobs(self, db: Session, limit: int = 10) -> list[dict]:
    """
    Summarize the most recent marketplace import jobs.

    Best-effort: any failure is logged and an empty list is returned
    instead of raising.

    Args:
        db: Database session
        limit: Maximum number of jobs to return

    Returns:
        One dict per job (id, marketplace, vendor_name, status,
        total_processed, created_at), newest first. A falsy
        total_processed is reported as 0.
    """
    try:
        newest_first = db.query(MarketplaceImportJob).order_by(
            MarketplaceImportJob.created_at.desc()
        )
        summaries = []
        for import_job in newest_first.limit(limit).all():
            summaries.append(
                {
                    "id": import_job.id,
                    "marketplace": import_job.marketplace,
                    "vendor_name": import_job.vendor_name,
                    "status": import_job.status,
                    "total_processed": import_job.total_processed or 0,
                    "created_at": import_job.created_at,
                }
            )
        return summaries
    except Exception as exc:
        logger.error(f"Failed to get recent import jobs: {str(exc)}")
        return []
# ============================================================================
# PRIVATE HELPER METHODS
# ============================================================================
def _get_user_by_id_or_raise(self, db: Session, user_id: int) -> User:
    """
    Look up a user by ID.

    Raises:
        UserNotFoundException: If no user matches ``user_id``
    """
    match = db.query(User).filter(User.id == user_id).first()
    if match:
        return match
    raise UserNotFoundException(str(user_id))
def _get_vendor_by_id_or_raise(self, db: Session, vendor_id: int) -> Vendor:
    """
    Look up a vendor by ID, eagerly loading its company and the company owner.

    Raises:
        VendorNotFoundException: If no vendor matches ``vendor_id``
    """
    # Join vendor -> company -> owner in the same query
    with_company_owner = joinedload(Vendor.company).joinedload(Company.owner)
    lookup = db.query(Vendor).options(with_company_owner)
    match = lookup.filter(Vendor.id == vendor_id).first()
    if match:
        return match
    raise VendorNotFoundException(str(vendor_id), identifier_type="id")
def _generate_temp_password(self, length: int = 12) -> str:
    """
    Build a random temporary password of ``length`` characters.

    Characters are drawn with ``secrets.choice`` from ASCII letters, digits
    and the symbols ``!@#$%^&*``.
    """
    charset = string.ascii_letters + string.digits + "!@#$%^&*"
    picked = [secrets.choice(charset) for _ in range(length)]
    return "".join(picked)
def _create_default_roles(self, db: Session, vendor_id: int):
    """
    Add the four default roles (Owner, Manager, Editor, Viewer) for a vendor.

    The roles are only added to the session; nothing is flushed or
    committed here.

    Args:
        db: Database session
        vendor_id: ID of the vendor the roles belong to
    """
    # (role name, permissions) pairs, in the order they are added
    role_templates = (
        ("Owner", ["*"]),  # Full access
        (
            "Manager",
            [
                "products.*",
                "orders.*",
                "customers.view",
                "inventory.*",
                "team.view",
            ],
        ),
        (
            "Editor",
            [
                "products.view",
                "products.edit",
                "orders.view",
                "inventory.view",
            ],
        ),
        (
            "Viewer",
            [
                "products.view",
                "orders.view",
                "customers.view",
                "inventory.view",
            ],
        ),
    )

    for role_name, granted in role_templates:
        db.add(Role(vendor_id=vendor_id, name=role_name, permissions=granted))
def _convert_job_to_response(
    self, job: MarketplaceImportJob
) -> MarketplaceImportJobResponse:
    """
    Map an import job database row onto its response schema.

    Vendor id and code are None when the job has no vendor attached;
    falsy counters are reported as 0.
    """
    linked_vendor = job.vendor
    if linked_vendor:
        vendor_id = linked_vendor.id
        vendor_code = linked_vendor.vendor_code
    else:
        vendor_id = None
        vendor_code = None

    return MarketplaceImportJobResponse(
        job_id=job.id,
        status=job.status,
        marketplace=job.marketplace,
        vendor_id=vendor_id,
        vendor_code=vendor_code,
        vendor_name=job.vendor_name,
        imported=job.imported_count or 0,
        updated=job.updated_count or 0,
        total_processed=job.total_processed or 0,
        error_count=job.error_count or 0,
        error_message=job.error_message,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
    )
# Shared module-level AdminService instance, created once when this module is imported.
admin_service = AdminService()