refactor: switch to full auto-discovery for module API routes

- Enhanced route discovery system with ROUTE_CONFIG support for custom
  prefix, tags, and priority
- Added get_admin_api_routes() and get_vendor_api_routes() helpers that
  return routes sorted by priority
- Added fallback discovery for routes/{frontend}.py when routes/api/
  doesn't exist
- Updated CMS module with ROUTE_CONFIG (prefix: /content-pages,
  priority: 100) to register last for catch-all routes
- Moved customers routes from routes/ to routes/api/ directory
- Updated orders module to aggregate exception routers into main routers
- Removed manual module router imports from admin and vendor API init
  files, replaced with auto-discovery loop

Modules now auto-discovered: billing, inventory, orders, marketplace,
cms, customers, analytics, loyalty, messaging, monitoring, dev-tools

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-31 12:42:25 +01:00
parent 9fb3588030
commit db56b34894
14 changed files with 1230 additions and 155 deletions

View File

@@ -6,4 +6,20 @@ from app.modules.customers.routes.api.storefront import router as storefront_rou
# Tag for OpenAPI documentation
STOREFRONT_TAG = "Customer Account (Storefront)"
__all__ = ["storefront_router", "STOREFRONT_TAG"]
__all__ = [
"storefront_router",
"STOREFRONT_TAG",
"admin_router",
"vendor_router",
]
def __getattr__(name: str):
"""Lazy import routers to avoid circular dependencies."""
if name == "admin_router":
from app.modules.customers.routes.api.admin import admin_router
return admin_router
elif name == "vendor_router":
from app.modules.customers.routes.api.vendor import vendor_router
return vendor_router
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -0,0 +1,122 @@
# app/modules/customers/routes/api/admin.py
"""
Customer management endpoints for admin.
Provides admin-level access to customer data across all vendors.
"""
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api, require_module_access
from app.core.database import get_db
from app.modules.customers.services import admin_customer_service
from models.schema.auth import UserContext
from app.modules.customers.schemas import (
CustomerDetailResponse,
CustomerListResponse,
CustomerMessageResponse,
CustomerStatisticsResponse,
)
# Create module-aware router
admin_router = APIRouter(
prefix="/customers",
dependencies=[Depends(require_module_access("customers"))],
)
# ============================================================================
# List Customers
# ============================================================================
@admin_router.get("", response_model=CustomerListResponse)
def list_customers(
vendor_id: int | None = Query(None, description="Filter by vendor ID"),
search: str = Query("", description="Search by email, name, or customer number"),
is_active: bool | None = Query(None, description="Filter by active status"),
skip: int = Query(0, ge=0),
limit: int = Query(20, ge=1, le=100),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
) -> CustomerListResponse:
"""
Get paginated list of customers across all vendors.
Admin can filter by vendor, search, and active status.
"""
customers, total = admin_customer_service.list_customers(
db=db,
vendor_id=vendor_id,
search=search if search else None,
is_active=is_active,
skip=skip,
limit=limit,
)
# Calculate pagination values
page = (skip // limit) + 1 if limit > 0 else 1
per_page = limit
total_pages = (total + limit - 1) // limit if limit > 0 else 1
return CustomerListResponse(
customers=customers,
total=total,
page=page,
per_page=per_page,
total_pages=total_pages,
)
# ============================================================================
# Customer Statistics
# ============================================================================
@admin_router.get("/stats", response_model=CustomerStatisticsResponse)
def get_customer_stats(
vendor_id: int | None = Query(None, description="Filter by vendor ID"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
) -> CustomerStatisticsResponse:
"""Get customer statistics."""
stats = admin_customer_service.get_customer_stats(db=db, vendor_id=vendor_id)
return CustomerStatisticsResponse(**stats)
# ============================================================================
# Get Single Customer
# ============================================================================
@admin_router.get("/{customer_id}", response_model=CustomerDetailResponse)
def get_customer(
customer_id: int,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
) -> CustomerDetailResponse:
"""Get customer details by ID."""
customer = admin_customer_service.get_customer(db=db, customer_id=customer_id)
return CustomerDetailResponse(**customer)
# ============================================================================
# Toggle Customer Status
# ============================================================================
@admin_router.patch("/{customer_id}/toggle-status", response_model=CustomerMessageResponse)
def toggle_customer_status(
customer_id: int,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
) -> CustomerMessageResponse:
"""Toggle customer active status."""
result = admin_customer_service.toggle_customer_status(
db=db,
customer_id=customer_id,
admin_email=current_admin.email,
)
db.commit()
return CustomerMessageResponse(message=result["message"])

View File

@@ -0,0 +1,227 @@
# app/modules/customers/routes/api/vendor.py
"""
Vendor customer management endpoints.
Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The get_current_vendor_api dependency guarantees token_vendor_id is present.
"""
import logging
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api, require_module_access
from app.core.database import get_db
from app.modules.customers.services import customer_service
from models.schema.auth import UserContext
from app.modules.customers.schemas import (
CustomerDetailResponse,
CustomerMessageResponse,
CustomerOrdersResponse,
CustomerResponse,
CustomerStatisticsResponse,
CustomerUpdate,
VendorCustomerListResponse,
)
# Create module-aware router
vendor_router = APIRouter(
prefix="/customers",
dependencies=[Depends(require_module_access("customers"))],
)
logger = logging.getLogger(__name__)
@vendor_router.get("", response_model=VendorCustomerListResponse)
def get_vendor_customers(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
search: str | None = Query(None),
is_active: bool | None = Query(None),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get all customers for this vendor.
- Query customers filtered by vendor_id
- Support search by name/email
- Support filtering by active status
- Return paginated results
"""
customers, total = customer_service.get_vendor_customers(
db=db,
vendor_id=current_user.token_vendor_id,
skip=skip,
limit=limit,
search=search,
is_active=is_active,
)
return VendorCustomerListResponse(
customers=[CustomerResponse.model_validate(c) for c in customers],
total=total,
skip=skip,
limit=limit,
)
@vendor_router.get("/{customer_id}", response_model=CustomerDetailResponse)
def get_customer_details(
customer_id: int,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get detailed customer information.
- Get customer by ID
- Verify customer belongs to vendor
- Include order statistics
"""
# Service will raise CustomerNotFoundException if not found
customer = customer_service.get_customer(
db=db,
vendor_id=current_user.token_vendor_id,
customer_id=customer_id,
)
# Get statistics
stats = customer_service.get_customer_statistics(
db=db,
vendor_id=current_user.token_vendor_id,
customer_id=customer_id,
)
return CustomerDetailResponse(
id=customer.id,
email=customer.email,
first_name=customer.first_name,
last_name=customer.last_name,
phone=customer.phone,
customer_number=customer.customer_number,
is_active=customer.is_active,
marketing_consent=customer.marketing_consent,
total_orders=stats["total_orders"],
total_spent=stats["total_spent"],
average_order_value=stats["average_order_value"],
last_order_date=stats["last_order_date"],
created_at=customer.created_at,
)
@vendor_router.get("/{customer_id}/orders", response_model=CustomerOrdersResponse)
def get_customer_orders(
customer_id: int,
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=100),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get order history for a specific customer.
- Get all orders for customer
- Filter by vendor_id
- Return order details
"""
# Service will raise CustomerNotFoundException if not found
orders, total = customer_service.get_customer_orders(
db=db,
vendor_id=current_user.token_vendor_id,
customer_id=customer_id,
skip=skip,
limit=limit,
)
return CustomerOrdersResponse(
orders=[
{
"id": o.id,
"order_number": o.order_number,
"status": o.status,
"total": o.total_cents / 100 if o.total_cents else 0,
"created_at": o.created_at,
}
for o in orders
],
total=total,
skip=skip,
limit=limit,
)
@vendor_router.put("/{customer_id}", response_model=CustomerMessageResponse)
def update_customer(
customer_id: int,
customer_data: CustomerUpdate,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Update customer information.
- Update customer details
- Verify customer belongs to vendor
"""
# Service will raise CustomerNotFoundException if not found
customer_service.update_customer(
db=db,
vendor_id=current_user.token_vendor_id,
customer_id=customer_id,
customer_data=customer_data,
)
db.commit()
return CustomerMessageResponse(message="Customer updated successfully")
@vendor_router.put("/{customer_id}/status", response_model=CustomerMessageResponse)
def toggle_customer_status(
customer_id: int,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Activate/deactivate customer account.
- Toggle customer is_active status
- Verify customer belongs to vendor
"""
# Service will raise CustomerNotFoundException if not found
customer = customer_service.toggle_customer_status(
db=db,
vendor_id=current_user.token_vendor_id,
customer_id=customer_id,
)
db.commit()
status = "activated" if customer.is_active else "deactivated"
return CustomerMessageResponse(message=f"Customer {status} successfully")
@vendor_router.get("/{customer_id}/stats", response_model=CustomerStatisticsResponse)
def get_customer_statistics(
customer_id: int,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get customer statistics and metrics.
- Total orders
- Total spent
- Average order value
- Last order date
"""
# Service will raise CustomerNotFoundException if not found
stats = customer_service.get_customer_statistics(
db=db,
vendor_id=current_user.token_vendor_id,
customer_id=customer_id,
)
return CustomerStatisticsResponse(**stats)