refactor: migrate remaining routes to modules and enforce auto-discovery

MIGRATION:
- Delete app/api/v1/vendor/analytics.py (duplicate - analytics module already auto-discovered)
- Move usage routes from app/api/v1/vendor/usage.py to billing module
- Move onboarding routes from app/api/v1/vendor/onboarding.py to marketplace module
- Move features routes to billing module (admin + vendor)
- Move inventory routes to inventory module (admin + vendor)
- Move marketplace/letzshop routes to marketplace module
- Move orders routes to orders module
- Delete legacy letzshop service files (moved to marketplace module)

DOCUMENTATION:
- Add docs/development/migration/module-autodiscovery-migration.md with full migration history
- Update docs/architecture/module-system.md with Entity Auto-Discovery Reference section
- Add detailed sections for each entity type: routes, services, models, schemas, tasks,
  exceptions, templates, static files, locales, configuration

ARCHITECTURE VALIDATION:
- Add MOD-016: Routes must be in modules, not app/api/v1/
- Add MOD-017: Services must be in modules, not app/services/
- Add MOD-018: Tasks must be in modules, not app/tasks/
- Add MOD-019: Schemas must be in modules, not models/schema/
- Update scripts/validate_architecture.py with _validate_legacy_locations method
- Update .architecture-rules/module.yaml with legacy location rules

These rules enforce that all entities must be in self-contained modules.
Legacy locations now trigger ERROR severity violations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-31 14:25:59 +01:00
parent e2cecff014
commit 401db56258
52 changed files with 1160 additions and 4968 deletions

View File

@@ -3,8 +3,10 @@
Billing module API routes.
Provides REST API endpoints for subscription and billing management:
- Admin API: Subscription tier management, vendor subscriptions, billing history
- Vendor API: Subscription status, tier comparison, invoices
- Admin API: Subscription tier management, vendor subscriptions, billing history, features
- Vendor API: Subscription status, tier comparison, invoices, features
Each main router (admin.py, vendor.py) aggregates its related sub-routers internally.
"""
from app.modules.billing.routes.api.admin import admin_router

View File

@@ -334,3 +334,12 @@ def update_vendor_subscription(
products_count=usage["products_count"],
team_count=usage["team_count"],
)
# ============================================================================
# Aggregate Feature Management Routes
# ============================================================================
# Include the features router to aggregate all billing-related admin routes
from app.modules.billing.routes.api.admin_features import admin_features_router
admin_router.include_router(admin_features_router, tags=["admin-features"])

View File

@@ -0,0 +1,313 @@
# app/modules/billing/routes/api/admin_features.py
"""
Admin feature management endpoints.
Provides endpoints for:
- Listing all features with their tier assignments
- Updating tier feature assignments
- Managing feature metadata
- Viewing feature usage statistics
All routes require module access control for the 'billing' module.
"""
import logging
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api, require_module_access
from app.core.database import get_db
from app.services.feature_service import feature_service
from models.schema.auth import UserContext
admin_features_router = APIRouter(
prefix="/features",
dependencies=[Depends(require_module_access("billing"))],
)
logger = logging.getLogger(__name__)
# ============================================================================
# Response Schemas
# ============================================================================
class FeatureResponse(BaseModel):
"""Feature information for admin."""
id: int
code: str
name: str
description: str | None = None
category: str
ui_location: str | None = None
ui_icon: str | None = None
ui_route: str | None = None
ui_badge_text: str | None = None
minimum_tier_id: int | None = None
minimum_tier_code: str | None = None
minimum_tier_name: str | None = None
is_active: bool
is_visible: bool
display_order: int
class FeatureListResponse(BaseModel):
"""List of features."""
features: list[FeatureResponse]
total: int
class TierFeaturesResponse(BaseModel):
"""Tier with its features."""
id: int
code: str
name: str
description: str | None = None
features: list[str]
feature_count: int
class TierListWithFeaturesResponse(BaseModel):
"""All tiers with their features."""
tiers: list[TierFeaturesResponse]
class UpdateTierFeaturesRequest(BaseModel):
"""Request to update tier features."""
feature_codes: list[str]
class UpdateFeatureRequest(BaseModel):
"""Request to update feature metadata."""
name: str | None = None
description: str | None = None
category: str | None = None
ui_location: str | None = None
ui_icon: str | None = None
ui_route: str | None = None
ui_badge_text: str | None = None
minimum_tier_code: str | None = None
is_active: bool | None = None
is_visible: bool | None = None
display_order: int | None = None
class CategoryListResponse(BaseModel):
"""List of feature categories."""
categories: list[str]
class TierFeatureDetailResponse(BaseModel):
"""Tier features with full details."""
tier_code: str
tier_name: str
features: list[dict]
feature_count: int
# ============================================================================
# Helper Functions
# ============================================================================
def _feature_to_response(feature) -> FeatureResponse:
"""Convert Feature model to response."""
return FeatureResponse(
id=feature.id,
code=feature.code,
name=feature.name,
description=feature.description,
category=feature.category,
ui_location=feature.ui_location,
ui_icon=feature.ui_icon,
ui_route=feature.ui_route,
ui_badge_text=feature.ui_badge_text,
minimum_tier_id=feature.minimum_tier_id,
minimum_tier_code=feature.minimum_tier.code if feature.minimum_tier else None,
minimum_tier_name=feature.minimum_tier.name if feature.minimum_tier else None,
is_active=feature.is_active,
is_visible=feature.is_visible,
display_order=feature.display_order,
)
# ============================================================================
# Endpoints
# ============================================================================
@admin_features_router.get("", response_model=FeatureListResponse)
def list_features(
category: str | None = Query(None, description="Filter by category"),
active_only: bool = Query(False, description="Only active features"),
current_user: UserContext = Depends(get_current_admin_api),
db: Session = Depends(get_db),
):
"""List all features with their tier assignments."""
features = feature_service.get_all_features(
db, category=category, active_only=active_only
)
return FeatureListResponse(
features=[_feature_to_response(f) for f in features],
total=len(features),
)
@admin_features_router.get("/categories", response_model=CategoryListResponse)
def list_categories(
current_user: UserContext = Depends(get_current_admin_api),
db: Session = Depends(get_db),
):
"""List all feature categories."""
categories = feature_service.get_categories(db)
return CategoryListResponse(categories=categories)
@admin_features_router.get("/tiers", response_model=TierListWithFeaturesResponse)
def list_tiers_with_features(
current_user: UserContext = Depends(get_current_admin_api),
db: Session = Depends(get_db),
):
"""List all tiers with their feature assignments."""
tiers = feature_service.get_all_tiers_with_features(db)
return TierListWithFeaturesResponse(
tiers=[
TierFeaturesResponse(
id=t.id,
code=t.code,
name=t.name,
description=t.description,
features=t.features or [],
feature_count=len(t.features or []),
)
for t in tiers
]
)
@admin_features_router.get("/{feature_code}", response_model=FeatureResponse)
def get_feature(
feature_code: str,
current_user: UserContext = Depends(get_current_admin_api),
db: Session = Depends(get_db),
):
"""
Get a single feature by code.
Raises 404 if feature not found.
"""
feature = feature_service.get_feature_by_code(db, feature_code)
if not feature:
from app.exceptions import FeatureNotFoundError
raise FeatureNotFoundError(feature_code)
return _feature_to_response(feature)
@admin_features_router.put("/{feature_code}", response_model=FeatureResponse)
def update_feature(
feature_code: str,
request: UpdateFeatureRequest,
current_user: UserContext = Depends(get_current_admin_api),
db: Session = Depends(get_db),
):
"""
Update feature metadata.
Raises 404 if feature not found, 400 if tier code is invalid.
"""
feature = feature_service.update_feature(
db,
feature_code,
name=request.name,
description=request.description,
category=request.category,
ui_location=request.ui_location,
ui_icon=request.ui_icon,
ui_route=request.ui_route,
ui_badge_text=request.ui_badge_text,
minimum_tier_code=request.minimum_tier_code,
is_active=request.is_active,
is_visible=request.is_visible,
display_order=request.display_order,
)
db.commit()
db.refresh(feature)
logger.info(f"Updated feature {feature_code} by admin {current_user.id}")
return _feature_to_response(feature)
@admin_features_router.put("/tiers/{tier_code}/features", response_model=TierFeaturesResponse)
def update_tier_features(
tier_code: str,
request: UpdateTierFeaturesRequest,
current_user: UserContext = Depends(get_current_admin_api),
db: Session = Depends(get_db),
):
"""
Update features assigned to a tier.
Raises 404 if tier not found, 422 if any feature codes are invalid.
"""
tier = feature_service.update_tier_features(db, tier_code, request.feature_codes)
db.commit()
logger.info(
f"Updated tier {tier_code} features to {len(request.feature_codes)} features "
f"by admin {current_user.id}"
)
return TierFeaturesResponse(
id=tier.id,
code=tier.code,
name=tier.name,
description=tier.description,
features=tier.features or [],
feature_count=len(tier.features or []),
)
@admin_features_router.get("/tiers/{tier_code}/features", response_model=TierFeatureDetailResponse)
def get_tier_features(
tier_code: str,
current_user: UserContext = Depends(get_current_admin_api),
db: Session = Depends(get_db),
):
"""
Get features assigned to a specific tier with full details.
Raises 404 if tier not found.
"""
tier, features = feature_service.get_tier_features_with_details(db, tier_code)
return TierFeatureDetailResponse(
tier_code=tier.code,
tier_name=tier.name,
features=[
{
"code": f.code,
"name": f.name,
"category": f.category,
"description": f.description,
}
for f in features
],
feature_count=len(features),
)

View File

@@ -223,7 +223,9 @@ def get_invoices(
from app.modules.billing.routes.api.vendor_features import vendor_features_router
from app.modules.billing.routes.api.vendor_checkout import vendor_checkout_router
from app.modules.billing.routes.api.vendor_addons import vendor_addons_router
from app.modules.billing.routes.api.vendor_usage import vendor_usage_router
vendor_router.include_router(vendor_features_router, tags=["vendor-features"])
vendor_router.include_router(vendor_checkout_router, tags=["vendor-billing"])
vendor_router.include_router(vendor_addons_router, tags=["vendor-billing-addons"])
vendor_router.include_router(vendor_usage_router, tags=["vendor-usage"])

View File

@@ -0,0 +1,353 @@
# app/modules/billing/routes/api/vendor_features.py
"""
Vendor features API endpoints.
Provides feature availability information for the frontend to:
- Show/hide UI elements based on tier
- Display upgrade prompts for unavailable features
- Load feature metadata for dynamic rendering
Endpoints:
- GET /features/available - List of feature codes (for quick checks)
- GET /features - Full feature list with availability and metadata
- GET /features/{code} - Single feature details with upgrade info
- GET /features/categories - List feature categories
All routes require module access control for the 'billing' module.
"""
import logging
from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api, require_module_access
from app.core.database import get_db
from app.exceptions import FeatureNotFoundError
from app.services.feature_service import feature_service
from models.schema.auth import UserContext
vendor_features_router = APIRouter(
prefix="/features",
dependencies=[Depends(require_module_access("billing"))],
)
logger = logging.getLogger(__name__)
# ============================================================================
# Response Schemas
# ============================================================================
class FeatureCodeListResponse(BaseModel):
"""Simple list of available feature codes for quick checks."""
features: list[str]
tier_code: str
tier_name: str
class FeatureResponse(BaseModel):
"""Full feature information."""
code: str
name: str
description: str | None = None
category: str
ui_location: str | None = None
ui_icon: str | None = None
ui_route: str | None = None
ui_badge_text: str | None = None
is_available: bool
minimum_tier_code: str | None = None
minimum_tier_name: str | None = None
class FeatureListResponse(BaseModel):
"""List of features with metadata."""
features: list[FeatureResponse]
available_count: int
total_count: int
tier_code: str
tier_name: str
class FeatureDetailResponse(BaseModel):
"""Single feature detail with upgrade info."""
code: str
name: str
description: str | None = None
category: str
ui_location: str | None = None
ui_icon: str | None = None
ui_route: str | None = None
is_available: bool
# Upgrade info (only if not available)
upgrade_tier_code: str | None = None
upgrade_tier_name: str | None = None
upgrade_tier_price_monthly_cents: int | None = None
class CategoryListResponse(BaseModel):
"""List of feature categories."""
categories: list[str]
class FeatureGroupedResponse(BaseModel):
"""Features grouped by category."""
categories: dict[str, list[FeatureResponse]]
available_count: int
total_count: int
class FeatureCheckResponse(BaseModel):
"""Quick feature availability check response."""
has_feature: bool
feature_code: str
# ============================================================================
# Endpoints
# ============================================================================
@vendor_features_router.get("/available", response_model=FeatureCodeListResponse)
def get_available_features(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get list of feature codes available to vendor.
This is a lightweight endpoint for quick feature checks.
Use this to populate a frontend feature store on app init.
Returns:
List of feature codes the vendor has access to
"""
vendor_id = current_user.token_vendor_id
# Get subscription for tier info
from app.services.subscription_service import subscription_service
subscription = subscription_service.get_or_create_subscription(db, vendor_id)
tier = subscription.tier_obj
# Get available features
feature_codes = feature_service.get_available_feature_codes(db, vendor_id)
return FeatureCodeListResponse(
features=feature_codes,
tier_code=subscription.tier,
tier_name=tier.name if tier else subscription.tier.title(),
)
@vendor_features_router.get("", response_model=FeatureListResponse)
def get_features(
category: str | None = Query(None, description="Filter by category"),
include_unavailable: bool = Query(True, description="Include features not available to vendor"),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get all features with availability status and metadata.
This is a comprehensive endpoint for building feature-gated UIs.
Each feature includes:
- Availability status
- UI metadata (icon, route, location)
- Minimum tier required
Args:
category: Filter to specific category (orders, inventory, etc.)
include_unavailable: Whether to include locked features
Returns:
List of features with metadata and availability
"""
vendor_id = current_user.token_vendor_id
# Get subscription for tier info
from app.services.subscription_service import subscription_service
subscription = subscription_service.get_or_create_subscription(db, vendor_id)
tier = subscription.tier_obj
# Get features
features = feature_service.get_vendor_features(
db,
vendor_id,
category=category,
include_unavailable=include_unavailable,
)
available_count = sum(1 for f in features if f.is_available)
return FeatureListResponse(
features=[
FeatureResponse(
code=f.code,
name=f.name,
description=f.description,
category=f.category,
ui_location=f.ui_location,
ui_icon=f.ui_icon,
ui_route=f.ui_route,
ui_badge_text=f.ui_badge_text,
is_available=f.is_available,
minimum_tier_code=f.minimum_tier_code,
minimum_tier_name=f.minimum_tier_name,
)
for f in features
],
available_count=available_count,
total_count=len(features),
tier_code=subscription.tier,
tier_name=tier.name if tier else subscription.tier.title(),
)
@vendor_features_router.get("/categories", response_model=CategoryListResponse)
def get_feature_categories(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get list of feature categories.
Returns:
List of category names
"""
categories = feature_service.get_categories(db)
return CategoryListResponse(categories=categories)
@vendor_features_router.get("/grouped", response_model=FeatureGroupedResponse)
def get_features_grouped(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get features grouped by category.
Useful for rendering feature comparison tables or settings pages.
"""
vendor_id = current_user.token_vendor_id
grouped = feature_service.get_features_grouped_by_category(db, vendor_id)
# Convert to response format
categories_response = {}
total = 0
available = 0
for category, features in grouped.items():
categories_response[category] = [
FeatureResponse(
code=f.code,
name=f.name,
description=f.description,
category=f.category,
ui_location=f.ui_location,
ui_icon=f.ui_icon,
ui_route=f.ui_route,
ui_badge_text=f.ui_badge_text,
is_available=f.is_available,
minimum_tier_code=f.minimum_tier_code,
minimum_tier_name=f.minimum_tier_name,
)
for f in features
]
total += len(features)
available += sum(1 for f in features if f.is_available)
return FeatureGroupedResponse(
categories=categories_response,
available_count=available,
total_count=total,
)
@vendor_features_router.get("/{feature_code}", response_model=FeatureDetailResponse)
def get_feature_detail(
feature_code: str,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get detailed information about a specific feature.
Includes upgrade information if the feature is not available.
Use this for upgrade prompts and feature explanation modals.
Args:
feature_code: The feature code
Returns:
Feature details with upgrade info if locked
"""
vendor_id = current_user.token_vendor_id
# Get feature
feature = feature_service.get_feature_by_code(db, feature_code)
if not feature:
raise FeatureNotFoundError(feature_code)
# Check availability
is_available = feature_service.has_feature(db, vendor_id, feature_code)
# Get upgrade info if not available
upgrade_tier_code = None
upgrade_tier_name = None
upgrade_tier_price = None
if not is_available:
upgrade_info = feature_service.get_feature_upgrade_info(db, feature_code)
if upgrade_info:
upgrade_tier_code = upgrade_info.required_tier_code
upgrade_tier_name = upgrade_info.required_tier_name
upgrade_tier_price = upgrade_info.required_tier_price_monthly_cents
return FeatureDetailResponse(
code=feature.code,
name=feature.name,
description=feature.description,
category=feature.category,
ui_location=feature.ui_location,
ui_icon=feature.ui_icon,
ui_route=feature.ui_route,
is_available=is_available,
upgrade_tier_code=upgrade_tier_code,
upgrade_tier_name=upgrade_tier_name,
upgrade_tier_price_monthly_cents=upgrade_tier_price,
)
@vendor_features_router.get("/check/{feature_code}", response_model=FeatureCheckResponse)
def check_feature(
feature_code: str,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Quick check if vendor has access to a feature.
Returns simple boolean response for inline checks.
Args:
feature_code: The feature code
Returns:
has_feature and feature_code
"""
vendor_id = current_user.token_vendor_id
has_feature = feature_service.has_feature(db, vendor_id, feature_code)
return FeatureCheckResponse(has_feature=has_feature, feature_code=feature_code)

View File

@@ -0,0 +1,182 @@
# app/modules/billing/routes/api/vendor_usage.py
"""
Vendor usage and limits API endpoints.
Provides endpoints for:
- Current usage vs limits
- Upgrade recommendations
- Approaching limit warnings
Migrated from app/api/v1/vendor/usage.py to billing module.
"""
import logging
from fastapi import APIRouter, Depends
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api, require_module_access
from app.core.database import get_db
from app.services.usage_service import usage_service
from models.schema.auth import UserContext
vendor_usage_router = APIRouter(
prefix="/usage",
dependencies=[Depends(require_module_access("billing"))],
)
logger = logging.getLogger(__name__)
# ============================================================================
# Response Schemas
# ============================================================================
class UsageMetric(BaseModel):
"""Single usage metric."""
name: str
current: int
limit: int | None # None = unlimited
percentage: float # 0-100, or 0 if unlimited
is_unlimited: bool
is_at_limit: bool
is_approaching_limit: bool # >= 80%
class TierInfo(BaseModel):
"""Current tier information."""
code: str
name: str
price_monthly_cents: int
is_highest_tier: bool
class UpgradeTierInfo(BaseModel):
"""Next tier upgrade information."""
code: str
name: str
price_monthly_cents: int
price_increase_cents: int
benefits: list[str]
class UsageResponse(BaseModel):
"""Full usage response with limits and upgrade info."""
tier: TierInfo
usage: list[UsageMetric]
has_limits_approaching: bool
has_limits_reached: bool
upgrade_available: bool
upgrade_tier: UpgradeTierInfo | None = None
upgrade_reasons: list[str]
class LimitCheckResponse(BaseModel):
"""Response for checking a specific limit."""
limit_type: str
can_proceed: bool
current: int
limit: int | None
percentage: float
message: str | None = None
upgrade_tier_code: str | None = None
upgrade_tier_name: str | None = None
# ============================================================================
# Endpoints
# ============================================================================
@vendor_usage_router.get("", response_model=UsageResponse)
def get_usage(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get current usage, limits, and upgrade recommendations.
Returns comprehensive usage info for displaying in dashboard
and determining when to show upgrade prompts.
"""
vendor_id = current_user.token_vendor_id
# Get usage data from service
usage_data = usage_service.get_vendor_usage(db, vendor_id)
# Convert to response
return UsageResponse(
tier=TierInfo(
code=usage_data.tier.code,
name=usage_data.tier.name,
price_monthly_cents=usage_data.tier.price_monthly_cents,
is_highest_tier=usage_data.tier.is_highest_tier,
),
usage=[
UsageMetric(
name=m.name,
current=m.current,
limit=m.limit,
percentage=m.percentage,
is_unlimited=m.is_unlimited,
is_at_limit=m.is_at_limit,
is_approaching_limit=m.is_approaching_limit,
)
for m in usage_data.usage
],
has_limits_approaching=usage_data.has_limits_approaching,
has_limits_reached=usage_data.has_limits_reached,
upgrade_available=usage_data.upgrade_available,
upgrade_tier=(
UpgradeTierInfo(
code=usage_data.upgrade_tier.code,
name=usage_data.upgrade_tier.name,
price_monthly_cents=usage_data.upgrade_tier.price_monthly_cents,
price_increase_cents=usage_data.upgrade_tier.price_increase_cents,
benefits=usage_data.upgrade_tier.benefits,
)
if usage_data.upgrade_tier
else None
),
upgrade_reasons=usage_data.upgrade_reasons,
)
@vendor_usage_router.get("/check/{limit_type}", response_model=LimitCheckResponse)
def check_limit(
limit_type: str,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Check a specific limit before performing an action.
Use this before creating orders, products, or inviting team members.
Args:
limit_type: One of "orders", "products", "team_members"
Returns:
Whether the action can proceed and upgrade info if not
"""
vendor_id = current_user.token_vendor_id
# Check limit using service
check_data = usage_service.check_limit(db, vendor_id, limit_type)
return LimitCheckResponse(
limit_type=check_data.limit_type,
can_proceed=check_data.can_proceed,
current=check_data.current,
limit=check_data.limit,
percentage=check_data.percentage,
message=check_data.message,
upgrade_tier_code=check_data.upgrade_tier_code,
upgrade_tier_name=check_data.upgrade_tier_name,
)

View File

@@ -6,23 +6,20 @@ This module provides functions to register inventory routes
with module-based access control.
NOTE: Routers are NOT auto-imported to avoid circular dependencies.
Import directly from admin.py or vendor.py as needed:
from app.modules.inventory.routes.admin import admin_router
from app.modules.inventory.routes.vendor import vendor_router
Import directly from api submodule as needed:
from app.modules.inventory.routes.api import admin_router
from app.modules.inventory.routes.api import vendor_router
"""
# Routers are imported on-demand to avoid circular dependencies
# Do NOT add auto-imports here
__all__ = ["admin_router", "vendor_router"]
def __getattr__(name: str):
"""Lazy import routers to avoid circular dependencies."""
if name == "admin_router":
from app.modules.inventory.routes.admin import admin_router
from app.modules.inventory.routes.api import admin_router
return admin_router
elif name == "vendor_router":
from app.modules.inventory.routes.vendor import vendor_router
from app.modules.inventory.routes.api import vendor_router
return vendor_router
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -1,26 +0,0 @@
# app/modules/inventory/routes/admin.py
"""
Inventory module admin routes.
This module wraps the existing admin inventory routes and adds
module-based access control. Routes are re-exported from the
original location with the module access dependency.
"""
from fastapi import APIRouter, Depends
from app.api.deps import require_module_access
# Import original router (direct import to avoid circular dependency)
from app.api.v1.admin.inventory import router as original_router
# Create module-aware router
admin_router = APIRouter(
prefix="/inventory",
dependencies=[Depends(require_module_access("inventory"))],
)
# Re-export all routes from the original module with module access control
# The routes are copied to maintain the same API structure
for route in original_router.routes:
admin_router.routes.append(route)

View File

@@ -1,4 +1,21 @@
# Routes will be migrated here from legacy locations
# TODO: Move actual route implementations from app/api/v1/
# app/modules/inventory/routes/api/__init__.py
"""
Inventory module API routes.
__all__ = []
Provides REST API endpoints for inventory management:
- Admin API: Platform-wide inventory management
- Vendor API: Vendor-specific inventory operations
"""
__all__ = ["admin_router", "vendor_router"]
def __getattr__(name: str):
"""Lazy import routers to avoid circular dependencies."""
if name == "admin_router":
from app.modules.inventory.routes.api.admin import admin_router
return admin_router
elif name == "vendor_router":
from app.modules.inventory.routes.api.vendor import vendor_router
return vendor_router
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -0,0 +1,436 @@
# app/modules/inventory/routes/api/admin.py
"""
Admin inventory management endpoints.
Provides inventory management capabilities for administrators:
- View inventory across all vendors
- View vendor-specific inventory
- Set/adjust inventory on behalf of vendors
- Low stock alerts and reporting
Admin Context: Uses admin JWT authentication.
Vendor selection is passed as a request parameter.
"""
import logging
from fastapi import APIRouter, Depends, File, Form, Query, UploadFile
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api, require_module_access
from app.core.database import get_db
from app.services.inventory_import_service import inventory_import_service
from app.services.inventory_service import inventory_service
from app.services.inventory_transaction_service import inventory_transaction_service
from models.schema.auth import UserContext
from app.modules.inventory.schemas import (
AdminInventoryAdjust,
AdminInventoryCreate,
AdminInventoryListResponse,
AdminInventoryLocationsResponse,
AdminInventoryStats,
AdminInventoryTransactionItem,
AdminInventoryTransactionListResponse,
AdminLowStockItem,
AdminTransactionStatsResponse,
AdminVendorsWithInventoryResponse,
InventoryAdjust,
InventoryCreate,
InventoryMessageResponse,
InventoryResponse,
InventoryUpdate,
ProductInventorySummary,
)
admin_router = APIRouter(
prefix="/inventory",
dependencies=[Depends(require_module_access("inventory"))],
)
logger = logging.getLogger(__name__)
# ============================================================================
# List & Statistics Endpoints
# ============================================================================
@admin_router.get("", response_model=AdminInventoryListResponse)
def get_all_inventory(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=500),
vendor_id: int | None = Query(None, description="Filter by vendor"),
location: str | None = Query(None, description="Filter by location"),
low_stock: int | None = Query(None, ge=0, description="Filter items below threshold"),
search: str | None = Query(None, description="Search by product title or SKU"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Get inventory across all vendors with filtering.
Allows admins to view and filter inventory across the platform.
"""
return inventory_service.get_all_inventory_admin(
db=db,
skip=skip,
limit=limit,
vendor_id=vendor_id,
location=location,
low_stock=low_stock,
search=search,
)
@admin_router.get("/stats", response_model=AdminInventoryStats)
def get_inventory_stats(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get platform-wide inventory statistics."""
return inventory_service.get_inventory_stats_admin(db)
@admin_router.get("/low-stock", response_model=list[AdminLowStockItem])
def get_low_stock_items(
threshold: int = Query(10, ge=0, description="Stock threshold"),
vendor_id: int | None = Query(None, description="Filter by vendor"),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get items with low stock levels."""
return inventory_service.get_low_stock_items_admin(
db=db,
threshold=threshold,
vendor_id=vendor_id,
limit=limit,
)
@admin_router.get("/vendors", response_model=AdminVendorsWithInventoryResponse)
def get_vendors_with_inventory(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get list of vendors that have inventory entries."""
return inventory_service.get_vendors_with_inventory_admin(db)
@admin_router.get("/locations", response_model=AdminInventoryLocationsResponse)
def get_inventory_locations(
vendor_id: int | None = Query(None, description="Filter by vendor"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get list of unique inventory locations."""
return inventory_service.get_inventory_locations_admin(db, vendor_id)
# ============================================================================
# Vendor-Specific Endpoints
# ============================================================================
@admin_router.get("/vendors/{vendor_id}", response_model=AdminInventoryListResponse)
def get_vendor_inventory(
vendor_id: int,
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=500),
location: str | None = Query(None, description="Filter by location"),
low_stock: int | None = Query(None, ge=0, description="Filter items below threshold"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get inventory for a specific vendor."""
return inventory_service.get_vendor_inventory_admin(
db=db,
vendor_id=vendor_id,
skip=skip,
limit=limit,
location=location,
low_stock=low_stock,
)
@admin_router.get("/products/{product_id}", response_model=ProductInventorySummary)
def get_product_inventory(
product_id: int,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get inventory summary for a specific product across all locations."""
return inventory_service.get_product_inventory_admin(db, product_id)
# ============================================================================
# Inventory Modification Endpoints
# ============================================================================
@admin_router.post("/set", response_model=InventoryResponse)
def set_inventory(
inventory_data: AdminInventoryCreate,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Set exact inventory quantity for a product at a location.
Admin version - requires explicit vendor_id in request body.
"""
# Verify vendor exists
inventory_service.verify_vendor_exists(db, inventory_data.vendor_id)
# Convert to standard schema for service
service_data = InventoryCreate(
product_id=inventory_data.product_id,
location=inventory_data.location,
quantity=inventory_data.quantity,
)
result = inventory_service.set_inventory(
db=db,
vendor_id=inventory_data.vendor_id,
inventory_data=service_data,
)
logger.info(
f"Admin {current_admin.email} set inventory for product {inventory_data.product_id} "
f"at {inventory_data.location}: {inventory_data.quantity} units"
)
db.commit()
return result
@admin_router.post("/adjust", response_model=InventoryResponse)
def adjust_inventory(
adjustment: AdminInventoryAdjust,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Adjust inventory by adding or removing quantity.
Positive quantity = add stock, negative = remove stock.
Admin version - requires explicit vendor_id in request body.
"""
# Verify vendor exists
inventory_service.verify_vendor_exists(db, adjustment.vendor_id)
# Convert to standard schema for service
service_data = InventoryAdjust(
product_id=adjustment.product_id,
location=adjustment.location,
quantity=adjustment.quantity,
)
result = inventory_service.adjust_inventory(
db=db,
vendor_id=adjustment.vendor_id,
inventory_data=service_data,
)
sign = "+" if adjustment.quantity >= 0 else ""
logger.info(
f"Admin {current_admin.email} adjusted inventory for product {adjustment.product_id} "
f"at {adjustment.location}: {sign}{adjustment.quantity} units"
f"{f' (reason: {adjustment.reason})' if adjustment.reason else ''}"
)
db.commit()
return result
@admin_router.put("/{inventory_id}", response_model=InventoryResponse)
def update_inventory(
inventory_id: int,
inventory_update: InventoryUpdate,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Update inventory entry fields."""
# Get inventory to find vendor_id
inventory = inventory_service.get_inventory_by_id_admin(db, inventory_id)
result = inventory_service.update_inventory(
db=db,
vendor_id=inventory.vendor_id,
inventory_id=inventory_id,
inventory_update=inventory_update,
)
logger.info(f"Admin {current_admin.email} updated inventory {inventory_id}")
db.commit()
return result
@admin_router.delete("/{inventory_id}", response_model=InventoryMessageResponse)
def delete_inventory(
inventory_id: int,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Delete inventory entry."""
# Get inventory to find vendor_id and log details
inventory = inventory_service.get_inventory_by_id_admin(db, inventory_id)
vendor_id = inventory.vendor_id
product_id = inventory.product_id
location = inventory.location
inventory_service.delete_inventory(
db=db,
vendor_id=vendor_id,
inventory_id=inventory_id,
)
logger.info(
f"Admin {current_admin.email} deleted inventory {inventory_id} "
f"(product {product_id} at {location})"
)
db.commit()
return InventoryMessageResponse(message="Inventory deleted successfully")
# ============================================================================
# Import Endpoints
# ============================================================================
class UnmatchedGtin(BaseModel):
"""GTIN that couldn't be matched to a product."""
gtin: str
quantity: int
product_name: str
class InventoryImportResponse(BaseModel):
"""Response from inventory import."""
success: bool
total_rows: int
entries_created: int
entries_updated: int
quantity_imported: int
unmatched_gtins: list[UnmatchedGtin]
errors: list[str]
@admin_router.post("/import", response_model=InventoryImportResponse)
async def import_inventory(
file: UploadFile = File(..., description="TSV/CSV file with BIN, EAN, PRODUCT, QUANTITY columns"),
vendor_id: int = Form(..., description="Vendor ID"),
warehouse: str = Form("strassen", description="Warehouse name"),
clear_existing: bool = Form(False, description="Clear existing inventory before import"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Import inventory from a TSV/CSV file.
File format (TSV recommended):
- Required columns: BIN, EAN
- Optional columns: PRODUCT (for display), QUANTITY (defaults to 1 per row)
If QUANTITY column is present, each row represents the quantity specified.
If QUANTITY is absent, each row counts as 1 unit (rows with same EAN+BIN are summed).
Products are matched by GTIN/EAN. Unmatched GTINs are reported in the response.
"""
# Verify vendor exists
inventory_service.verify_vendor_exists(db, vendor_id)
# Read file content
content = await file.read()
try:
content_str = content.decode("utf-8")
except UnicodeDecodeError:
content_str = content.decode("latin-1")
# Detect delimiter
first_line = content_str.split("\n")[0] if content_str else ""
delimiter = "\t" if "\t" in first_line else ","
# Run import
result = inventory_import_service.import_from_text(
db=db,
content=content_str,
vendor_id=vendor_id,
warehouse=warehouse,
delimiter=delimiter,
clear_existing=clear_existing,
)
if result.success:
db.commit()
logger.info(
f"Admin {current_admin.email} imported inventory: "
f"{result.entries_created} created, {result.entries_updated} updated, "
f"{result.quantity_imported} total units"
)
else:
db.rollback()
logger.error(f"Inventory import failed: {result.errors}")
return InventoryImportResponse(
success=result.success,
total_rows=result.total_rows,
entries_created=result.entries_created,
entries_updated=result.entries_updated,
quantity_imported=result.quantity_imported,
unmatched_gtins=[UnmatchedGtin(**g) for g in result.unmatched_gtins],
errors=result.errors,
)
# ============================================================================
# Transaction History Endpoints
# ============================================================================
@admin_router.get("/transactions", response_model=AdminInventoryTransactionListResponse)
def get_all_transactions(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
vendor_id: int | None = Query(None, description="Filter by vendor"),
product_id: int | None = Query(None, description="Filter by product"),
transaction_type: str | None = Query(None, description="Filter by type"),
order_id: int | None = Query(None, description="Filter by order"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Get inventory transaction history across all vendors.
Returns a paginated list of all stock movements with vendor and product details.
"""
transactions, total = inventory_transaction_service.get_all_transactions_admin(
db=db,
skip=skip,
limit=limit,
vendor_id=vendor_id,
product_id=product_id,
transaction_type=transaction_type,
order_id=order_id,
)
return AdminInventoryTransactionListResponse(
transactions=[AdminInventoryTransactionItem(**tx) for tx in transactions],
total=total,
skip=skip,
limit=limit,
)
@admin_router.get("/transactions/stats", response_model=AdminTransactionStatsResponse)
def get_transaction_stats(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get transaction statistics for the platform."""
stats = inventory_transaction_service.get_transaction_stats_admin(db)
return AdminTransactionStatsResponse(**stats)

View File

@@ -0,0 +1,260 @@
# app/modules/inventory/routes/api/vendor.py
"""
Vendor inventory management endpoints.
Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The get_current_vendor_api dependency guarantees token_vendor_id is present.
"""
import logging
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api, require_module_access
from app.core.database import get_db
from app.services.inventory_service import inventory_service
from app.services.inventory_transaction_service import inventory_transaction_service
from models.schema.auth import UserContext
from app.modules.inventory.schemas import (
InventoryAdjust,
InventoryCreate,
InventoryListResponse,
InventoryMessageResponse,
InventoryReserve,
InventoryResponse,
InventoryTransactionListResponse,
InventoryTransactionWithProduct,
InventoryUpdate,
OrderTransactionHistoryResponse,
ProductInventorySummary,
ProductTransactionHistoryResponse,
)
vendor_router = APIRouter(
prefix="/inventory",
dependencies=[Depends(require_module_access("inventory"))],
)
logger = logging.getLogger(__name__)
@vendor_router.post("/set", response_model=InventoryResponse)
def set_inventory(
inventory: InventoryCreate,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Set exact inventory quantity (replaces existing)."""
result = inventory_service.set_inventory(
db, current_user.token_vendor_id, inventory
)
db.commit()
return result
@vendor_router.post("/adjust", response_model=InventoryResponse)
def adjust_inventory(
adjustment: InventoryAdjust,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Adjust inventory (positive to add, negative to remove)."""
result = inventory_service.adjust_inventory(
db, current_user.token_vendor_id, adjustment
)
db.commit()
return result
@vendor_router.post("/reserve", response_model=InventoryResponse)
def reserve_inventory(
reservation: InventoryReserve,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Reserve inventory for an order."""
result = inventory_service.reserve_inventory(
db, current_user.token_vendor_id, reservation
)
db.commit()
return result
@vendor_router.post("/release", response_model=InventoryResponse)
def release_reservation(
reservation: InventoryReserve,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Release reserved inventory (cancel order)."""
result = inventory_service.release_reservation(
db, current_user.token_vendor_id, reservation
)
db.commit()
return result
@vendor_router.post("/fulfill", response_model=InventoryResponse)
def fulfill_reservation(
reservation: InventoryReserve,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Fulfill reservation (complete order, remove from stock)."""
result = inventory_service.fulfill_reservation(
db, current_user.token_vendor_id, reservation
)
db.commit()
return result
@vendor_router.get("/product/{product_id}", response_model=ProductInventorySummary)
def get_product_inventory(
product_id: int,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get inventory summary for a product."""
return inventory_service.get_product_inventory(
db, current_user.token_vendor_id, product_id
)
@vendor_router.get("", response_model=InventoryListResponse)
def get_vendor_inventory(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
location: str | None = Query(None),
low_stock: int | None = Query(None, ge=0),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get all inventory for vendor."""
inventories = inventory_service.get_vendor_inventory(
db, current_user.token_vendor_id, skip, limit, location, low_stock
)
# Get total count
total = len(inventories) # You might want a separate count query for large datasets
return InventoryListResponse(
inventories=inventories, total=total, skip=skip, limit=limit
)
@vendor_router.put("/{inventory_id}", response_model=InventoryResponse)
def update_inventory(
inventory_id: int,
inventory_update: InventoryUpdate,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Update inventory entry."""
result = inventory_service.update_inventory(
db, current_user.token_vendor_id, inventory_id, inventory_update
)
db.commit()
return result
@vendor_router.delete("/{inventory_id}", response_model=InventoryMessageResponse)
def delete_inventory(
inventory_id: int,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Delete inventory entry."""
inventory_service.delete_inventory(db, current_user.token_vendor_id, inventory_id)
db.commit()
return InventoryMessageResponse(message="Inventory deleted successfully")
# ============================================================================
# Inventory Transaction History Endpoints
# ============================================================================
@vendor_router.get("/transactions", response_model=InventoryTransactionListResponse)
def get_inventory_transactions(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
product_id: int | None = Query(None, description="Filter by product"),
transaction_type: str | None = Query(None, description="Filter by type"),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get inventory transaction history for the vendor.
Returns a paginated list of all stock movements with product details.
Use filters to narrow down by product or transaction type.
"""
transactions, total = inventory_transaction_service.get_vendor_transactions(
db=db,
vendor_id=current_user.token_vendor_id,
skip=skip,
limit=limit,
product_id=product_id,
transaction_type=transaction_type,
)
return InventoryTransactionListResponse(
transactions=[InventoryTransactionWithProduct(**tx) for tx in transactions],
total=total,
skip=skip,
limit=limit,
)
@vendor_router.get(
"/transactions/product/{product_id}",
response_model=ProductTransactionHistoryResponse,
)
def get_product_transaction_history(
product_id: int,
limit: int = Query(50, ge=1, le=200),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get transaction history for a specific product.
Returns recent stock movements with current inventory status.
"""
result = inventory_transaction_service.get_product_history(
db=db,
vendor_id=current_user.token_vendor_id,
product_id=product_id,
limit=limit,
)
return ProductTransactionHistoryResponse(**result)
@vendor_router.get(
"/transactions/order/{order_id}",
response_model=OrderTransactionHistoryResponse,
)
def get_order_transaction_history(
order_id: int,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get all inventory transactions for a specific order.
Shows all stock movements (reserve, fulfill, release) related to an order.
"""
result = inventory_transaction_service.get_order_history(
db=db,
vendor_id=current_user.token_vendor_id,
order_id=order_id,
)
return OrderTransactionHistoryResponse(
order_id=result["order_id"],
order_number=result["order_number"],
transactions=[
InventoryTransactionWithProduct(**tx) for tx in result["transactions"]
],
)

View File

@@ -1,25 +0,0 @@
# app/modules/inventory/routes/vendor.py
"""
Inventory module vendor routes.
This module wraps the existing vendor inventory routes and adds
module-based access control. Routes are re-exported from the
original location with the module access dependency.
"""
from fastapi import APIRouter, Depends
from app.api.deps import require_module_access
# Import original router (direct import to avoid circular dependency)
from app.api.v1.vendor.inventory import router as original_router
# Create module-aware router
vendor_router = APIRouter(
prefix="/inventory",
dependencies=[Depends(require_module_access("inventory"))],
)
# Re-export all routes from the original module with module access control
for route in original_router.routes:
vendor_router.routes.append(route)

View File

@@ -2,36 +2,11 @@
"""
Marketplace module route registration.
This module provides marketplace routes with module-based access control.
Structure:
- routes/api/ - REST API endpoints
- routes/pages/ - HTML page rendering (templates)
NOTE: Routers are not eagerly imported here to avoid circular imports.
Import directly from routes/api/admin.py or routes/api/vendor.py instead.
Import routers directly from their respective files:
- from app.modules.marketplace.routes.api.admin import admin_router, admin_letzshop_router
- from app.modules.marketplace.routes.api.vendor import vendor_router, vendor_letzshop_router
"""
def __getattr__(name: str):
"""Lazy import of routers to avoid circular imports."""
if name == "admin_router":
from app.modules.marketplace.routes.api.admin import admin_router
return admin_router
elif name == "admin_letzshop_router":
from app.modules.marketplace.routes.api.admin import admin_letzshop_router
return admin_letzshop_router
elif name == "vendor_router":
from app.modules.marketplace.routes.api.vendor import vendor_router
return vendor_router
elif name == "vendor_letzshop_router":
from app.modules.marketplace.routes.api.vendor import vendor_letzshop_router
return vendor_letzshop_router
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
__all__ = ["admin_router", "admin_letzshop_router", "vendor_router", "vendor_letzshop_router"]

View File

@@ -2,34 +2,7 @@
"""
Marketplace module API routes.
Provides REST API endpoints for marketplace integration:
- Admin API: Import jobs, vendor directory, marketplace products
- Vendor API: Letzshop sync, product imports, exports
NOTE: Routers are not eagerly imported here to avoid circular imports.
Import directly from admin.py or vendor.py instead.
Import routers directly from their respective files:
- from app.modules.marketplace.routes.api.admin import admin_router, admin_letzshop_router
- from app.modules.marketplace.routes.api.vendor import vendor_router, vendor_letzshop_router
"""
def __getattr__(name: str):
"""Lazy import of routers to avoid circular imports."""
if name == "admin_router":
from app.modules.marketplace.routes.api.admin import admin_router
return admin_router
elif name == "admin_letzshop_router":
from app.modules.marketplace.routes.api.admin import admin_letzshop_router
return admin_letzshop_router
elif name == "vendor_router":
from app.modules.marketplace.routes.api.vendor import vendor_router
return vendor_router
elif name == "vendor_letzshop_router":
from app.modules.marketplace.routes.api.vendor import vendor_letzshop_router
return vendor_letzshop_router
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
__all__ = ["admin_router", "admin_letzshop_router", "vendor_router", "vendor_letzshop_router"]

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,177 @@
# app/modules/marketplace/routes/api/admin_marketplace.py
"""
Marketplace import job monitoring endpoints for admin.
All routes require module access control for the 'marketplace' module.
"""
import logging
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api, require_module_access
from app.core.database import get_db
from app.modules.marketplace.services.marketplace_import_job_service import marketplace_import_job_service
from app.services.stats_service import stats_service
from app.services.vendor_service import vendor_service
from models.schema.auth import UserContext
from app.modules.marketplace.schemas import (
AdminMarketplaceImportJobListResponse,
AdminMarketplaceImportJobRequest,
AdminMarketplaceImportJobResponse,
MarketplaceImportErrorListResponse,
MarketplaceImportErrorResponse,
MarketplaceImportJobRequest,
MarketplaceImportJobResponse,
)
from app.modules.analytics.schemas import ImportStatsResponse
admin_marketplace_router = APIRouter(
prefix="/marketplace-import-jobs",
dependencies=[Depends(require_module_access("marketplace"))],
)
logger = logging.getLogger(__name__)
@admin_marketplace_router.get("", response_model=AdminMarketplaceImportJobListResponse)
def get_all_marketplace_import_jobs(
marketplace: str | None = Query(None),
status: str | None = Query(None),
page: int = Query(1, ge=1),
limit: int = Query(100, ge=1, le=100),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get all marketplace import jobs with pagination (Admin only)."""
jobs, total = marketplace_import_job_service.get_all_import_jobs_paginated(
db=db,
marketplace=marketplace,
status=status,
page=page,
limit=limit,
)
return AdminMarketplaceImportJobListResponse(
items=[
marketplace_import_job_service.convert_to_admin_response_model(job)
for job in jobs
],
total=total,
page=page,
limit=limit,
)
@admin_marketplace_router.post("", response_model=MarketplaceImportJobResponse)
async def create_marketplace_import_job(
request: AdminMarketplaceImportJobRequest,
background_tasks: BackgroundTasks,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Create a new marketplace import job (Admin only).
Admins can trigger imports for any vendor by specifying vendor_id.
The import is processed asynchronously in the background.
The `language` parameter specifies the language code for product
translations (e.g., 'en', 'fr', 'de'). Default is 'en'.
"""
vendor = vendor_service.get_vendor_by_id(db, request.vendor_id)
job_request = MarketplaceImportJobRequest(
source_url=request.source_url,
marketplace=request.marketplace,
batch_size=request.batch_size,
language=request.language,
)
job = marketplace_import_job_service.create_import_job(
db=db,
request=job_request,
vendor=vendor,
user=current_admin,
)
db.commit()
logger.info(
f"Admin {current_admin.username} created import job {job.id} "
f"for vendor {vendor.vendor_code} (language={request.language})"
)
# Dispatch via task dispatcher (supports Celery or BackgroundTasks)
from app.tasks.dispatcher import task_dispatcher
celery_task_id = task_dispatcher.dispatch_marketplace_import(
background_tasks=background_tasks,
job_id=job.id,
url=request.source_url,
marketplace=request.marketplace,
vendor_id=vendor.id,
batch_size=request.batch_size or 1000,
language=request.language,
)
# Store Celery task ID if using Celery
if celery_task_id:
job.celery_task_id = celery_task_id
db.commit()
return marketplace_import_job_service.convert_to_response_model(job)
# NOTE: /stats must be defined BEFORE /{job_id} to avoid route conflicts
@admin_marketplace_router.get("/stats", response_model=ImportStatsResponse)
def get_import_statistics(
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get marketplace import statistics (Admin only)."""
stats = stats_service.get_import_statistics(db)
return ImportStatsResponse(**stats)
@admin_marketplace_router.get("/{job_id}", response_model=AdminMarketplaceImportJobResponse)
def get_marketplace_import_job(
job_id: int,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get a single marketplace import job by ID (Admin only)."""
job = marketplace_import_job_service.get_import_job_by_id_admin(db, job_id)
return marketplace_import_job_service.convert_to_admin_response_model(job)
@admin_marketplace_router.get("/{job_id}/errors", response_model=MarketplaceImportErrorListResponse)
def get_import_job_errors(
job_id: int,
page: int = Query(1, ge=1),
limit: int = Query(50, ge=1, le=100),
error_type: str | None = Query(None, description="Filter by error type"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""Get import errors for a specific job (Admin only).
Returns detailed error information including row number, identifier,
error type, error message, and raw row data for review.
"""
# Verify job exists
marketplace_import_job_service.get_import_job_by_id_admin(db, job_id)
# Get errors from service
errors, total = marketplace_import_job_service.get_import_job_errors(
db=db,
job_id=job_id,
error_type=error_type,
page=page,
limit=limit,
)
return MarketplaceImportErrorListResponse(
errors=[MarketplaceImportErrorResponse.model_validate(e) for e in errors],
total=total,
import_job_id=job_id,
)

View File

@@ -2,43 +2,32 @@
"""
Marketplace module vendor routes.
This module wraps the existing vendor marketplace routes and adds
module-based access control. Routes are re-exported from the
original location with the module access dependency.
This module aggregates all marketplace vendor routers into a single router
for auto-discovery. Routes are defined in dedicated files with module-based
access control.
Includes:
- /marketplace/* - Marketplace settings
- /marketplace/* - Marketplace import management
- /letzshop/* - Letzshop integration
"""
import importlib
from fastapi import APIRouter
from fastapi import APIRouter, Depends
from .vendor_marketplace import vendor_marketplace_router
from .vendor_letzshop import vendor_letzshop_router
from .vendor_onboarding import vendor_onboarding_router
from app.api.deps import require_module_access
# Create aggregate router for auto-discovery
# The router is named 'vendor_router' for auto-discovery compatibility
vendor_router = APIRouter()
# Import original routers using importlib to avoid circular imports
# (direct import triggers app.api.v1.vendor.__init__.py which imports us)
_marketplace_module = importlib.import_module("app.api.v1.vendor.marketplace")
_letzshop_module = importlib.import_module("app.api.v1.vendor.letzshop")
marketplace_original_router = _marketplace_module.router
letzshop_original_router = _letzshop_module.router
# Include marketplace import routes
vendor_router.include_router(vendor_marketplace_router)
# Create module-aware router for marketplace
vendor_router = APIRouter(
prefix="/marketplace",
dependencies=[Depends(require_module_access("marketplace"))],
)
# Include letzshop routes
vendor_router.include_router(vendor_letzshop_router)
# Re-export all routes from the original marketplace module
for route in marketplace_original_router.routes:
vendor_router.routes.append(route)
# Include onboarding routes
vendor_router.include_router(vendor_onboarding_router)
# Create separate router for letzshop integration
vendor_letzshop_router = APIRouter(
prefix="/letzshop",
dependencies=[Depends(require_module_access("marketplace"))],
)
for route in letzshop_original_router.routes:
vendor_letzshop_router.routes.append(route)
__all__ = ["vendor_router"]

View File

@@ -0,0 +1,793 @@
# app/modules/marketplace/routes/api/vendor_letzshop.py
"""
Vendor API endpoints for Letzshop marketplace integration.
Provides vendor-level management of:
- Letzshop credentials
- Connection testing
- Order import and sync
- Fulfillment operations (confirm, reject, tracking)
Vendor Context: Uses token_vendor_id from JWT token.
All routes require module access control for the 'marketplace' module.
"""
import logging
from fastapi import APIRouter, Depends, Path, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api, require_module_access
from app.core.database import get_db
from app.exceptions import (
OrderHasUnresolvedExceptionsException,
ResourceNotFoundException,
ValidationException,
)
from app.services.order_item_exception_service import order_item_exception_service
from app.modules.marketplace.services.letzshop import (
CredentialsNotFoundError,
LetzshopClientError,
LetzshopCredentialsService,
LetzshopOrderService,
OrderNotFoundError,
)
from models.schema.auth import UserContext
from app.modules.marketplace.schemas import (
FulfillmentConfirmRequest,
FulfillmentOperationResponse,
FulfillmentQueueItemResponse,
FulfillmentQueueListResponse,
FulfillmentRejectRequest,
FulfillmentTrackingRequest,
LetzshopConnectionTestRequest,
LetzshopConnectionTestResponse,
LetzshopCredentialsCreate,
LetzshopCredentialsResponse,
LetzshopCredentialsStatus,
LetzshopCredentialsUpdate,
LetzshopOrderDetailResponse,
LetzshopOrderListResponse,
LetzshopOrderResponse,
LetzshopSuccessResponse,
LetzshopSyncLogListResponse,
LetzshopSyncLogResponse,
LetzshopSyncTriggerRequest,
LetzshopSyncTriggerResponse,
)
vendor_letzshop_router = APIRouter(
prefix="/letzshop",
dependencies=[Depends(require_module_access("marketplace"))],
)
logger = logging.getLogger(__name__)
# ============================================================================
# Helper Functions
# ============================================================================
def get_order_service(db: Session) -> LetzshopOrderService:
"""Get order service instance."""
return LetzshopOrderService(db)
def get_credentials_service(db: Session) -> LetzshopCredentialsService:
"""Get credentials service instance."""
return LetzshopCredentialsService(db)
# ============================================================================
# Status & Configuration
# ============================================================================
@vendor_letzshop_router.get("/status", response_model=LetzshopCredentialsStatus)
def get_letzshop_status(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get Letzshop integration status for the current vendor."""
creds_service = get_credentials_service(db)
status = creds_service.get_status(current_user.token_vendor_id)
return LetzshopCredentialsStatus(**status)
@vendor_letzshop_router.get("/credentials", response_model=LetzshopCredentialsResponse)
def get_credentials(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get Letzshop credentials for the current vendor (API key is masked)."""
creds_service = get_credentials_service(db)
vendor_id = current_user.token_vendor_id
try:
credentials = creds_service.get_credentials_or_raise(vendor_id)
except CredentialsNotFoundError:
raise ResourceNotFoundException("LetzshopCredentials", str(vendor_id))
return LetzshopCredentialsResponse(
id=credentials.id,
vendor_id=credentials.vendor_id,
api_key_masked=creds_service.get_masked_api_key(vendor_id),
api_endpoint=credentials.api_endpoint,
auto_sync_enabled=credentials.auto_sync_enabled,
sync_interval_minutes=credentials.sync_interval_minutes,
last_sync_at=credentials.last_sync_at,
last_sync_status=credentials.last_sync_status,
last_sync_error=credentials.last_sync_error,
created_at=credentials.created_at,
updated_at=credentials.updated_at,
)
@vendor_letzshop_router.post("/credentials", response_model=LetzshopCredentialsResponse)
def save_credentials(
credentials_data: LetzshopCredentialsCreate,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Create or update Letzshop credentials for the current vendor."""
creds_service = get_credentials_service(db)
vendor_id = current_user.token_vendor_id
credentials = creds_service.upsert_credentials(
vendor_id=vendor_id,
api_key=credentials_data.api_key,
api_endpoint=credentials_data.api_endpoint,
auto_sync_enabled=credentials_data.auto_sync_enabled,
sync_interval_minutes=credentials_data.sync_interval_minutes,
)
db.commit()
logger.info(f"Vendor user {current_user.email} updated Letzshop credentials")
return LetzshopCredentialsResponse(
id=credentials.id,
vendor_id=credentials.vendor_id,
api_key_masked=creds_service.get_masked_api_key(vendor_id),
api_endpoint=credentials.api_endpoint,
auto_sync_enabled=credentials.auto_sync_enabled,
sync_interval_minutes=credentials.sync_interval_minutes,
last_sync_at=credentials.last_sync_at,
last_sync_status=credentials.last_sync_status,
last_sync_error=credentials.last_sync_error,
created_at=credentials.created_at,
updated_at=credentials.updated_at,
)
@vendor_letzshop_router.patch("/credentials", response_model=LetzshopCredentialsResponse)
def update_credentials(
credentials_data: LetzshopCredentialsUpdate,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Partially update Letzshop credentials for the current vendor."""
creds_service = get_credentials_service(db)
vendor_id = current_user.token_vendor_id
try:
credentials = creds_service.update_credentials(
vendor_id=vendor_id,
api_key=credentials_data.api_key,
api_endpoint=credentials_data.api_endpoint,
auto_sync_enabled=credentials_data.auto_sync_enabled,
sync_interval_minutes=credentials_data.sync_interval_minutes,
)
db.commit()
except CredentialsNotFoundError:
raise ResourceNotFoundException("LetzshopCredentials", str(vendor_id))
return LetzshopCredentialsResponse(
id=credentials.id,
vendor_id=credentials.vendor_id,
api_key_masked=creds_service.get_masked_api_key(vendor_id),
api_endpoint=credentials.api_endpoint,
auto_sync_enabled=credentials.auto_sync_enabled,
sync_interval_minutes=credentials.sync_interval_minutes,
last_sync_at=credentials.last_sync_at,
last_sync_status=credentials.last_sync_status,
last_sync_error=credentials.last_sync_error,
created_at=credentials.created_at,
updated_at=credentials.updated_at,
)
@vendor_letzshop_router.delete("/credentials", response_model=LetzshopSuccessResponse)
def delete_credentials(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Delete Letzshop credentials for the current vendor."""
creds_service = get_credentials_service(db)
deleted = creds_service.delete_credentials(current_user.token_vendor_id)
if not deleted:
raise ResourceNotFoundException(
"LetzshopCredentials", str(current_user.token_vendor_id)
)
db.commit()
logger.info(f"Vendor user {current_user.email} deleted Letzshop credentials")
return LetzshopSuccessResponse(success=True, message="Letzshop credentials deleted")
# ============================================================================
# Connection Testing
# ============================================================================
@vendor_letzshop_router.post("/test", response_model=LetzshopConnectionTestResponse)
def test_connection(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Test the Letzshop connection using stored credentials."""
creds_service = get_credentials_service(db)
success, response_time_ms, error = creds_service.test_connection(
current_user.token_vendor_id
)
return LetzshopConnectionTestResponse(
success=success,
message="Connection successful" if success else "Connection failed",
response_time_ms=response_time_ms,
error_details=error,
)
@vendor_letzshop_router.post("/test-key", response_model=LetzshopConnectionTestResponse)
def test_api_key(
test_request: LetzshopConnectionTestRequest,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Test a Letzshop API key without saving it."""
creds_service = get_credentials_service(db)
success, response_time_ms, error = creds_service.test_api_key(
api_key=test_request.api_key,
api_endpoint=test_request.api_endpoint,
)
return LetzshopConnectionTestResponse(
success=success,
message="Connection successful" if success else "Connection failed",
response_time_ms=response_time_ms,
error_details=error,
)
# ============================================================================
# Order Management
# ============================================================================
@vendor_letzshop_router.get("/orders", response_model=LetzshopOrderListResponse)
def list_orders(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
status: str | None = Query(None, description="Filter by order status"),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""List Letzshop orders for the current vendor."""
order_service = get_order_service(db)
vendor_id = current_user.token_vendor_id
orders, total = order_service.list_orders(
vendor_id=vendor_id,
skip=skip,
limit=limit,
status=status,
)
return LetzshopOrderListResponse(
orders=[
LetzshopOrderResponse(
id=order.id,
vendor_id=order.vendor_id,
order_number=order.order_number,
external_order_id=order.external_order_id,
external_shipment_id=order.external_shipment_id,
external_order_number=order.external_order_number,
status=order.status,
customer_email=order.customer_email,
customer_name=f"{order.customer_first_name} {order.customer_last_name}",
customer_locale=order.customer_locale,
ship_country_iso=order.ship_country_iso,
bill_country_iso=order.bill_country_iso,
total_amount=order.total_amount,
currency=order.currency,
tracking_number=order.tracking_number,
tracking_provider=order.tracking_provider,
order_date=order.order_date,
confirmed_at=order.confirmed_at,
shipped_at=order.shipped_at,
cancelled_at=order.cancelled_at,
created_at=order.created_at,
updated_at=order.updated_at,
)
for order in orders
],
total=total,
skip=skip,
limit=limit,
)
@vendor_letzshop_router.get("/orders/{order_id}", response_model=LetzshopOrderDetailResponse)
def get_order(
order_id: int = Path(..., description="Order ID"),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get a specific Letzshop order with full details."""
order_service = get_order_service(db)
try:
order = order_service.get_order_or_raise(current_user.token_vendor_id, order_id)
except OrderNotFoundError:
raise ResourceNotFoundException("LetzshopOrder", str(order_id))
return LetzshopOrderDetailResponse(
# Base fields from LetzshopOrderResponse
id=order.id,
vendor_id=order.vendor_id,
order_number=order.order_number,
external_order_id=order.external_order_id,
external_shipment_id=order.external_shipment_id,
external_order_number=order.external_order_number,
status=order.status,
customer_email=order.customer_email,
customer_name=f"{order.customer_first_name} {order.customer_last_name}",
customer_locale=order.customer_locale,
ship_country_iso=order.ship_country_iso,
bill_country_iso=order.bill_country_iso,
total_amount=order.total_amount,
currency=order.currency,
tracking_number=order.tracking_number,
tracking_provider=order.tracking_provider,
order_date=order.order_date,
confirmed_at=order.confirmed_at,
shipped_at=order.shipped_at,
cancelled_at=order.cancelled_at,
created_at=order.created_at,
updated_at=order.updated_at,
# Detail fields from LetzshopOrderDetailResponse
customer_first_name=order.customer_first_name,
customer_last_name=order.customer_last_name,
customer_phone=order.customer_phone,
ship_first_name=order.ship_first_name,
ship_last_name=order.ship_last_name,
ship_company=order.ship_company,
ship_address_line_1=order.ship_address_line_1,
ship_address_line_2=order.ship_address_line_2,
ship_city=order.ship_city,
ship_postal_code=order.ship_postal_code,
bill_first_name=order.bill_first_name,
bill_last_name=order.bill_last_name,
bill_company=order.bill_company,
bill_address_line_1=order.bill_address_line_1,
bill_address_line_2=order.bill_address_line_2,
bill_city=order.bill_city,
bill_postal_code=order.bill_postal_code,
external_data=order.external_data,
customer_notes=order.customer_notes,
internal_notes=order.internal_notes,
)
@vendor_letzshop_router.post("/orders/import", response_model=LetzshopSyncTriggerResponse)
def import_orders(
sync_request: LetzshopSyncTriggerRequest = LetzshopSyncTriggerRequest(),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Import new orders from Letzshop."""
vendor_id = current_user.token_vendor_id
order_service = get_order_service(db)
creds_service = get_credentials_service(db)
# Verify credentials exist
try:
creds_service.get_credentials_or_raise(vendor_id)
except CredentialsNotFoundError:
raise ValidationException("Letzshop credentials not configured")
# Import orders
try:
with creds_service.create_client(vendor_id) as client:
shipments = client.get_unconfirmed_shipments()
orders_imported = 0
orders_updated = 0
errors = []
for shipment in shipments:
try:
existing = order_service.get_order_by_shipment_id(
vendor_id, shipment["id"]
)
if existing:
order_service.update_order_from_shipment(existing, shipment)
orders_updated += 1
else:
order_service.create_order(vendor_id, shipment)
orders_imported += 1
except Exception as e:
errors.append(
f"Error processing shipment {shipment.get('id')}: {e}"
)
db.commit()
creds_service.update_sync_status(
vendor_id,
"success" if not errors else "partial",
"; ".join(errors) if errors else None,
)
return LetzshopSyncTriggerResponse(
success=True,
message=f"Import completed: {orders_imported} imported, {orders_updated} updated",
orders_imported=orders_imported,
orders_updated=orders_updated,
errors=errors,
)
except LetzshopClientError as e:
creds_service.update_sync_status(vendor_id, "failed", str(e))
return LetzshopSyncTriggerResponse(
success=False,
message=f"Import failed: {e}",
errors=[str(e)],
)
# ============================================================================
# Fulfillment Operations
# ============================================================================
@vendor_letzshop_router.post("/orders/{order_id}/confirm", response_model=FulfillmentOperationResponse)
def confirm_order(
order_id: int = Path(..., description="Order ID"),
confirm_request: FulfillmentConfirmRequest | None = None,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Confirm inventory units for a Letzshop order.
Raises:
OrderHasUnresolvedExceptionsException: If order has unresolved product exceptions
"""
vendor_id = current_user.token_vendor_id
order_service = get_order_service(db)
creds_service = get_credentials_service(db)
try:
order = order_service.get_order_or_raise(vendor_id, order_id)
except OrderNotFoundError:
raise ResourceNotFoundException("LetzshopOrder", str(order_id))
# Check for unresolved exceptions (blocks confirmation)
unresolved_count = order_item_exception_service.get_unresolved_exception_count(
db, order_id
)
if unresolved_count > 0:
raise OrderHasUnresolvedExceptionsException(order_id, unresolved_count)
# Get inventory unit IDs from request or order items
if confirm_request and confirm_request.inventory_unit_ids:
inventory_unit_ids = confirm_request.inventory_unit_ids
else:
# Get inventory unit IDs from order items' external_item_id
inventory_unit_ids = [
item.external_item_id for item in order.items if item.external_item_id
]
if not inventory_unit_ids:
raise ValidationException("No inventory units to confirm")
try:
with creds_service.create_client(vendor_id) as client:
result = client.confirm_inventory_units(inventory_unit_ids)
# Check for errors
if result.get("errors"):
error_messages = [
f"{e.get('id', 'unknown')}: {e.get('message', 'Unknown error')}"
for e in result["errors"]
]
return FulfillmentOperationResponse(
success=False,
message="Some inventory units could not be confirmed",
errors=error_messages,
)
# Update order status
order_service.mark_order_confirmed(order)
db.commit()
return FulfillmentOperationResponse(
success=True,
message=f"Confirmed {len(inventory_unit_ids)} inventory units",
confirmed_units=[u.get("id") for u in result.get("inventoryUnits", [])],
)
except LetzshopClientError as e:
return FulfillmentOperationResponse(success=False, message=str(e))
@vendor_letzshop_router.post("/orders/{order_id}/reject", response_model=FulfillmentOperationResponse)
def reject_order(
order_id: int = Path(..., description="Order ID"),
reject_request: FulfillmentRejectRequest | None = None,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Reject inventory units for a Letzshop order."""
vendor_id = current_user.token_vendor_id
order_service = get_order_service(db)
creds_service = get_credentials_service(db)
try:
order = order_service.get_order_or_raise(vendor_id, order_id)
except OrderNotFoundError:
raise ResourceNotFoundException("LetzshopOrder", str(order_id))
# Get inventory unit IDs from request or order items
if reject_request and reject_request.inventory_unit_ids:
inventory_unit_ids = reject_request.inventory_unit_ids
else:
# Get inventory unit IDs from order items' external_item_id
inventory_unit_ids = [
item.external_item_id for item in order.items if item.external_item_id
]
if not inventory_unit_ids:
raise ValidationException("No inventory units to reject")
try:
with creds_service.create_client(vendor_id) as client:
result = client.reject_inventory_units(inventory_unit_ids)
if result.get("errors"):
error_messages = [
f"{e.get('id', 'unknown')}: {e.get('message', 'Unknown error')}"
for e in result["errors"]
]
return FulfillmentOperationResponse(
success=False,
message="Some inventory units could not be rejected",
errors=error_messages,
)
order_service.mark_order_rejected(order)
db.commit()
return FulfillmentOperationResponse(
success=True,
message=f"Rejected {len(inventory_unit_ids)} inventory units",
)
except LetzshopClientError as e:
return FulfillmentOperationResponse(success=False, message=str(e))
@vendor_letzshop_router.post("/orders/{order_id}/tracking", response_model=FulfillmentOperationResponse)
def set_order_tracking(
order_id: int = Path(..., description="Order ID"),
tracking_request: FulfillmentTrackingRequest = ...,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Set tracking information for a Letzshop order."""
vendor_id = current_user.token_vendor_id
order_service = get_order_service(db)
creds_service = get_credentials_service(db)
try:
order = order_service.get_order_or_raise(vendor_id, order_id)
except OrderNotFoundError:
raise ResourceNotFoundException("LetzshopOrder", str(order_id))
if not order.external_shipment_id:
raise ValidationException("Order does not have a shipment ID")
try:
with creds_service.create_client(vendor_id) as client:
result = client.set_shipment_tracking(
shipment_id=order.external_shipment_id,
tracking_code=tracking_request.tracking_number,
tracking_provider=tracking_request.tracking_carrier,
)
if result.get("errors"):
error_messages = [
f"{e.get('code', 'unknown')}: {e.get('message', 'Unknown error')}"
for e in result["errors"]
]
return FulfillmentOperationResponse(
success=False,
message="Failed to set tracking",
errors=error_messages,
)
order_service.set_order_tracking(
order,
tracking_request.tracking_number,
tracking_request.tracking_carrier,
)
db.commit()
return FulfillmentOperationResponse(
success=True,
message="Tracking information set",
tracking_number=tracking_request.tracking_number,
tracking_carrier=tracking_request.tracking_carrier,
)
except LetzshopClientError as e:
return FulfillmentOperationResponse(success=False, message=str(e))
# ============================================================================
# Sync Logs
# ============================================================================
@vendor_letzshop_router.get("/logs", response_model=LetzshopSyncLogListResponse)
def list_sync_logs(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""List Letzshop sync logs for the current vendor."""
order_service = get_order_service(db)
vendor_id = current_user.token_vendor_id
logs, total = order_service.list_sync_logs(
vendor_id=vendor_id,
skip=skip,
limit=limit,
)
return LetzshopSyncLogListResponse(
logs=[
LetzshopSyncLogResponse(
id=log.id,
vendor_id=log.vendor_id,
operation_type=log.operation_type,
direction=log.direction,
status=log.status,
records_processed=log.records_processed,
records_succeeded=log.records_succeeded,
records_failed=log.records_failed,
error_details=log.error_details,
started_at=log.started_at,
completed_at=log.completed_at,
duration_seconds=log.duration_seconds,
triggered_by=log.triggered_by,
created_at=log.created_at,
)
for log in logs
],
total=total,
skip=skip,
limit=limit,
)
# ============================================================================
# Fulfillment Queue
# ============================================================================
@vendor_letzshop_router.get("/queue", response_model=FulfillmentQueueListResponse)
def list_fulfillment_queue(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=200),
status: str | None = Query(None, description="Filter by status"),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""List fulfillment queue items for the current vendor."""
order_service = get_order_service(db)
vendor_id = current_user.token_vendor_id
items, total = order_service.list_fulfillment_queue(
vendor_id=vendor_id,
skip=skip,
limit=limit,
status=status,
)
return FulfillmentQueueListResponse(
items=[
FulfillmentQueueItemResponse(
id=item.id,
vendor_id=item.vendor_id,
letzshop_order_id=item.letzshop_order_id,
operation=item.operation,
payload=item.payload,
status=item.status,
attempts=item.attempts,
max_attempts=item.max_attempts,
last_attempt_at=item.last_attempt_at,
next_retry_at=item.next_retry_at,
error_message=item.error_message,
completed_at=item.completed_at,
response_data=item.response_data,
created_at=item.created_at,
updated_at=item.updated_at,
)
for item in items
],
total=total,
skip=skip,
limit=limit,
)
# ============================================================================
# Product Export
# ============================================================================
@vendor_letzshop_router.get("/export")
def export_products_letzshop(
language: str = Query(
"en", description="Language for title/description (en, fr, de)"
),
include_inactive: bool = Query(False, description="Include inactive products"),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Export vendor products in Letzshop CSV format.
Generates a Google Shopping compatible CSV file for Letzshop marketplace.
The file uses tab-separated values and includes all required Letzshop fields.
**Supported languages:** en, fr, de
**CSV Format:**
- Delimiter: Tab (\\t)
- Encoding: UTF-8
- Fields: id, title, description, price, availability, image_link, etc.
Returns:
CSV file as attachment (vendor_code_letzshop_export.csv)
"""
from fastapi.responses import Response
from app.modules.marketplace.services.letzshop_export_service import letzshop_export_service
from app.services.vendor_service import vendor_service
vendor_id = current_user.token_vendor_id
vendor = vendor_service.get_vendor_by_id(db, vendor_id)
csv_content = letzshop_export_service.export_vendor_products(
db=db,
vendor_id=vendor_id,
language=language,
include_inactive=include_inactive,
)
filename = f"{vendor.vendor_code.lower()}_letzshop_export.csv"
return Response(
content=csv_content,
media_type="text/csv; charset=utf-8",
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
},
)

View File

@@ -0,0 +1,137 @@
# app/modules/marketplace/routes/api/vendor_marketplace.py
"""
Marketplace import endpoints for vendors.
Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The get_current_vendor_api dependency guarantees token_vendor_id is present.
All routes require module access control for the 'marketplace' module.
"""
import logging
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api, require_module_access
from app.core.database import get_db
from app.modules.marketplace.services.marketplace_import_job_service import marketplace_import_job_service
from app.services.vendor_service import vendor_service
from middleware.decorators import rate_limit
from models.schema.auth import UserContext
from app.modules.marketplace.schemas import (
MarketplaceImportJobRequest,
MarketplaceImportJobResponse,
)
vendor_marketplace_router = APIRouter(
prefix="/marketplace",
dependencies=[Depends(require_module_access("marketplace"))],
)
logger = logging.getLogger(__name__)
@vendor_marketplace_router.post("/import", response_model=MarketplaceImportJobResponse)
@rate_limit(max_requests=10, window_seconds=3600)
async def import_products_from_marketplace(
request: MarketplaceImportJobRequest,
background_tasks: BackgroundTasks,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Import products from marketplace CSV with background processing (Protected).
The `language` parameter specifies the language code for product
translations (e.g., 'en', 'fr', 'de'). Default is 'en'.
For multi-language imports, call this endpoint multiple times with
different language codes and CSV files containing translations.
"""
vendor = vendor_service.get_vendor_by_id(db, current_user.token_vendor_id)
logger.info(
f"Starting marketplace import: {request.marketplace} for vendor {vendor.vendor_code} "
f"by user {current_user.username} (language={request.language})"
)
# Create import job (vendor comes from token)
import_job = marketplace_import_job_service.create_import_job(
db, request, vendor, current_user
)
db.commit()
# Dispatch via task dispatcher (supports Celery or BackgroundTasks)
from app.tasks.dispatcher import task_dispatcher
celery_task_id = task_dispatcher.dispatch_marketplace_import(
background_tasks=background_tasks,
job_id=import_job.id,
url=request.source_url,
marketplace=request.marketplace,
vendor_id=vendor.id,
batch_size=request.batch_size or 1000,
language=request.language,
)
# Store Celery task ID if using Celery
if celery_task_id:
import_job.celery_task_id = celery_task_id
db.commit()
return MarketplaceImportJobResponse(
job_id=import_job.id,
status="pending",
marketplace=request.marketplace,
vendor_id=import_job.vendor_id,
vendor_code=vendor.vendor_code,
vendor_name=vendor.name,
source_url=request.source_url,
language=request.language,
message=f"Marketplace import started from {request.marketplace}. "
f"Check status with /import-status/{import_job.id}",
imported=0,
updated=0,
total_processed=0,
error_count=0,
created_at=import_job.created_at,
)
@vendor_marketplace_router.get("/imports/{job_id}", response_model=MarketplaceImportJobResponse)
def get_marketplace_import_status(
job_id: int,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get status of marketplace import job (Protected)."""
# Service validates that job belongs to vendor and raises UnauthorizedVendorAccessException if not
job = marketplace_import_job_service.get_import_job_for_vendor(
db, job_id, current_user.token_vendor_id
)
return marketplace_import_job_service.convert_to_response_model(job)
@vendor_marketplace_router.get("/imports", response_model=list[MarketplaceImportJobResponse])
def get_marketplace_import_jobs(
marketplace: str | None = Query(None, description="Filter by marketplace"),
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=100),
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get marketplace import jobs for current vendor (Protected)."""
vendor = vendor_service.get_vendor_by_id(db, current_user.token_vendor_id)
jobs = marketplace_import_job_service.get_import_jobs(
db=db,
vendor=vendor,
user=current_user,
marketplace=marketplace,
skip=skip,
limit=limit,
)
return [
marketplace_import_job_service.convert_to_response_model(job) for job in jobs
]

View File

@@ -0,0 +1,293 @@
# app/modules/marketplace/routes/api/vendor_onboarding.py
"""
Vendor onboarding API endpoints.
Provides endpoints for the 4-step mandatory onboarding wizard:
1. Company Profile Setup
2. Letzshop API Configuration
3. Product & Order Import Configuration
4. Order Sync (historical import)
Migrated from app/api/v1/vendor/onboarding.py to marketplace module.
Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
"""
import logging
from fastapi import APIRouter, BackgroundTasks, Depends
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api, require_module_access
from app.core.database import get_db
from app.services.onboarding_service import OnboardingService
from models.schema.auth import UserContext
from app.modules.marketplace.schemas import (
CompanyProfileRequest,
CompanyProfileResponse,
LetzshopApiConfigRequest,
LetzshopApiConfigResponse,
LetzshopApiTestRequest,
LetzshopApiTestResponse,
OnboardingStatusResponse,
OrderSyncCompleteRequest,
OrderSyncCompleteResponse,
OrderSyncProgressResponse,
OrderSyncTriggerRequest,
OrderSyncTriggerResponse,
ProductImportConfigRequest,
ProductImportConfigResponse,
)
vendor_onboarding_router = APIRouter(
prefix="/onboarding",
dependencies=[Depends(require_module_access("marketplace"))],
)
logger = logging.getLogger(__name__)
# =============================================================================
# Status Endpoint
# =============================================================================
@vendor_onboarding_router.get("/status", response_model=OnboardingStatusResponse)
def get_onboarding_status(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get current onboarding status.
Returns full status including all step completion states and progress.
"""
service = OnboardingService(db)
status = service.get_status_response(current_user.token_vendor_id)
return status
# =============================================================================
# Step 1: Company Profile
# =============================================================================
@vendor_onboarding_router.get("/step/company-profile")
def get_company_profile(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get current company profile data for editing.
Returns pre-filled data from vendor and company records.
"""
service = OnboardingService(db)
return service.get_company_profile_data(current_user.token_vendor_id)
@vendor_onboarding_router.post("/step/company-profile", response_model=CompanyProfileResponse)
def save_company_profile(
request: CompanyProfileRequest,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Save company profile and complete Step 1.
Updates vendor and company records with provided data.
"""
service = OnboardingService(db)
result = service.complete_company_profile(
vendor_id=current_user.token_vendor_id,
company_name=request.company_name,
brand_name=request.brand_name,
description=request.description,
contact_email=request.contact_email,
contact_phone=request.contact_phone,
website=request.website,
business_address=request.business_address,
tax_number=request.tax_number,
default_language=request.default_language,
dashboard_language=request.dashboard_language,
)
db.commit() # Commit at API level for transaction control
return result
# =============================================================================
# Step 2: Letzshop API Configuration
# =============================================================================
@vendor_onboarding_router.post("/step/letzshop-api/test", response_model=LetzshopApiTestResponse)
def test_letzshop_api(
request: LetzshopApiTestRequest,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Test Letzshop API connection without saving.
Use this to validate API key before saving credentials.
"""
service = OnboardingService(db)
return service.test_letzshop_api(
api_key=request.api_key,
shop_slug=request.shop_slug,
)
@vendor_onboarding_router.post("/step/letzshop-api", response_model=LetzshopApiConfigResponse)
def save_letzshop_api(
request: LetzshopApiConfigRequest,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Save Letzshop API credentials and complete Step 2.
Tests connection first, only saves if successful.
"""
service = OnboardingService(db)
result = service.complete_letzshop_api(
vendor_id=current_user.token_vendor_id,
api_key=request.api_key,
shop_slug=request.shop_slug,
letzshop_vendor_id=request.vendor_id,
)
db.commit() # Commit at API level for transaction control
return result
# =============================================================================
# Step 3: Product & Order Import Configuration
# =============================================================================
@vendor_onboarding_router.get("/step/product-import")
def get_product_import_config(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get current product import configuration.
Returns pre-filled CSV URLs and Letzshop feed settings.
"""
service = OnboardingService(db)
return service.get_product_import_config(current_user.token_vendor_id)
@vendor_onboarding_router.post("/step/product-import", response_model=ProductImportConfigResponse)
def save_product_import_config(
request: ProductImportConfigRequest,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Save product import configuration and complete Step 3.
At least one CSV URL must be provided.
"""
service = OnboardingService(db)
result = service.complete_product_import(
vendor_id=current_user.token_vendor_id,
csv_url_fr=request.csv_url_fr,
csv_url_en=request.csv_url_en,
csv_url_de=request.csv_url_de,
default_tax_rate=request.default_tax_rate,
delivery_method=request.delivery_method,
preorder_days=request.preorder_days,
)
db.commit() # Commit at API level for transaction control
return result
# =============================================================================
# Step 4: Order Sync
# =============================================================================
@vendor_onboarding_router.post("/step/order-sync/trigger", response_model=OrderSyncTriggerResponse)
def trigger_order_sync(
request: OrderSyncTriggerRequest,
background_tasks: BackgroundTasks,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Trigger historical order import.
Creates a background job that imports orders from Letzshop.
"""
service = OnboardingService(db)
result = service.trigger_order_sync(
vendor_id=current_user.token_vendor_id,
user_id=current_user.id,
days_back=request.days_back,
include_products=request.include_products,
)
db.commit() # Commit at API level for transaction control
# Queue background task to process the import
if result.get("success") and result.get("job_id"):
from app.tasks.dispatcher import task_dispatcher
celery_task_id = task_dispatcher.dispatch_historical_import(
background_tasks=background_tasks,
job_id=result["job_id"],
vendor_id=current_user.token_vendor_id,
)
# Store Celery task ID if using Celery
if celery_task_id:
from app.modules.marketplace.services.letzshop import LetzshopOrderService
order_service = LetzshopOrderService(db)
order_service.update_job_celery_task_id(result["job_id"], celery_task_id)
logger.info(f"Queued historical import task for job {result['job_id']}")
return result
@vendor_onboarding_router.get(
"/step/order-sync/progress/{job_id}",
response_model=OrderSyncProgressResponse,
)
def get_order_sync_progress(
job_id: int,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get order sync job progress.
Poll this endpoint to show progress bar during import.
"""
service = OnboardingService(db)
return service.get_order_sync_progress(
vendor_id=current_user.token_vendor_id,
job_id=job_id,
)
@vendor_onboarding_router.post("/step/order-sync/complete", response_model=OrderSyncCompleteResponse)
def complete_order_sync(
request: OrderSyncCompleteRequest,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Mark order sync step as complete.
Called after the import job finishes (success or failure).
This also marks the entire onboarding as complete.
"""
service = OnboardingService(db)
result = service.complete_order_sync(
vendor_id=current_user.token_vendor_id,
job_id=request.job_id,
)
db.commit() # Commit at API level for transaction control
return result

View File

@@ -14,7 +14,7 @@ from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.orm import Session
from app.services.letzshop.client_service import LetzshopClient
from .client_service import LetzshopClient
from app.modules.marketplace.models import LetzshopVendorCache
logger = logging.getLogger(__name__)

View File

@@ -40,7 +40,7 @@ def export_vendor_products_to_folder(
Returns:
dict: Export results per language with file paths
"""
from app.services.letzshop_export_service import letzshop_export_service
from app.modules.marketplace.services.letzshop_export_service import letzshop_export_service
languages = ["en", "fr", "de"]
results = {}
@@ -149,7 +149,7 @@ def export_marketplace_products(
Returns:
dict: Export result with file path
"""
from app.services.letzshop_export_service import letzshop_export_service
from app.modules.marketplace.services.letzshop_export_service import letzshop_export_service
with self.get_db() as db:
started_at = datetime.now(UTC)

View File

@@ -15,9 +15,11 @@ from typing import Callable
from app.core.celery_config import celery_app
from app.modules.marketplace.models import MarketplaceImportJob, LetzshopHistoricalImportJob
from app.services.admin_notification_service import admin_notification_service
from app.services.letzshop import LetzshopClientError
from app.services.letzshop.credentials_service import LetzshopCredentialsService
from app.services.letzshop.order_service import LetzshopOrderService
from app.modules.marketplace.services.letzshop import (
LetzshopClientError,
LetzshopCredentialsService,
LetzshopOrderService,
)
from app.modules.task_base import ModuleTask
from app.utils.csv_processor import CSVProcessor
from models.database.vendor import Vendor

View File

@@ -11,7 +11,7 @@ from typing import Any
from app.core.celery_config import celery_app
from app.modules.task_base import ModuleTask
from app.services.admin_notification_service import admin_notification_service
from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService
from app.modules.marketplace.services.letzshop import LetzshopVendorSyncService
logger = logging.getLogger(__name__)

View File

@@ -6,23 +6,31 @@ This module provides functions to register orders routes
with module-based access control.
NOTE: Routers are NOT auto-imported to avoid circular dependencies.
Import directly from admin.py or vendor.py as needed:
from app.modules.orders.routes.admin import admin_router
from app.modules.orders.routes.vendor import vendor_router
Import directly from api submodule as needed:
from app.modules.orders.routes.api import admin_router
from app.modules.orders.routes.api import vendor_router
"""
# Routers are imported on-demand to avoid circular dependencies
# Do NOT add auto-imports here
__all__ = ["admin_router", "vendor_router"]
__all__ = [
"admin_router",
"admin_exceptions_router",
"vendor_router",
"vendor_exceptions_router",
]
def __getattr__(name: str):
"""Lazy import routers to avoid circular dependencies."""
if name == "admin_router":
from app.modules.orders.routes.admin import admin_router
from app.modules.orders.routes.api import admin_router
return admin_router
elif name == "admin_exceptions_router":
from app.modules.orders.routes.api import admin_exceptions_router
return admin_exceptions_router
elif name == "vendor_router":
from app.modules.orders.routes.vendor import vendor_router
from app.modules.orders.routes.api import vendor_router
return vendor_router
elif name == "vendor_exceptions_router":
from app.modules.orders.routes.api import vendor_exceptions_router
return vendor_exceptions_router
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")

View File

@@ -1,40 +0,0 @@
# app/modules/orders/routes/admin.py
"""
Orders module admin routes.
This module wraps the existing admin orders routes and adds
module-based access control. Routes are re-exported from the
original location with the module access dependency.
Includes:
- /orders/* - Order management
- /order-item-exceptions/* - Exception handling
"""
from fastapi import APIRouter, Depends
from app.api.deps import require_module_access
# Import original routers (direct import to avoid circular dependency)
from app.api.v1.admin.orders import router as orders_original_router
from app.api.v1.admin.order_item_exceptions import router as exceptions_original_router
# Create module-aware router for orders
admin_router = APIRouter(
prefix="/orders",
dependencies=[Depends(require_module_access("orders"))],
)
# Re-export all routes from the original orders module
for route in orders_original_router.routes:
admin_router.routes.append(route)
# Create separate router for order item exceptions
# This is included separately in the admin __init__.py
admin_exceptions_router = APIRouter(
prefix="/order-item-exceptions",
dependencies=[Depends(require_module_access("orders"))],
)
for route in exceptions_original_router.routes:
admin_exceptions_router.routes.append(route)

View File

@@ -1,39 +0,0 @@
# app/modules/orders/routes/vendor.py
"""
Orders module vendor routes.
This module wraps the existing vendor orders routes and adds
module-based access control. Routes are re-exported from the
original location with the module access dependency.
Includes:
- /orders/* - Order management
- /order-item-exceptions/* - Exception handling
"""
from fastapi import APIRouter, Depends
from app.api.deps import require_module_access
# Import original routers (direct import to avoid circular dependency)
from app.api.v1.vendor.orders import router as orders_original_router
from app.api.v1.vendor.order_item_exceptions import router as exceptions_original_router
# Create module-aware router for orders
vendor_router = APIRouter(
prefix="/orders",
dependencies=[Depends(require_module_access("orders"))],
)
# Re-export all routes from the original orders module
for route in orders_original_router.routes:
vendor_router.routes.append(route)
# Create separate router for order item exceptions
vendor_exceptions_router = APIRouter(
prefix="/order-item-exceptions",
dependencies=[Depends(require_module_access("orders"))],
)
for route in exceptions_original_router.routes:
vendor_exceptions_router.routes.append(route)