refactor: migrate vendor auth, profile, team, dashboard, settings to modules

Tenancy module (identity & organizational hierarchy):
- vendor_auth.py: login, logout, /me endpoints
- vendor_profile.py: vendor profile get/update
- vendor_team.py: team management, roles, permissions, invitations

Core module (foundational non-domain features):
- vendor_dashboard.py: dashboard statistics
- vendor_settings.py: localization, business info, letzshop settings

All routes auto-discovered via is_self_contained=True.
Deleted 5 legacy files from app/api/v1/vendor/.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-31 15:09:41 +01:00
parent d747f9ebaa
commit da3f28849e
12 changed files with 105 additions and 52 deletions

View File

@@ -24,23 +24,18 @@ Self-contained modules (auto-discovered from app/modules/{module}/routes/api/ven
- cms: Content pages management
- customers: Customer management
- payments: Payment configuration, Stripe connect, transactions
- tenancy: Public vendor info lookup
- tenancy: Vendor info, auth, profile, team management
"""
from fastapi import APIRouter
# Import all sub-routers (legacy routes that haven't been migrated to modules)
from . import (
auth,
dashboard,
email_settings,
email_templates,
media,
messages,
notifications,
profile,
settings,
team,
)
# Create vendor router
@@ -51,18 +46,14 @@ router = APIRouter()
# ============================================================================
# These routes return JSON and are mounted at /api/v1/vendor/*
# Authentication (no prefix, specific routes like /auth/login)
router.include_router(auth.router, tags=["vendor-auth"])
# Vendor management (with prefixes: /dashboard/*, /profile/*, /settings/*)
router.include_router(dashboard.router, tags=["vendor-dashboard"])
router.include_router(profile.router, tags=["vendor-profile"])
router.include_router(settings.router, tags=["vendor-settings"])
# Email configuration
router.include_router(email_templates.router, tags=["vendor-email-templates"])
router.include_router(email_settings.router, tags=["vendor-email-settings"])
# Business operations (with prefixes: /team/*)
router.include_router(team.router, tags=["vendor-team"])
# Services (with prefixes: /media/*, etc.)
router.include_router(media.router, tags=["vendor-media"])
router.include_router(notifications.router, tags=["vendor-notifications"])
router.include_router(messages.router, tags=["vendor-messages"])
# Services (with prefixes: /media/*, etc.)
router.include_router(media.router, tags=["vendor-media"])

View File

@@ -1,196 +0,0 @@
# app/api/v1/vendor/auth.py
"""
Vendor team authentication endpoints.
Implements dual token storage with path restriction:
- Sets HTTP-only cookie with path=/vendor (restricted to vendor routes only)
- Returns token in response for localStorage (API calls)
This prevents:
- Vendor cookies from being sent to admin routes
- Admin cookies from being sent to vendor routes
- Cross-context authentication confusion
"""
import logging
from fastapi import APIRouter, Depends, Request, Response
from pydantic import BaseModel
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api
from app.core.database import get_db
from app.core.environment import should_use_secure_cookies
from app.exceptions import InvalidCredentialsException
from app.services.auth_service import auth_service
from middleware.vendor_context import get_current_vendor
from models.schema.auth import UserContext
from models.schema.auth import LogoutResponse, UserLogin, VendorUserResponse
router = APIRouter(prefix="/auth")
logger = logging.getLogger(__name__)
# Response model for vendor login
class VendorLoginResponse(BaseModel):
access_token: str
token_type: str
expires_in: int
user: dict
vendor: dict
vendor_role: str
@router.post("/login", response_model=VendorLoginResponse)
def vendor_login(
user_credentials: UserLogin,
request: Request,
response: Response,
db: Session = Depends(get_db),
):
"""
Vendor team member login.
Authenticates users who are part of a vendor team.
Validates against vendor context if available.
Sets token in two places:
1. HTTP-only cookie with path=/vendor (for browser page navigation)
2. Response body (for localStorage and API calls)
Prevents admin users from logging into vendor portal.
"""
# Try to get vendor from middleware first
vendor = get_current_vendor(request)
# If no vendor from middleware, try to get from request body
if not vendor and hasattr(user_credentials, "vendor_code"):
vendor_code = getattr(user_credentials, "vendor_code", None)
if vendor_code:
vendor = auth_service.get_vendor_by_code(db, vendor_code)
# Authenticate user
login_result = auth_service.login_user(db=db, user_credentials=user_credentials)
user = login_result["user"]
# CRITICAL: Prevent admin users from using vendor login
if user.role == "admin":
logger.warning(f"Admin user attempted vendor login: {user.username}")
raise InvalidCredentialsException(
"Admins cannot access vendor portal. Please use admin portal."
)
# Determine vendor and role
vendor_role = "Member"
if vendor:
# Check if user has access to this vendor
has_access, role = auth_service.get_user_vendor_role(db, user, vendor)
if has_access:
vendor_role = role
else:
logger.warning(
f"User {user.username} attempted login to vendor {vendor.vendor_code} "
f"but is not authorized"
)
raise InvalidCredentialsException("You do not have access to this vendor")
else:
# No vendor context - find which vendor this user belongs to
vendor, vendor_role = auth_service.find_user_vendor(user)
if not vendor:
raise InvalidCredentialsException("User is not associated with any vendor")
logger.info(
f"Vendor team login successful: {user.username} "
f"for vendor {vendor.vendor_code} as {vendor_role}"
)
# Create vendor-scoped access token with vendor information
token_data = auth_service.auth_manager.create_access_token(
user=user,
vendor_id=vendor.id,
vendor_code=vendor.vendor_code,
vendor_role=vendor_role,
)
# Set HTTP-only cookie for browser navigation
# CRITICAL: path=/vendor restricts cookie to vendor routes only
response.set_cookie(
key="vendor_token",
value=token_data["access_token"],
httponly=True, # JavaScript cannot access (XSS protection)
secure=should_use_secure_cookies(), # HTTPS only in production/staging
samesite="lax", # CSRF protection
max_age=token_data["expires_in"], # Match JWT expiry
path="/vendor", # RESTRICTED TO VENDOR ROUTES ONLY
)
logger.debug(
f"Set vendor_token cookie with {token_data['expires_in']}s expiry "
f"(path=/vendor, httponly=True, secure={should_use_secure_cookies()})"
)
# Return full login response with vendor-scoped token
return VendorLoginResponse(
access_token=token_data["access_token"],
token_type=token_data["token_type"],
expires_in=token_data["expires_in"],
user={
"id": user.id,
"username": user.username,
"email": user.email,
"role": user.role,
"is_active": user.is_active,
},
vendor={
"id": vendor.id,
"vendor_code": vendor.vendor_code,
"subdomain": vendor.subdomain,
"name": vendor.name,
"is_active": vendor.is_active,
"is_verified": vendor.is_verified,
},
vendor_role=vendor_role,
)
@router.post("/logout", response_model=LogoutResponse)
def vendor_logout(response: Response):
"""
Vendor team member logout.
Clears the vendor_token cookie.
Client should also remove token from localStorage.
"""
logger.info("Vendor logout")
# Clear the cookie (must match path used when setting)
response.delete_cookie(
key="vendor_token",
path="/vendor",
)
logger.debug("Deleted vendor_token cookie")
return LogoutResponse(message="Logged out successfully")
@router.get("/me", response_model=VendorUserResponse)
def get_current_vendor_user(
user: UserContext = Depends(get_current_vendor_api), db: Session = Depends(get_db)
):
"""
Get current authenticated vendor user.
This endpoint can be called to verify authentication and get user info.
Requires Authorization header (header-only authentication for API endpoints).
"""
return VendorUserResponse(
id=user.id,
username=user.username,
email=user.email,
role=user.role,
is_active=user.is_active,
)

View File

@@ -1,85 +0,0 @@
# app/api/v1/vendor/dashboard.py
"""
Vendor dashboard and statistics endpoints.
Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The get_current_vendor_api dependency guarantees token_vendor_id is present.
"""
import logging
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api
from app.core.database import get_db
from app.exceptions import VendorNotActiveException
from app.services.stats_service import stats_service
from app.services.vendor_service import vendor_service
from models.schema.auth import UserContext
from app.modules.analytics.schemas import (
VendorCustomerStats,
VendorDashboardStatsResponse,
VendorInfo,
VendorOrderStats,
VendorProductStats,
VendorRevenueStats,
)
router = APIRouter(prefix="/dashboard")
logger = logging.getLogger(__name__)
@router.get("/stats", response_model=VendorDashboardStatsResponse)
def get_vendor_dashboard_stats(
request: Request,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Get vendor-specific dashboard statistics.
Returns statistics for the current vendor only:
- Total products in catalog
- Total orders
- Total customers
- Revenue metrics
Vendor is determined from the JWT token (vendor_id claim).
Requires Authorization header (API endpoint).
"""
vendor_id = current_user.token_vendor_id
# Get vendor object (raises VendorNotFoundException if not found)
vendor = vendor_service.get_vendor_by_id(db, vendor_id)
if not vendor.is_active:
raise VendorNotActiveException(vendor.vendor_code)
# Get vendor-scoped statistics
stats_data = stats_service.get_vendor_stats(db=db, vendor_id=vendor_id)
return VendorDashboardStatsResponse(
vendor=VendorInfo(
id=vendor.id,
name=vendor.name,
vendor_code=vendor.vendor_code,
),
products=VendorProductStats(
total=stats_data.get("total_products", 0),
active=stats_data.get("active_products", 0),
),
orders=VendorOrderStats(
total=stats_data.get("total_orders", 0),
pending=stats_data.get("pending_orders", 0),
completed=stats_data.get("completed_orders", 0),
),
customers=VendorCustomerStats(
total=stats_data.get("total_customers", 0),
active=stats_data.get("active_customers", 0),
),
revenue=VendorRevenueStats(
total=stats_data.get("total_revenue", 0),
this_month=stats_data.get("revenue_this_month", 0),
),
)

View File

@@ -1,44 +0,0 @@
# app/api/v1/vendor/profile.py
"""
Vendor profile management endpoints.
Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The get_current_vendor_api dependency guarantees token_vendor_id is present.
"""
import logging
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api
from app.core.database import get_db
from app.services.vendor_service import vendor_service
from models.schema.auth import UserContext
from models.schema.vendor import VendorResponse, VendorUpdate
router = APIRouter(prefix="/profile")
logger = logging.getLogger(__name__)
@router.get("", response_model=VendorResponse)
def get_vendor_profile(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get current vendor profile information."""
vendor = vendor_service.get_vendor_by_id(db, current_user.token_vendor_id)
return vendor
@router.put("", response_model=VendorResponse)
def update_vendor_profile(
vendor_update: VendorUpdate,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Update vendor profile information."""
# Service handles permission checking and raises InsufficientPermissionsException if needed
return vendor_service.update_vendor(
db, current_user.token_vendor_id, vendor_update, current_user
)

View File

@@ -1,439 +0,0 @@
# app/api/v1/vendor/settings.py
"""
Vendor settings and configuration endpoints.
Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The get_current_vendor_api dependency guarantees token_vendor_id is present.
"""
import logging
from fastapi import APIRouter, Depends
from pydantic import BaseModel, Field, field_validator
from sqlalchemy.orm import Session
from app.api.deps import get_current_vendor_api
from app.core.database import get_db
from app.services.platform_settings_service import platform_settings_service
from app.services.vendor_service import vendor_service
from models.schema.auth import UserContext
router = APIRouter(prefix="/settings")
logger = logging.getLogger(__name__)
# Supported languages for dropdown
SUPPORTED_LANGUAGES = [
{"code": "en", "name": "English"},
{"code": "fr", "name": "Français"},
{"code": "de", "name": "Deutsch"},
{"code": "lb", "name": "Lëtzebuergesch"},
]
# Supported locales for currency/number formatting
SUPPORTED_LOCALES = [
{"code": "fr-LU", "name": "Luxembourg (French)", "example": "29,99 €"},
{"code": "de-LU", "name": "Luxembourg (German)", "example": "29,99 €"},
{"code": "de-DE", "name": "Germany", "example": "29,99 €"},
{"code": "fr-FR", "name": "France", "example": "29,99 €"},
{"code": "en-GB", "name": "United Kingdom", "example": "€29.99"},
]
# Valid language codes for validation
VALID_LANGUAGE_CODES = {"en", "fr", "de", "lb"}
# Valid locale codes for validation
VALID_LOCALE_CODES = {"fr-LU", "de-LU", "de-DE", "fr-FR", "en-GB"}
class LocalizationSettingsUpdate(BaseModel):
"""Schema for updating localization settings."""
default_language: str | None = Field(
None, description="Default language for vendor content"
)
dashboard_language: str | None = Field(
None, description="Language for vendor dashboard UI"
)
storefront_language: str | None = Field(
None, description="Default language for customer storefront"
)
storefront_languages: list[str] | None = Field(
None, description="Enabled languages for storefront selector"
)
storefront_locale: str | None = Field(
None, description="Locale for currency/number formatting"
)
@field_validator("default_language", "dashboard_language", "storefront_language")
@classmethod
def validate_language(cls, v: str | None) -> str | None:
if v is not None and v not in VALID_LANGUAGE_CODES:
raise ValueError(f"Invalid language: {v}. Must be one of: {sorted(VALID_LANGUAGE_CODES)}")
return v
@field_validator("storefront_languages")
@classmethod
def validate_storefront_languages(cls, v: list[str] | None) -> list[str] | None:
if v is not None:
for lang in v:
if lang not in VALID_LANGUAGE_CODES:
raise ValueError(f"Invalid language: {lang}. Must be one of: {sorted(VALID_LANGUAGE_CODES)}")
return v
@field_validator("storefront_locale")
@classmethod
def validate_locale(cls, v: str | None) -> str | None:
if v is not None and v not in VALID_LOCALE_CODES:
raise ValueError(f"Invalid locale: {v}. Must be one of: {sorted(VALID_LOCALE_CODES)}")
return v
class BusinessInfoUpdate(BaseModel):
"""Schema for updating business info (can override company values)."""
name: str | None = Field(None, description="Store/brand name")
description: str | None = Field(None, description="Store description")
contact_email: str | None = Field(None, description="Contact email (null = inherit from company)")
contact_phone: str | None = Field(None, description="Contact phone (null = inherit from company)")
website: str | None = Field(None, description="Website URL (null = inherit from company)")
business_address: str | None = Field(None, description="Business address (null = inherit from company)")
tax_number: str | None = Field(None, description="Tax/VAT number (null = inherit from company)")
reset_to_company: list[str] | None = Field(
None, description="List of fields to reset to company values (e.g., ['contact_email', 'website'])"
)
# Valid Letzshop tax rates
VALID_TAX_RATES = [0, 3, 8, 14, 17]
# Valid delivery methods
VALID_DELIVERY_METHODS = ["nationwide", "package_delivery", "self_collect"]
class LetzshopFeedSettingsUpdate(BaseModel):
"""Schema for updating Letzshop feed settings."""
letzshop_csv_url_fr: str | None = Field(None, description="French CSV feed URL")
letzshop_csv_url_en: str | None = Field(None, description="English CSV feed URL")
letzshop_csv_url_de: str | None = Field(None, description="German CSV feed URL")
letzshop_default_tax_rate: int | None = Field(None, description="Default VAT rate (0, 3, 8, 14, 17)")
letzshop_boost_sort: str | None = Field(None, description="Sort priority (0.0-10.0)")
letzshop_delivery_method: str | None = Field(None, description="Delivery method")
letzshop_preorder_days: int | None = Field(None, ge=0, description="Pre-order lead time in days")
@field_validator("letzshop_default_tax_rate")
@classmethod
def validate_tax_rate(cls, v: int | None) -> int | None:
if v is not None and v not in VALID_TAX_RATES:
raise ValueError(f"Invalid tax rate. Must be one of: {VALID_TAX_RATES}")
return v
@field_validator("letzshop_delivery_method")
@classmethod
def validate_delivery_method(cls, v: str | None) -> str | None:
if v is not None:
methods = v.split(",")
for method in methods:
if method.strip() not in VALID_DELIVERY_METHODS:
raise ValueError(f"Invalid delivery method. Must be one of: {VALID_DELIVERY_METHODS}")
return v
@field_validator("letzshop_boost_sort")
@classmethod
def validate_boost_sort(cls, v: str | None) -> str | None:
if v is not None:
try:
boost = float(v)
if boost < 0.0 or boost > 10.0:
raise ValueError("Boost sort must be between 0.0 and 10.0")
except ValueError as e:
if "could not convert" in str(e).lower():
raise ValueError("Boost sort must be a valid number")
raise
return v
@router.get("")
def get_vendor_settings(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get comprehensive vendor settings and configuration."""
vendor = vendor_service.get_vendor_by_id(db, current_user.token_vendor_id)
# Get platform defaults for display
platform_config = platform_settings_service.get_storefront_config(db)
# Get business info with inheritance flags
contact_info = vendor.get_contact_info_with_inheritance()
# Get invoice settings if exists
invoice_settings = None
if vendor.invoice_settings:
inv = vendor.invoice_settings
invoice_settings = {
"company_name": inv.company_name,
"company_address": inv.company_address,
"company_city": inv.company_city,
"company_postal_code": inv.company_postal_code,
"company_country": inv.company_country,
"vat_number": inv.vat_number,
"is_vat_registered": inv.is_vat_registered,
"invoice_prefix": inv.invoice_prefix,
"invoice_next_number": inv.invoice_next_number,
"payment_terms": inv.payment_terms,
"bank_name": inv.bank_name,
"bank_iban": inv.bank_iban,
"bank_bic": inv.bank_bic,
"footer_text": inv.footer_text,
"default_vat_rate": inv.default_vat_rate,
}
# Get theme settings if exists
theme_settings = None
if vendor.vendor_theme:
theme = vendor.vendor_theme
theme_settings = {
"theme_name": theme.theme_name,
"colors": theme.colors,
"font_family_heading": theme.font_family_heading,
"font_family_body": theme.font_family_body,
"logo_url": theme.logo_url,
"logo_dark_url": theme.logo_dark_url,
"favicon_url": theme.favicon_url,
"banner_url": theme.banner_url,
"layout_style": theme.layout_style,
"header_style": theme.header_style,
"product_card_style": theme.product_card_style,
"social_links": theme.social_links,
"custom_css": theme.custom_css,
}
# Get domains (read-only)
domains = []
for domain in vendor.domains:
domains.append({
"id": domain.id,
"domain": domain.domain,
"is_primary": domain.is_primary,
"is_active": domain.is_active,
"ssl_status": domain.ssl_status,
"is_verified": domain.is_verified,
})
# Get Stripe info from subscription (read-only, masked)
stripe_info = None
if vendor.subscription and vendor.subscription.stripe_customer_id:
stripe_info = {
"has_stripe_customer": True,
"customer_id_masked": f"cus_***{vendor.subscription.stripe_customer_id[-4:]}",
}
return {
# General info
"vendor_code": vendor.vendor_code,
"subdomain": vendor.subdomain,
"name": vendor.name,
"description": vendor.description,
"is_active": vendor.is_active,
"is_verified": vendor.is_verified,
# Business info with inheritance (values + flags)
"business_info": {
"contact_email": contact_info["contact_email"],
"contact_email_inherited": contact_info["contact_email_inherited"],
"contact_email_override": vendor.contact_email, # Raw override value
"contact_phone": contact_info["contact_phone"],
"contact_phone_inherited": contact_info["contact_phone_inherited"],
"contact_phone_override": vendor.contact_phone,
"website": contact_info["website"],
"website_inherited": contact_info["website_inherited"],
"website_override": vendor.website,
"business_address": contact_info["business_address"],
"business_address_inherited": contact_info["business_address_inherited"],
"business_address_override": vendor.business_address,
"tax_number": contact_info["tax_number"],
"tax_number_inherited": contact_info["tax_number_inherited"],
"tax_number_override": vendor.tax_number,
"company_name": vendor.company.name if vendor.company else None,
},
# Localization settings
"localization": {
"default_language": vendor.default_language,
"dashboard_language": vendor.dashboard_language,
"storefront_language": vendor.storefront_language,
"storefront_languages": vendor.storefront_languages or ["fr", "de", "en"],
"storefront_locale": vendor.storefront_locale,
"platform_default_locale": platform_config["locale"],
"platform_currency": platform_config["currency"],
},
# Letzshop marketplace settings
"letzshop": {
"csv_url_fr": vendor.letzshop_csv_url_fr,
"csv_url_en": vendor.letzshop_csv_url_en,
"csv_url_de": vendor.letzshop_csv_url_de,
"default_tax_rate": vendor.letzshop_default_tax_rate,
"boost_sort": vendor.letzshop_boost_sort,
"delivery_method": vendor.letzshop_delivery_method,
"preorder_days": vendor.letzshop_preorder_days,
"vendor_id": vendor.letzshop_vendor_id,
"vendor_slug": vendor.letzshop_vendor_slug,
"has_credentials": vendor.letzshop_credentials is not None,
"auto_sync_enabled": vendor.letzshop_credentials.auto_sync_enabled if vendor.letzshop_credentials else False,
},
# Invoice settings
"invoice_settings": invoice_settings,
# Theme/branding settings
"theme_settings": theme_settings,
# Domains (read-only)
"domains": domains,
"default_subdomain": f"{vendor.subdomain}.letzshop.lu",
# Stripe info (read-only)
"stripe_info": stripe_info,
# Options for dropdowns
"options": {
"supported_languages": SUPPORTED_LANGUAGES,
"supported_locales": SUPPORTED_LOCALES,
"tax_rates": [
{"value": 0, "label": "0% (Exempt)"},
{"value": 3, "label": "3% (Super-reduced)"},
{"value": 8, "label": "8% (Reduced)"},
{"value": 14, "label": "14% (Intermediate)"},
{"value": 17, "label": "17% (Standard)"},
],
"delivery_methods": [
{"value": "nationwide", "label": "Nationwide (all methods)"},
{"value": "package_delivery", "label": "Package Delivery"},
{"value": "self_collect", "label": "Self Collect"},
],
},
}
@router.put("/business-info")
def update_business_info(
business_info: BusinessInfoUpdate,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Update vendor business info.
Fields can be set to override company values, or reset to inherit from company.
Use reset_to_company list to reset specific fields to inherit from company.
"""
vendor = vendor_service.get_vendor_by_id(db, current_user.token_vendor_id)
update_data = business_info.model_dump(exclude_unset=True)
# Handle reset_to_company - set those fields to None
reset_fields = update_data.pop("reset_to_company", None) or []
for field in reset_fields:
if field in ["contact_email", "contact_phone", "website", "business_address", "tax_number"]:
setattr(vendor, field, None)
logger.info(f"Reset {field} to inherit from company for vendor {vendor.id}")
# Update other fields
for key, value in update_data.items():
if key in ["name", "description", "contact_email", "contact_phone", "website", "business_address", "tax_number"]:
setattr(vendor, key, value)
db.commit()
db.refresh(vendor)
logger.info(f"Business info updated for vendor {vendor.id}")
# Return updated info with inheritance
contact_info = vendor.get_contact_info_with_inheritance()
return {
"message": "Business info updated",
"name": vendor.name,
"description": vendor.description,
"business_info": contact_info,
}
@router.put("/letzshop")
def update_letzshop_settings(
letzshop_config: LetzshopFeedSettingsUpdate,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Update Letzshop marketplace feed settings.
Validation is handled by Pydantic model validators.
"""
vendor = vendor_service.get_vendor_by_id(db, current_user.token_vendor_id)
update_data = letzshop_config.model_dump(exclude_unset=True)
# Apply updates (validation already done by Pydantic)
for key, value in update_data.items():
setattr(vendor, key, value)
db.commit()
db.refresh(vendor)
logger.info(f"Letzshop settings updated for vendor {vendor.id}")
return {
"message": "Letzshop settings updated",
"csv_url_fr": vendor.letzshop_csv_url_fr,
"csv_url_en": vendor.letzshop_csv_url_en,
"csv_url_de": vendor.letzshop_csv_url_de,
"default_tax_rate": vendor.letzshop_default_tax_rate,
"boost_sort": vendor.letzshop_boost_sort,
"delivery_method": vendor.letzshop_delivery_method,
"preorder_days": vendor.letzshop_preorder_days,
}
@router.put("/localization")
def update_localization_settings(
localization_config: LocalizationSettingsUpdate,
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""
Update vendor localization settings.
Allows vendors to configure:
- default_language: Default language for vendor content
- dashboard_language: UI language for vendor dashboard
- storefront_language: Default language for customer storefront
- storefront_languages: Enabled languages for storefront selector
- storefront_locale: Locale for currency/number formatting (or null for platform default)
Validation is handled by Pydantic model validators.
"""
vendor = vendor_service.get_vendor_by_id(db, current_user.token_vendor_id)
# Update only provided fields (validation already done by Pydantic)
update_data = localization_config.model_dump(exclude_unset=True)
# Apply updates
for key, value in update_data.items():
setattr(vendor, key, value)
db.commit()
db.refresh(vendor)
logger.info(
f"Localization settings updated for vendor {vendor.id}",
extra={"vendor_id": vendor.id, "updated_fields": list(update_data.keys())},
)
return {
"message": "Localization settings updated",
"default_language": vendor.default_language,
"dashboard_language": vendor.dashboard_language,
"storefront_language": vendor.storefront_language,
"storefront_languages": vendor.storefront_languages,
"storefront_locale": vendor.storefront_locale,
}

View File

@@ -1,485 +0,0 @@
# app/api/v1/vendor/teams.py
"""
Vendor team member management endpoints.
Implements complete team management with:
- Team member listing
- Invitation system
- Role management
- Permission checking
- RBAC integration
"""
import logging
from fastapi import APIRouter, Depends, Request
from sqlalchemy.orm import Session
from app.api.deps import (
get_current_vendor_api,
get_user_permissions,
require_vendor_owner,
require_vendor_permission,
)
from app.core.database import get_db
from app.core.permissions import VendorPermissions
from app.services.vendor_team_service import vendor_team_service
from models.schema.auth import UserContext
from models.schema.team import (
BulkRemoveRequest,
BulkRemoveResponse,
InvitationAccept,
InvitationAcceptResponse,
InvitationResponse,
RoleListResponse,
TeamMemberInvite,
TeamMemberListResponse,
TeamMemberResponse,
TeamMemberUpdate,
TeamStatistics,
UserPermissionsResponse,
)
router = APIRouter(prefix="/team")
logger = logging.getLogger(__name__)
# ============================================================================
# Team Member Routes
# ============================================================================
@router.get("/members", response_model=TeamMemberListResponse)
def list_team_members(
request: Request,
include_inactive: bool = False,
db: Session = Depends(get_db),
current_user: UserContext = Depends(
require_vendor_permission(VendorPermissions.TEAM_VIEW.value)
),
):
"""
Get all team members for current vendor.
**Required Permission:** `team.view`
**Query Parameters:**
- `include_inactive`: Include inactive team members (default: False)
**Returns:**
- List of team members with their roles and permissions
- Statistics (total, active, pending)
"""
vendor = request.state.vendor
members = vendor_team_service.get_team_members(
db=db, vendor=vendor, include_inactive=include_inactive
)
# Calculate statistics
total = len(members)
active = sum(1 for m in members if m["is_active"])
pending = sum(1 for m in members if m["invitation_pending"])
logger.info(
f"Listed {total} team members for vendor {vendor.vendor_code} "
f"(active: {active}, pending: {pending})"
)
return TeamMemberListResponse(
members=members, total=total, active_count=active, pending_invitations=pending
)
@router.post("/invite", response_model=InvitationResponse)
def invite_team_member(
invitation: TeamMemberInvite,
request: Request,
db: Session = Depends(get_db),
current_user: UserContext = Depends(require_vendor_owner), # Owner only
):
"""
Invite a new team member to the vendor.
**Required:** Vendor owner role
**Process:**
1. Create user account (if doesn't exist)
2. Create VendorUser with invitation token
3. Send invitation email
**Request Body:**
- `email`: Email address of invitee
- `first_name`, `last_name`: Optional names
- `role_name`: Preset role (manager, staff, support, viewer, marketing)
- `role_id`: Use existing role (alternative to role_name)
- `custom_permissions`: Override role permissions (requires role_name)
**Returns:**
- Invitation details
- Confirmation of email sent
"""
vendor = request.state.vendor
# Determine role approach
if invitation.role_id:
# Use existing role by ID
result = vendor_team_service.invite_team_member(
db=db,
vendor=vendor,
inviter=current_user,
email=invitation.email,
role_id=invitation.role_id,
)
elif invitation.role_name:
# Use role name with optional custom permissions
result = vendor_team_service.invite_team_member(
db=db,
vendor=vendor,
inviter=current_user,
email=invitation.email,
role_name=invitation.role_name,
custom_permissions=invitation.custom_permissions,
)
else:
# Default to Staff role
result = vendor_team_service.invite_team_member(
db=db,
vendor=vendor,
inviter=current_user,
email=invitation.email,
role_name="staff",
)
db.commit()
logger.info(
f"Invitation sent: {invitation.email} to {vendor.vendor_code} "
f"by {current_user.username}"
)
return InvitationResponse(
message="Invitation sent successfully",
email=result["email"],
role=result["role"],
invitation_sent=True,
)
@router.post("/accept-invitation", response_model=InvitationAcceptResponse) # public
def accept_invitation(acceptance: InvitationAccept, db: Session = Depends(get_db)):
"""
Accept a team invitation and activate account.
**No authentication required** - uses invitation token.
**Request Body:**
- `invitation_token`: Token from invitation email
- `password`: New password (min 8 chars, must have upper, lower, digit)
- `first_name`, `last_name`: User's name
**Returns:**
- Confirmation message
- Vendor information
- User information
- Assigned role
"""
result = vendor_team_service.accept_invitation(
db=db,
invitation_token=acceptance.invitation_token,
password=acceptance.password,
first_name=acceptance.first_name,
last_name=acceptance.last_name,
)
db.commit()
logger.info(
f"Invitation accepted: {result['user'].email} "
f"for vendor {result['vendor'].vendor_code}"
)
return InvitationAcceptResponse(
message="Invitation accepted successfully. You can now login.",
vendor={
"id": result["vendor"].id,
"vendor_code": result["vendor"].vendor_code,
"name": result["vendor"].name,
"subdomain": result["vendor"].subdomain,
},
user={
"id": result["user"].id,
"email": result["user"].email,
"username": result["user"].username,
"full_name": result["user"].full_name,
},
role=result["role"],
)
@router.get("/members/{user_id}", response_model=TeamMemberResponse)
def get_team_member(
user_id: int,
request: Request,
db: Session = Depends(get_db),
current_user: UserContext = Depends(
require_vendor_permission(VendorPermissions.TEAM_VIEW.value)
),
):
"""
Get details of a specific team member.
**Required Permission:** `team.view`
"""
vendor = request.state.vendor
members = vendor_team_service.get_team_members(
db=db, vendor=vendor, include_inactive=True
)
member = next((m for m in members if m["id"] == user_id), None)
if not member:
from app.exceptions import UserNotFoundException
raise UserNotFoundException(str(user_id))
return TeamMemberResponse(**member)
@router.put("/members/{user_id}", response_model=TeamMemberResponse)
def update_team_member(
user_id: int,
update_data: TeamMemberUpdate,
request: Request,
db: Session = Depends(get_db),
current_user: UserContext = Depends(require_vendor_owner), # Owner only
):
"""
Update a team member's role or status.
**Required:** Vendor owner role
**Cannot:**
- Change owner's role
- Remove owner
**Request Body:**
- `role_id`: New role ID (optional)
- `is_active`: Active status (optional)
"""
vendor = request.state.vendor
vendor_user = vendor_team_service.update_member_role(
db=db,
vendor=vendor,
user_id=user_id,
new_role_id=update_data.role_id,
is_active=update_data.is_active,
)
db.commit()
logger.info(
f"Team member updated: {user_id} in {vendor.vendor_code} "
f"by {current_user.username}"
)
# Return updated member details
members = vendor_team_service.get_team_members(db, vendor, include_inactive=True)
member = next((m for m in members if m["id"] == user_id), None)
return TeamMemberResponse(**member)
@router.delete("/members/{user_id}")
def remove_team_member(
user_id: int,
request: Request,
db: Session = Depends(get_db),
current_user: UserContext = Depends(require_vendor_owner), # Owner only
):
"""
Remove a team member from the vendor.
**Required:** Vendor owner role
**Cannot remove:**
- Vendor owner
**Action:**
- Soft delete (sets is_active = False)
- Member can be re-invited later
"""
vendor = request.state.vendor
vendor_team_service.remove_team_member(db=db, vendor=vendor, user_id=user_id)
db.commit()
logger.info(
f"Team member removed: {user_id} from {vendor.vendor_code} "
f"by {current_user.username}"
)
return {"message": "Team member removed successfully", "user_id": user_id}
@router.post("/members/bulk-remove", response_model=BulkRemoveResponse)
def bulk_remove_team_members(
bulk_remove: BulkRemoveRequest,
request: Request,
db: Session = Depends(get_db),
current_user: UserContext = Depends(require_vendor_owner),
):
"""
Remove multiple team members at once.
**Required:** Vendor owner role
"""
vendor = request.state.vendor
success_count = 0
failed_count = 0
errors = []
for user_id in bulk_remove.user_ids:
try:
vendor_team_service.remove_team_member(
db=db, vendor=vendor, user_id=user_id
)
success_count += 1
except Exception as e:
failed_count += 1
errors.append({"user_id": user_id, "error": str(e)})
db.commit()
logger.info(
f"Bulk remove completed: {success_count} removed, {failed_count} failed "
f"in {vendor.vendor_code}"
)
return BulkRemoveResponse(
success_count=success_count, failed_count=failed_count, errors=errors
)
# ============================================================================
# Role Management Routes
# ============================================================================
@router.get("/roles", response_model=RoleListResponse)
def list_roles(
request: Request,
db: Session = Depends(get_db),
current_user: UserContext = Depends(
require_vendor_permission(VendorPermissions.TEAM_VIEW.value)
),
):
"""
Get all available roles for the vendor.
**Required Permission:** `team.view`
**Returns:**
- List of roles with permissions
- Includes both preset and custom roles
"""
vendor = request.state.vendor
roles = vendor_team_service.get_vendor_roles(db=db, vendor_id=vendor.id)
db.commit() # Commit in case default roles were created
return RoleListResponse(roles=roles, total=len(roles))
# ============================================================================
# Permission Routes
# ============================================================================
@router.get("/me/permissions", response_model=UserPermissionsResponse)
def get_my_permissions(
request: Request,
permissions: list[str] = Depends(get_user_permissions),
current_user: UserContext = Depends(get_current_vendor_api),
):
"""
Get current user's permissions in this vendor.
**Use this endpoint to:**
- Determine what UI elements to show/hide
- Check permissions before making API calls
- Display user's role and capabilities
**Returns:**
- Complete list of permissions
- Whether user is owner
- Role name (if team member)
Requires Authorization header (API endpoint).
"""
vendor = request.state.vendor
is_owner = current_user.is_owner_of(vendor.id)
role_name = current_user.get_vendor_role(vendor.id)
return UserPermissionsResponse(
permissions=permissions,
permission_count=len(permissions),
is_owner=is_owner,
role_name=role_name,
)
# ============================================================================
# Statistics Routes
# ============================================================================
@router.get("/statistics", response_model=TeamStatistics)
def get_team_statistics(
request: Request,
db: Session = Depends(get_db),
current_user: UserContext = Depends(
require_vendor_permission(VendorPermissions.TEAM_VIEW.value)
),
):
"""
Get team statistics for the vendor.
**Required Permission:** `team.view`
**Returns:**
- Total members
- Active/inactive breakdown
- Pending invitations
- Owner count
- Role distribution
"""
vendor = request.state.vendor
members = vendor_team_service.get_team_members(
db=db, vendor=vendor, include_inactive=True
)
# Calculate statistics
total = len(members)
active = sum(1 for m in members if m["is_active"])
inactive = total - active
pending = sum(1 for m in members if m["invitation_pending"])
owners = sum(1 for m in members if m["is_owner"])
team_members = total - owners
# Role breakdown
roles_breakdown = {}
for member in members:
role = member["role_name"]
roles_breakdown[role] = roles_breakdown.get(role, 0) + 1
return TeamStatistics(
total_members=total,
active_members=active,
inactive_members=inactive,
pending_invitations=pending,
owners=owners,
team_members=team_members,
roles_breakdown=roles_breakdown,
)