refactor: move letzshop endpoints to marketplace module and add vendor service tests

Move letzshop-related functionality from tenancy to marketplace module:
- Move admin letzshop routes to marketplace/routes/api/admin_letzshop.py
- Move letzshop schemas to marketplace/schemas/letzshop.py
- Remove letzshop code from tenancy module (admin_vendors, vendor_service)
- Update model exports and imports

Add comprehensive unit tests for vendor services:
- test_company_service.py: Company management operations
- test_platform_service.py: Platform management operations
- test_vendor_domain_service.py: Vendor domain operations
- test_vendor_team_service.py: Vendor team management

Update module definitions:
- billing, messaging, payments: Minor definition updates

Add architecture proposals documentation:
- Module dependency redesign session notes
- Decouple modules implementation plan
- Module decoupling proposal

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-04 19:25:00 +01:00
parent 37942ae02b
commit 0583dd2cc4
29 changed files with 3643 additions and 650 deletions

View File

@@ -77,11 +77,12 @@ billing_module = ModuleDefinition(
code="billing", code="billing",
name="Billing & Subscriptions", name="Billing & Subscriptions",
description=( description=(
"Platform subscription management, vendor billing, and invoice history. " "Core subscription management, tier limits, vendor billing, and invoice history. "
"Provides tier-based feature gating used throughout the platform. "
"Uses the payments module for actual payment processing." "Uses the payments module for actual payment processing."
), ),
version="1.0.0", version="1.0.0",
requires=["payments"], # Depends on payments module for payment processing requires=["payments"], # Depends on payments module (also core) for payment processing
features=[ features=[
"subscription_management", # Manage subscription tiers "subscription_management", # Manage subscription tiers
"billing_history", # View invoices and payment history "billing_history", # View invoices and payment history
@@ -200,7 +201,7 @@ billing_module = ModuleDefinition(
), ),
], ],
}, },
is_core=False, # Billing can be disabled (e.g., internal platforms) is_core=True, # Core module - tier limits and subscription management are fundamental
# Context providers for dynamic page context # Context providers for dynamic page context
context_providers={ context_providers={
FrontendType.PLATFORM: _get_platform_context, FrontendType.PLATFORM: _get_platform_context,

View File

@@ -18,9 +18,16 @@ Usage:
# These imports are NOT re-exported, just ensure models are registered with SQLAlchemy # These imports are NOT re-exported, just ensure models are registered with SQLAlchemy
# before the marketplace models are loaded. # before the marketplace models are loaded.
# #
# Relationship being resolved: # Relationships being resolved:
# - LetzshopFulfillmentQueue.order -> "Order" (in orders module) # - LetzshopFulfillmentQueue.order -> "Order" (in orders module)
# - MarketplaceImportJob.vendor -> "Vendor" (in tenancy module)
# - MarketplaceImportJob.user -> "User" (in tenancy module)
#
# NOTE: This module owns the relationships to tenancy models (User, Vendor).
# Core models should NOT have back-references to optional module models.
from app.modules.orders.models import Order # noqa: F401 from app.modules.orders.models import Order # noqa: F401
from app.modules.tenancy.models.user import User # noqa: F401
from app.modules.tenancy.models.vendor import Vendor # noqa: F401
from app.modules.marketplace.models.marketplace_product import ( from app.modules.marketplace.models.marketplace_product import (
MarketplaceProduct, MarketplaceProduct,

View File

@@ -95,7 +95,9 @@ class MarketplaceImportJob(Base, TimestampMixin):
completed_at = Column(DateTime(timezone=True)) completed_at = Column(DateTime(timezone=True))
# Relationships # Relationships
vendor = relationship("Vendor", back_populates="marketplace_import_jobs") # NOTE: No back_populates - optional modules own relationships to core models
# Core models (User, Vendor) don't have back-references to optional modules
vendor = relationship("Vendor")
user = relationship("User", foreign_keys=[user_id]) user = relationship("User", foreign_keys=[user_id])
errors = relationship( errors = relationship(
"MarketplaceImportError", "MarketplaceImportError",

View File

@@ -45,6 +45,8 @@ from app.modules.marketplace.schemas import (
LetzshopCredentialsCreate, LetzshopCredentialsCreate,
LetzshopCredentialsResponse, LetzshopCredentialsResponse,
LetzshopCredentialsUpdate, LetzshopCredentialsUpdate,
LetzshopExportRequest,
LetzshopExportResponse,
LetzshopHistoricalImportJobResponse, LetzshopHistoricalImportJobResponse,
LetzshopHistoricalImportStartResponse, LetzshopHistoricalImportStartResponse,
LetzshopJobItem, LetzshopJobItem,
@@ -1520,3 +1522,197 @@ def create_vendor_from_letzshop(
except ValueError as e: except ValueError as e:
raise ValidationException(str(e)) raise ValidationException(str(e))
# ============================================================================
# Product Export
# ============================================================================
@admin_letzshop_router.get("/vendors/{vendor_id}/export")
def export_vendor_products_letzshop(
vendor_id: int = Path(..., description="Vendor ID"),
language: str = Query(
"en", description="Language for title/description (en, fr, de)"
),
include_inactive: bool = Query(False, description="Include inactive products"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Export vendor products in Letzshop CSV format (Admin only).
Generates a Google Shopping compatible CSV file for Letzshop marketplace.
The file uses tab-separated values and includes all required Letzshop fields.
**Supported languages:** en, fr, de
**CSV Format:**
- Delimiter: Tab (\\t)
- Encoding: UTF-8
- Fields: id, title, description, price, availability, image_link, etc.
Returns:
CSV file as attachment (vendor_code_letzshop_export.csv)
"""
from fastapi.responses import Response
from app.modules.marketplace.services.letzshop_export_service import letzshop_export_service
order_service = get_order_service(db)
try:
vendor = order_service.get_vendor_or_raise(vendor_id)
except VendorNotFoundError:
raise ResourceNotFoundException("Vendor", str(vendor_id))
csv_content = letzshop_export_service.export_vendor_products(
db=db,
vendor_id=vendor.id,
language=language,
include_inactive=include_inactive,
)
filename = f"{vendor.vendor_code.lower()}_letzshop_export.csv"
return Response(
content=csv_content,
media_type="text/csv; charset=utf-8",
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
},
)
@admin_letzshop_router.post(
"/vendors/{vendor_id}/export",
response_model=LetzshopExportResponse,
)
def export_vendor_products_letzshop_to_folder(
vendor_id: int = Path(..., description="Vendor ID"),
request: LetzshopExportRequest = None,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Export vendor products to Letzshop pickup folder (Admin only).
Generates CSV files for all languages (FR, DE, EN) and places them in a folder
that Letzshop scheduler can fetch from. This is the preferred method for
automated product sync.
**Behavior:**
- When Celery is enabled: Queues export as background task, returns immediately
- When Celery is disabled: Runs synchronously and returns file paths
- Creates CSV files for each language (fr, de, en)
- Places files in: exports/letzshop/{vendor_code}/
- Filename format: {vendor_code}_products_{language}.csv
Returns:
JSON with export status and file paths (or task_id if async)
"""
import os
from datetime import UTC, datetime
from pathlib import Path as FilePath
from app.core.config import settings
from app.modules.marketplace.services.letzshop_export_service import letzshop_export_service
order_service = get_order_service(db)
try:
vendor = order_service.get_vendor_or_raise(vendor_id)
except VendorNotFoundError:
raise ResourceNotFoundException("Vendor", str(vendor_id))
include_inactive = request.include_inactive if request else False
# If Celery is enabled, dispatch as async task
if settings.use_celery:
from app.tasks.dispatcher import task_dispatcher
celery_task_id = task_dispatcher.dispatch_product_export(
vendor_id=vendor.id,
triggered_by=f"admin:{current_admin.id}",
include_inactive=include_inactive,
)
return {
"success": True,
"message": f"Export task queued for vendor {vendor.vendor_code}. Check Flower for status.",
"vendor_code": vendor.vendor_code,
"export_directory": f"exports/letzshop/{vendor.vendor_code.lower()}",
"files": [],
"celery_task_id": celery_task_id,
"is_async": True,
}
# Synchronous export (when Celery is disabled)
started_at = datetime.now(UTC)
# Create export directory
export_dir = FilePath(f"exports/letzshop/{vendor.vendor_code.lower()}")
export_dir.mkdir(parents=True, exist_ok=True)
exported_files = []
languages = ["fr", "de", "en"]
total_records = 0
failed_count = 0
for lang in languages:
try:
csv_content = letzshop_export_service.export_vendor_products(
db=db,
vendor_id=vendor.id,
language=lang,
include_inactive=include_inactive,
)
filename = f"{vendor.vendor_code.lower()}_products_{lang}.csv"
filepath = export_dir / filename
with open(filepath, "w", encoding="utf-8") as f:
f.write(csv_content)
# Count lines (minus header)
line_count = csv_content.count("\n")
if line_count > 0:
total_records = max(total_records, line_count - 1)
exported_files.append({
"language": lang,
"filename": filename,
"path": str(filepath),
"size_bytes": os.path.getsize(filepath),
})
except Exception as e:
failed_count += 1
exported_files.append({
"language": lang,
"error": str(e),
})
# Log the export operation via service
completed_at = datetime.now(UTC)
letzshop_export_service.log_export(
db=db,
vendor_id=vendor.id,
started_at=started_at,
completed_at=completed_at,
files_processed=len(languages),
files_succeeded=len(languages) - failed_count,
files_failed=failed_count,
products_exported=total_records,
triggered_by=f"admin:{current_admin.id}",
error_details={"files": exported_files} if failed_count > 0 else None,
)
db.commit()
return {
"success": True,
"message": f"Exported {len([f for f in exported_files if 'error' not in f])} language(s) to {export_dir}",
"vendor_code": vendor.vendor_code,
"export_directory": str(export_dir),
"files": exported_files,
"is_async": False,
}

View File

@@ -85,6 +85,10 @@ from app.modules.marketplace.schemas.letzshop import (
LetzshopCachedVendorDetailResponse, LetzshopCachedVendorDetailResponse,
LetzshopVendorDirectorySyncResponse, LetzshopVendorDirectorySyncResponse,
LetzshopCreateVendorFromCacheResponse, LetzshopCreateVendorFromCacheResponse,
# Product Export
LetzshopExportRequest,
LetzshopExportFileInfo,
LetzshopExportResponse,
) )
from app.modules.marketplace.schemas.onboarding import ( from app.modules.marketplace.schemas.onboarding import (
# Step status # Step status
@@ -184,6 +188,10 @@ __all__ = [
"LetzshopCachedVendorDetailResponse", "LetzshopCachedVendorDetailResponse",
"LetzshopVendorDirectorySyncResponse", "LetzshopVendorDirectorySyncResponse",
"LetzshopCreateVendorFromCacheResponse", "LetzshopCreateVendorFromCacheResponse",
# Letzshop - Product Export
"LetzshopExportRequest",
"LetzshopExportFileInfo",
"LetzshopExportResponse",
# Onboarding - Step status # Onboarding - Step status
"StepStatus", "StepStatus",
"CompanyProfileStepStatus", "CompanyProfileStepStatus",

View File

@@ -624,3 +624,41 @@ class LetzshopCreateVendorFromCacheResponse(BaseModel):
message: str message: str
vendor: dict[str, Any] | None = None vendor: dict[str, Any] | None = None
letzshop_vendor_slug: str letzshop_vendor_slug: str
# ============================================================================
# Product Export Schemas
# ============================================================================
class LetzshopExportRequest(BaseModel):
"""Request body for Letzshop export to pickup folder."""
include_inactive: bool = Field(
default=False,
description="Include inactive products in export"
)
class LetzshopExportFileInfo(BaseModel):
"""Info about an exported file."""
language: str
filename: str | None = None
path: str | None = None
size_bytes: int | None = None
error: str | None = None
class LetzshopExportResponse(BaseModel):
"""Response from Letzshop export to folder."""
success: bool
message: str
vendor_code: str
export_directory: str
files: list[LetzshopExportFileInfo]
celery_task_id: str | None = None # Set when using Celery async export
is_async: bool = Field(default=False, serialization_alias="async") # True when queued via Celery
model_config = ConfigDict(populate_by_name=True)

View File

@@ -701,7 +701,7 @@ function adminMarketplaceLetzshop() {
this.successMessage = ''; this.successMessage = '';
try { try {
const response = await apiClient.post(`/admin/vendors/${this.selectedVendor.id}/export/letzshop`, { const response = await apiClient.post(`/admin/letzshop/vendors/${this.selectedVendor.id}/export`, {
include_inactive: this.exportIncludeInactive include_inactive: this.exportIncludeInactive
}); });

View File

@@ -28,7 +28,10 @@ def _get_vendor_router():
messaging_module = ModuleDefinition( messaging_module = ModuleDefinition(
code="messaging", code="messaging",
name="Messaging & Notifications", name="Messaging & Notifications",
description="Internal messages, customer communication, and notifications.", description=(
"Core email and notification system for user registration, password resets, "
"team invitations, and system notifications. Required for basic platform operations."
),
version="1.0.0", version="1.0.0",
features=[ features=[
"customer_messaging", # Customer communication "customer_messaging", # Customer communication
@@ -157,7 +160,7 @@ messaging_module = ModuleDefinition(
), ),
], ],
}, },
is_core=False, is_core=True, # Core module - email/notifications required for registration, password reset, etc.
# ========================================================================= # =========================================================================
# Self-Contained Module Configuration # Self-Contained Module Configuration
# ========================================================================= # =========================================================================

View File

@@ -38,8 +38,9 @@ payments_module = ModuleDefinition(
code="payments", code="payments",
name="Payment Gateways", name="Payment Gateways",
description=( description=(
"Payment gateway integrations for Stripe, PayPal, and bank transfers. " "Core payment gateway integrations for Stripe, PayPal, and bank transfers. "
"Provides payment processing, refunds, and payment method management." "Provides payment processing, refunds, and payment method management. "
"Required by billing module for subscription payments."
), ),
version="1.0.0", version="1.0.0",
features=[ features=[
@@ -80,7 +81,7 @@ payments_module = ModuleDefinition(
"payment-methods", # Manage stored payment methods "payment-methods", # Manage stored payment methods
], ],
}, },
is_core=False, is_core=True, # Core module - required for billing and subscription management
is_internal=False, is_internal=False,
# ========================================================================= # =========================================================================
# Self-Contained Module Configuration # Self-Contained Module Configuration

View File

@@ -14,12 +14,12 @@ This is the canonical location for tenancy module models including:
# These imports are NOT re-exported, just ensure models are registered with SQLAlchemy # These imports are NOT re-exported, just ensure models are registered with SQLAlchemy
# before the tenancy models are loaded. # before the tenancy models are loaded.
# #
# Relationship chain being resolved: # Relationship being resolved:
# - Platform.admin_menu_configs -> "AdminMenuConfig" (in core module) # - Platform.admin_menu_configs -> "AdminMenuConfig" (in core module)
# - User.marketplace_import_jobs -> "MarketplaceImportJob" (in marketplace module) #
# - Vendor.marketplace_import_jobs -> "MarketplaceImportJob" (in marketplace module) # NOTE: MarketplaceImportJob relationships have been moved to the marketplace module.
# Optional modules own their relationships to core models, not vice versa.
from app.modules.core.models import AdminMenuConfig # noqa: F401 from app.modules.core.models import AdminMenuConfig # noqa: F401
from app.modules.marketplace.models.marketplace_import_job import MarketplaceImportJob # noqa: F401
from app.modules.tenancy.models.admin import ( from app.modules.tenancy.models.admin import (
AdminAuditLog, AdminAuditLog,

View File

@@ -56,9 +56,8 @@ class User(Base, TimestampMixin):
preferred_language = Column(String(5), nullable=True) preferred_language = Column(String(5), nullable=True)
# Relationships # Relationships
marketplace_import_jobs = relationship( # NOTE: marketplace_import_jobs relationship removed - owned by marketplace module
"MarketplaceImportJob", back_populates="user" # Use: MarketplaceImportJob.query.filter_by(user_id=user.id) instead
)
owned_companies = relationship("Company", back_populates="owner") owned_companies = relationship("Company", back_populates="owner")
vendor_memberships = relationship( vendor_memberships = relationship(
"VendorUser", foreign_keys="[VendorUser.user_id]", back_populates="user" "VendorUser", foreign_keys="[VendorUser.user_id]", back_populates="user"

View File

@@ -3,7 +3,9 @@
Vendor model representing entities that sell products or services. Vendor model representing entities that sell products or services.
This module defines the Vendor model along with its relationships to This module defines the Vendor model along with its relationships to
other models such as User (owner), Product, Customer, Order, and MarketplaceImportJob. other models such as User (owner), Product, Customer, and Order.
Note: MarketplaceImportJob relationships are owned by the marketplace module.
""" """
import enum import enum
@@ -143,9 +145,8 @@ class Vendor(Base, TimestampMixin):
orders = relationship( orders = relationship(
"Order", back_populates="vendor" "Order", back_populates="vendor"
) # Relationship with Order model for orders placed by this vendor ) # Relationship with Order model for orders placed by this vendor
marketplace_import_jobs = relationship( # NOTE: marketplace_import_jobs relationship removed - owned by marketplace module
"MarketplaceImportJob", back_populates="vendor" # Use: MarketplaceImportJob.query.filter_by(vendor_id=vendor.id) instead
) # Relationship with MarketplaceImportJob model for import jobs related to this vendor
# Letzshop integration credentials (one-to-one) # Letzshop integration credentials (one-to-one)
letzshop_credentials = relationship( letzshop_credentials = relationship(

View File

@@ -17,17 +17,14 @@ from app.api.deps import get_current_admin_api
from app.core.database import get_db from app.core.database import get_db
from app.modules.tenancy.exceptions import ConfirmationRequiredException from app.modules.tenancy.exceptions import ConfirmationRequiredException
from app.modules.tenancy.services.admin_service import admin_service from app.modules.tenancy.services.admin_service import admin_service
from app.modules.analytics.services.stats_service import stats_service
from app.modules.tenancy.services.vendor_service import vendor_service from app.modules.tenancy.services.vendor_service import vendor_service
from models.schema.auth import UserContext from models.schema.auth import UserContext
from app.modules.analytics.schemas import VendorStatsResponse
from app.modules.tenancy.schemas.vendor import ( from app.modules.tenancy.schemas.vendor import (
LetzshopExportRequest,
LetzshopExportResponse,
VendorCreate, VendorCreate,
VendorCreateResponse, VendorCreateResponse,
VendorDetailResponse, VendorDetailResponse,
VendorListResponse, VendorListResponse,
VendorStatsResponse,
VendorUpdate, VendorUpdate,
) )
@@ -109,14 +106,22 @@ def get_vendor_statistics_endpoint(
current_admin: UserContext = Depends(get_current_admin_api), current_admin: UserContext = Depends(get_current_admin_api),
): ):
"""Get vendor statistics for admin dashboard (Admin only).""" """Get vendor statistics for admin dashboard (Admin only)."""
stats = stats_service.get_vendor_statistics(db) from app.modules.tenancy.models import Vendor
# Query vendor statistics directly to avoid analytics module dependency
total = db.query(Vendor).count()
verified = db.query(Vendor).filter(Vendor.is_verified == True).count()
active = db.query(Vendor).filter(Vendor.is_active == True).count()
inactive = total - active
pending = db.query(Vendor).filter(
Vendor.is_active == True, Vendor.is_verified == False
).count()
# Use schema-compatible keys (with fallback to legacy keys)
return VendorStatsResponse( return VendorStatsResponse(
total=stats.get("total", stats.get("total_vendors", 0)), total=total,
verified=stats.get("verified", stats.get("verified_vendors", 0)), verified=verified,
pending=stats.get("pending", stats.get("pending_vendors", 0)), pending=pending,
inactive=stats.get("inactive", stats.get("inactive_vendors", 0)), inactive=inactive,
) )
@@ -310,183 +315,3 @@ def delete_vendor(
message = admin_service.delete_vendor(db, vendor.id) message = admin_service.delete_vendor(db, vendor.id)
db.commit() db.commit()
return {"message": message} return {"message": message}
# ============================================================================
# LETZSHOP EXPORT
# ============================================================================
@admin_vendors_router.get("/{vendor_identifier}/export/letzshop")
def export_vendor_products_letzshop(
vendor_identifier: str = Path(..., description="Vendor ID or vendor_code"),
language: str = Query(
"en", description="Language for title/description (en, fr, de)"
),
include_inactive: bool = Query(False, description="Include inactive products"),
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Export vendor products in Letzshop CSV format (Admin only).
Generates a Google Shopping compatible CSV file for Letzshop marketplace.
The file uses tab-separated values and includes all required Letzshop fields.
**Supported languages:** en, fr, de
**CSV Format:**
- Delimiter: Tab (\\t)
- Encoding: UTF-8
- Fields: id, title, description, price, availability, image_link, etc.
Returns:
CSV file as attachment (vendor_code_letzshop_export.csv)
"""
from fastapi.responses import Response
from app.modules.marketplace.services.letzshop_export_service import letzshop_export_service
vendor = vendor_service.get_vendor_by_identifier(db, vendor_identifier)
csv_content = letzshop_export_service.export_vendor_products(
db=db,
vendor_id=vendor.id,
language=language,
include_inactive=include_inactive,
)
filename = f"{vendor.vendor_code.lower()}_letzshop_export.csv"
return Response(
content=csv_content,
media_type="text/csv; charset=utf-8",
headers={
"Content-Disposition": f'attachment; filename="{filename}"',
},
)
@admin_vendors_router.post("/{vendor_identifier}/export/letzshop", response_model=LetzshopExportResponse)
def export_vendor_products_letzshop_to_folder(
vendor_identifier: str = Path(..., description="Vendor ID or vendor_code"),
request: LetzshopExportRequest = None,
db: Session = Depends(get_db),
current_admin: UserContext = Depends(get_current_admin_api),
):
"""
Export vendor products to Letzshop pickup folder (Admin only).
Generates CSV files for all languages (FR, DE, EN) and places them in a folder
that Letzshop scheduler can fetch from. This is the preferred method for
automated product sync.
**Behavior:**
- When Celery is enabled: Queues export as background task, returns immediately
- When Celery is disabled: Runs synchronously and returns file paths
- Creates CSV files for each language (fr, de, en)
- Places files in: exports/letzshop/{vendor_code}/
- Filename format: {vendor_code}_products_{language}.csv
Returns:
JSON with export status and file paths (or task_id if async)
"""
import os
from datetime import UTC, datetime
from pathlib import Path as FilePath
from app.core.config import settings
from app.modules.marketplace.services.letzshop_export_service import letzshop_export_service
vendor = vendor_service.get_vendor_by_identifier(db, vendor_identifier)
include_inactive = request.include_inactive if request else False
# If Celery is enabled, dispatch as async task
if settings.use_celery:
from app.tasks.dispatcher import task_dispatcher
celery_task_id = task_dispatcher.dispatch_product_export(
vendor_id=vendor.id,
triggered_by=f"admin:{current_admin.id}",
include_inactive=include_inactive,
)
return {
"success": True,
"message": f"Export task queued for vendor {vendor.vendor_code}. Check Flower for status.",
"vendor_code": vendor.vendor_code,
"export_directory": f"exports/letzshop/{vendor.vendor_code.lower()}",
"files": [],
"celery_task_id": celery_task_id,
"is_async": True,
}
# Synchronous export (when Celery is disabled)
started_at = datetime.now(UTC)
# Create export directory
export_dir = FilePath(f"exports/letzshop/{vendor.vendor_code.lower()}")
export_dir.mkdir(parents=True, exist_ok=True)
exported_files = []
languages = ["fr", "de", "en"]
total_records = 0
failed_count = 0
for lang in languages:
try:
csv_content = letzshop_export_service.export_vendor_products(
db=db,
vendor_id=vendor.id,
language=lang,
include_inactive=include_inactive,
)
filename = f"{vendor.vendor_code.lower()}_products_{lang}.csv"
filepath = export_dir / filename
with open(filepath, "w", encoding="utf-8") as f:
f.write(csv_content)
# Count lines (minus header)
line_count = csv_content.count("\n")
if line_count > 0:
total_records = max(total_records, line_count - 1)
exported_files.append({
"language": lang,
"filename": filename,
"path": str(filepath),
"size_bytes": os.path.getsize(filepath),
})
except Exception as e:
failed_count += 1
exported_files.append({
"language": lang,
"error": str(e),
})
# Log the export operation via service
completed_at = datetime.now(UTC)
letzshop_export_service.log_export(
db=db,
vendor_id=vendor.id,
started_at=started_at,
completed_at=completed_at,
files_processed=len(languages),
files_succeeded=len(languages) - failed_count,
files_failed=failed_count,
products_exported=total_records,
triggered_by=f"admin:{current_admin.id}",
error_details={"files": exported_files} if failed_count > 0 else None,
)
db.commit()
return {
"success": True,
"message": f"Exported {len([f for f in exported_files if 'error' not in f])} language(s) to {export_dir}",
"vendor_code": vendor.vendor_code,
"export_directory": str(export_dir),
"files": exported_files,
"is_async": False,
}

View File

@@ -21,9 +21,6 @@ from app.modules.tenancy.schemas.company import (
# Vendor schemas # Vendor schemas
from app.modules.tenancy.schemas.vendor import ( from app.modules.tenancy.schemas.vendor import (
LetzshopExportFileInfo,
LetzshopExportRequest,
LetzshopExportResponse,
VendorCreate, VendorCreate,
VendorCreateResponse, VendorCreateResponse,
VendorDetailResponse, VendorDetailResponse,
@@ -125,9 +122,6 @@ __all__ = [
"CompanyTransferOwnershipResponse", "CompanyTransferOwnershipResponse",
"CompanyUpdate", "CompanyUpdate",
# Vendor # Vendor
"LetzshopExportFileInfo",
"LetzshopExportRequest",
"LetzshopExportResponse",
"VendorCreate", "VendorCreate",
"VendorCreateResponse", "VendorCreateResponse",
"VendorDetailResponse", "VendorDetailResponse",

View File

@@ -312,40 +312,21 @@ class VendorSummary(BaseModel):
# Ownership transfer is now handled at the Company level. # Ownership transfer is now handled at the Company level.
# See models/schema/company.py for CompanyTransferOwnership and CompanyTransferOwnershipResponse. # See models/schema/company.py for CompanyTransferOwnership and CompanyTransferOwnershipResponse.
# NOTE: Letzshop export schemas have been moved to app.modules.marketplace.schemas.letzshop
# ============================================================================ # See LetzshopExportRequest, LetzshopExportFileInfo, LetzshopExportResponse
# LETZSHOP EXPORT SCHEMAS
# ============================================================================
class LetzshopExportRequest(BaseModel): # Re-export VendorStatsResponse from core for convenience
"""Request body for Letzshop export to pickup folder.""" # This allows tenancy routes to use this schema without importing from core directly
from app.modules.core.schemas.dashboard import VendorStatsResponse
include_inactive: bool = Field( __all__ = [
default=False, "VendorCreate",
description="Include inactive products in export" "VendorUpdate",
) "VendorResponse",
"VendorDetailResponse",
"VendorCreateResponse",
class LetzshopExportFileInfo(BaseModel): "VendorListResponse",
"""Info about an exported file.""" "VendorSummary",
"VendorStatsResponse",
language: str ]
filename: str | None = None
path: str | None = None
size_bytes: int | None = None
error: str | None = None
class LetzshopExportResponse(BaseModel):
"""Response from Letzshop export to folder."""
success: bool
message: str
vendor_code: str
export_directory: str
files: list[LetzshopExportFileInfo]
celery_task_id: str | None = None # Set when using Celery async export
is_async: bool = Field(default=False, serialization_alias="async") # True when queued via Celery
model_config = {"populate_by_name": True}

View File

@@ -1,13 +1,14 @@
# app/modules/tenancy/services/admin_service.py # app/modules/tenancy/services/admin_service.py
""" """
Admin service for managing users, vendors, and import jobs. Admin service for managing users and vendors.
This module provides classes and functions for: This module provides classes and functions for:
- User management and status control - User management and status control
- Vendor creation with owner user generation - Vendor creation with owner user generation
- Vendor verification and activation - Vendor verification and activation
- Marketplace import job monitoring
- Platform statistics - Platform statistics
Note: Marketplace import job monitoring has been moved to the marketplace module.
""" """
import logging import logging
@@ -33,11 +34,9 @@ from app.modules.tenancy.exceptions import (
) )
from middleware.auth import AuthManager from middleware.auth import AuthManager
from app.modules.tenancy.models import Company from app.modules.tenancy.models import Company
from app.modules.marketplace.models import MarketplaceImportJob
from app.modules.tenancy.models import Platform from app.modules.tenancy.models import Platform
from app.modules.tenancy.models import User from app.modules.tenancy.models import User
from app.modules.tenancy.models import Role, Vendor from app.modules.tenancy.models import Role, Vendor
from app.modules.marketplace.schemas import MarketplaceImportJobResponse
from app.modules.tenancy.schemas.vendor import VendorCreate from app.modules.tenancy.schemas.vendor import VendorCreate
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -703,48 +702,8 @@ class AdminService:
# NOTE: Vendor ownership transfer is now handled at the Company level. # NOTE: Vendor ownership transfer is now handled at the Company level.
# Use company_service.transfer_ownership() instead. # Use company_service.transfer_ownership() instead.
# ============================================================================ # NOTE: Marketplace import job operations have been moved to the marketplace module.
# MARKETPLACE IMPORT JOBS # Use app.modules.marketplace routes for import job management.
# ============================================================================
def get_marketplace_import_jobs(
self,
db: Session,
marketplace: str | None = None,
vendor_name: str | None = None,
status: str | None = None,
skip: int = 0,
limit: int = 100,
) -> list[MarketplaceImportJobResponse]:
"""Get filtered and paginated marketplace import jobs."""
try:
query = db.query(MarketplaceImportJob)
if marketplace:
query = query.filter(
MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")
)
if vendor_name:
query = query.filter(
MarketplaceImportJob.vendor_name.ilike(f"%{vendor_name}%")
)
if status:
query = query.filter(MarketplaceImportJob.status == status)
jobs = (
query.order_by(MarketplaceImportJob.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
return [self._convert_job_to_response(job) for job in jobs]
except Exception as e:
logger.error(f"Failed to retrieve marketplace import jobs: {str(e)}")
raise AdminOperationException(
operation="get_marketplace_import_jobs", reason="Database query failed"
)
# ============================================================================ # ============================================================================
# STATISTICS # STATISTICS
@@ -773,30 +732,7 @@ class AdminService:
logger.error(f"Failed to get recent vendors: {str(e)}") logger.error(f"Failed to get recent vendors: {str(e)}")
return [] return []
def get_recent_import_jobs(self, db: Session, limit: int = 10) -> list[dict]: # NOTE: get_recent_import_jobs has been moved to the marketplace module
"""Get recent marketplace import jobs."""
try:
jobs = (
db.query(MarketplaceImportJob)
.order_by(MarketplaceImportJob.created_at.desc())
.limit(limit)
.all()
)
return [
{
"id": j.id,
"marketplace": j.marketplace,
"vendor_name": j.vendor_name,
"status": j.status,
"total_processed": j.total_processed or 0,
"created_at": j.created_at,
}
for j in jobs
]
except Exception as e:
logger.error(f"Failed to get recent import jobs: {str(e)}")
return []
# ============================================================================ # ============================================================================
# PRIVATE HELPER METHODS # PRIVATE HELPER METHODS
@@ -868,28 +804,6 @@ class AdminService:
) )
db.add(role) db.add(role)
def _convert_job_to_response(
self, job: MarketplaceImportJob
) -> MarketplaceImportJobResponse:
"""Convert database model to response schema."""
return MarketplaceImportJobResponse(
job_id=job.id,
status=job.status,
marketplace=job.marketplace,
source_url=job.source_url,
vendor_id=job.vendor.id if job.vendor else None,
vendor_code=job.vendor.vendor_code if job.vendor else None,
vendor_name=job.vendor.name if job.vendor else None,
imported=job.imported_count or 0,
updated=job.updated_count or 0,
total_processed=job.total_processed or 0,
error_count=job.error_count or 0,
error_message=job.error_message,
created_at=job.created_at,
started_at=job.started_at,
completed_at=job.completed_at,
)
# Create service instance # Create service instance
admin_service = AdminService() admin_service = AdminService()

View File

@@ -1,12 +1,13 @@
# app/modules/tenancy/services/vendor_service.py # app/modules/tenancy/services/vendor_service.py
""" """
Vendor service for managing vendor operations and product catalog. Vendor service for managing vendor operations.
This module provides classes and functions for: This module provides classes and functions for:
- Vendor creation and management - Vendor creation and management
- Vendor access control and validation - Vendor access control and validation
- Vendor product catalog operations
- Vendor filtering and search - Vendor filtering and search
Note: Product catalog operations have been moved to app.modules.catalog.services.
""" """
import logging import logging
@@ -15,19 +16,14 @@ from sqlalchemy import func
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.exceptions import ValidationException from app.exceptions import ValidationException
from app.modules.catalog.exceptions import ProductAlreadyExistsException
from app.modules.marketplace.exceptions import MarketplaceProductNotFoundException
from app.modules.tenancy.exceptions import ( from app.modules.tenancy.exceptions import (
InvalidVendorDataException, InvalidVendorDataException,
UnauthorizedVendorAccessException, UnauthorizedVendorAccessException,
VendorAlreadyExistsException, VendorAlreadyExistsException,
VendorNotFoundException, VendorNotFoundException,
) )
from app.modules.marketplace.models import MarketplaceProduct
from app.modules.catalog.models import Product
from app.modules.tenancy.models import User from app.modules.tenancy.models import User
from app.modules.tenancy.models import Vendor from app.modules.tenancy.models import Vendor
from app.modules.catalog.schemas import ProductCreate
from app.modules.tenancy.schemas.vendor import VendorCreate from app.modules.tenancy.schemas.vendor import VendorCreate
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -443,110 +439,10 @@ class VendorService:
logger.info(f"Vendor {vendor.vendor_code} set to {status}") logger.info(f"Vendor {vendor.vendor_code} set to {status}")
return vendor, f"Vendor {vendor.vendor_code} is now {status}" return vendor, f"Vendor {vendor.vendor_code} is now {status}"
def add_product_to_catalog( # NOTE: Product catalog operations have been moved to catalog module.
self, db: Session, vendor: Vendor, product: ProductCreate # Use app.modules.catalog.services.product_service instead.
) -> Product: # - add_product_to_catalog -> product_service.create_product
""" # - get_products -> product_service.get_vendor_products
Add existing product to vendor catalog with vendor -specific settings.
Args:
db: Database session
vendor : Vendor to add product to
product: Vendor product data
Returns:
Created Product object
Raises:
MarketplaceProductNotFoundException: If product not found
ProductAlreadyExistsException: If product already in vendor
"""
try:
# Check if product exists
marketplace_product = self._get_product_by_id_or_raise(
db, product.marketplace_product_id
)
# Check if product already in vendor
if self._product_in_catalog(db, vendor.id, marketplace_product.id):
raise ProductAlreadyExistsException(
vendor.vendor_code, product.marketplace_product_id
)
# Create vendor-product association
new_product = Product(
vendor_id=vendor.id,
marketplace_product_id=marketplace_product.id,
**product.model_dump(exclude={"marketplace_product_id"}),
)
db.add(new_product)
db.flush() # Get ID without committing - endpoint handles commit
logger.info(
f"MarketplaceProduct {product.marketplace_product_id} added to vendor {vendor.vendor_code}"
)
return new_product
except (MarketplaceProductNotFoundException, ProductAlreadyExistsException):
raise # Re-raise custom exceptions - endpoint handles rollback
except Exception as e:
logger.error(f"Error adding product to vendor : {str(e)}")
raise ValidationException("Failed to add product to vendor ")
def get_products(
self,
db: Session,
vendor: Vendor,
current_user: User,
skip: int = 0,
limit: int = 100,
active_only: bool = True,
featured_only: bool = False,
) -> tuple[list[Product], int]:
"""
Get products in vendor catalog with filtering.
Args:
db: Database session
vendor : Vendor to get products from
current_user: Current user requesting products
skip: Number of records to skip
limit: Maximum number of records to return
active_only: Filter for active products only
featured_only: Filter for featured products only
Returns:
Tuple of (products_list, total_count)
Raises:
UnauthorizedVendorAccessException: If vendor access denied
"""
try:
# Check access permissions
if not self._can_access_vendor(vendor, current_user):
raise UnauthorizedVendorAccessException(
vendor.vendor_code, current_user.id
)
# Query vendor products
query = db.query(Product).filter(Product.vendor_id == vendor.id)
if active_only:
query = query.filter(Product.is_active == True)
if featured_only:
query = query.filter(Product.is_featured == True)
total = query.count()
products = query.offset(skip).limit(limit).all()
return products, total
except UnauthorizedVendorAccessException:
raise # Re-raise custom exceptions
except Exception as e:
logger.error(f"Error getting vendor products: {str(e)}")
raise ValidationException("Failed to retrieve vendor products")
# Private helper methods # Private helper methods
def _vendor_code_exists(self, db: Session, vendor_code: str) -> bool: def _vendor_code_exists(self, db: Session, vendor_code: str) -> bool:
@@ -558,33 +454,6 @@ class VendorService:
is not None is not None
) )
def _get_product_by_id_or_raise(
self, db: Session, marketplace_product_id: int
) -> MarketplaceProduct:
"""Get marketplace product by database ID or raise exception."""
product = (
db.query(MarketplaceProduct)
.filter(MarketplaceProduct.id == marketplace_product_id)
.first()
)
if not product:
raise MarketplaceProductNotFoundException(str(marketplace_product_id))
return product
def _product_in_catalog(
self, db: Session, vendor_id: int, marketplace_product_id: int
) -> bool:
"""Check if product is already in vendor."""
return (
db.query(Product)
.filter(
Product.vendor_id == vendor_id,
Product.marketplace_product_id == marketplace_product_id,
)
.first()
is not None
)
def _can_access_vendor(self, vendor: Vendor, user: User) -> bool: def _can_access_vendor(self, vendor: Vendor, user: User) -> bool:
"""Check if user can access vendor.""" """Check if user can access vendor."""
# Admins can always access # Admins can always access

View File

@@ -224,8 +224,8 @@ Configure Letzshop API access:
|----------|--------|-------------| |----------|--------|-------------|
| `/admin/products` | GET | List marketplace products with filters | | `/admin/products` | GET | List marketplace products with filters |
| `/admin/products/stats` | GET | Get product statistics | | `/admin/products/stats` | GET | Get product statistics |
| `/admin/vendors/{id}/export/letzshop` | GET | Download CSV export | | `/admin/letzshop/vendors/{id}/export` | GET | Download CSV export |
| `/admin/vendors/{id}/export/letzshop` | POST | Export to pickup folder | | `/admin/letzshop/vendors/{id}/export` | POST | Export to pickup folder |
### Jobs ### Jobs

View File

@@ -0,0 +1,253 @@
# Session Note: Module Dependency Redesign
**Date:** 2026-02-03
**Status:** To be continued
**Priority:** High - Architecture blocker
---
## Summary
We discovered that while billing, payments, and messaging have been correctly reclassified as core modules, **the app would still crash if optional modules are removed** due to hard imports from core → optional modules.
The fundamental issue is **flawed architecture**: core modules should never depend on optional modules. The dependency direction is backwards.
---
## Work Completed Today
### 1. Module Reclassification
Made billing, payments, and messaging core modules:
- `app/modules/billing/definition.py` - `is_core=True`
- `app/modules/payments/definition.py` - `is_core=True`
- `app/modules/messaging/definition.py` - `is_core=True`
### 2. Letzshop Export Routes Moved (T5b)
Moved export routes from tenancy to marketplace where they belong:
- **Old:** `GET/POST /api/v1/admin/vendors/{id}/export/letzshop`
- **New:** `GET/POST /api/v1/admin/letzshop/vendors/{id}/export`
Files changed:
- `app/modules/marketplace/routes/api/admin_letzshop.py`
- `app/modules/marketplace/schemas/letzshop.py`
- `app/modules/tenancy/routes/api/admin_vendors.py`
- `app/modules/tenancy/schemas/vendor.py`
- Tests and documentation updated
### 3. Documentation Updated
- `docs/architecture/module-system.md`
- `docs/proposals/decouple-modules.md`
- `docs/testing/admin-frontend-features.md`
- `docs/guides/letzshop-admin-management.md`
---
## Critical Issue: Core → Optional Dependencies
### Current State (BROKEN)
The app crashes on startup with `ImportError` if optional modules are removed.
#### tenancy (core) → analytics (optional)
```python
# tenancy/routes/api/admin_vendors.py (lines 20, 23)
from app.modules.analytics.services.stats_service import stats_service # TOP-LEVEL
from app.modules.analytics.schemas import VendorStatsResponse # TOP-LEVEL
```
#### tenancy (core) → marketplace (optional)
```python
# tenancy/models/__init__.py (line 22)
from app.modules.marketplace.models.marketplace_import_job import MarketplaceImportJob # TOP-LEVEL
# tenancy/services/vendor_service.py (lines 19, 26)
from app.modules.marketplace.exceptions import MarketplaceProductNotFoundException
from app.modules.marketplace.models import MarketplaceProduct
```
#### tenancy (core) → catalog (optional)
```python
# tenancy/services/vendor_service.py (lines 18, 27, 30)
from app.modules.catalog.exceptions import ProductAlreadyExistsException
from app.modules.catalog.models import Product
from app.modules.catalog.schemas import ProductCreate
```
#### billing (core) → catalog (optional)
```python
# billing/services/subscription_service.py (line 48)
from app.modules.catalog.models import Product
# billing/services/admin_subscription_service.py (line 30)
from app.modules.catalog.models import Product
# billing/services/capacity_forecast_service.py (line 19)
from app.modules.catalog.models import Product
```
---
## The Design Flaw
### Current (Wrong)
```
Core Modules ──imports──> Optional Modules
CRASH if optional removed
```
### Correct Design
```
Optional Modules ──extends/provides to──> Core Modules
Graceful degradation if optional removed
```
---
## Root Cause Analysis
| Violation | Why It's Wrong | What Should Happen |
|-----------|----------------|-------------------|
| tenancy imports `stats_service` | Core shouldn't know analytics exists | Analytics registers a MetricsProvider; core discovers it |
| tenancy imports `MarketplaceImportJob` | Core shouldn't know marketplace exists | Marketplace owns its relationships entirely |
| tenancy's vendor_service creates products | Product creation is catalog's domain | Move this code to catalog module |
| billing imports `Product` to count | Billing shouldn't query catalog tables | Catalog provides count via ProductCountProvider protocol |
---
## Proposed Solution: Provider Pattern
### Architecture
```
┌─────────────────────────────────────────────────────────┐
│ CORE MODULES │
│ (Define protocols/hooks, never import from optional) │
│ │
│ contracts: Define protocols (MetricsProvider, etc.) │
│ core: Discover and aggregate providers │
│ tenancy: Vendor management (no product knowledge) │
│ billing: Tier limits (ask "count?" via protocol) │
└─────────────────────────────────────────────────────────┘
│ implements/provides
┌─────────────────────────────────────────────────────────┐
│ OPTIONAL MODULES │
│ (Extend core by implementing protocols/hooks) │
│ │
│ catalog: Implements ProductCountProvider │
│ analytics: Implements MetricsProvider │
│ marketplace: Owns import jobs, provides to tenancy │
└─────────────────────────────────────────────────────────┘
```
### Example: Billing Needs Product Count
**Current (wrong):**
```python
# billing/services/subscription_service.py
from app.modules.catalog.models import Product # CRASH if catalog removed
def get_product_count(vendor_id):
return db.query(Product).filter(Product.vendor_id == vendor_id).count()
```
**Proposed (correct):**
```python
# contracts/capacity.py
class ProductCountProvider(Protocol):
def get_product_count(self, db: Session, vendor_id: int) -> int: ...
# catalog/services/product_count_provider.py
class CatalogProductCountProvider:
def get_product_count(self, db, vendor_id):
return db.query(Product).filter(...).count()
# Register in catalog/definition.py
catalog_module = ModuleDefinition(
product_count_provider=_get_product_count_provider,
...
)
# billing/services/subscription_service.py
def get_product_count(db, vendor_id, platform_id):
provider = get_product_count_provider(db, platform_id) # Discovers from enabled modules
if provider:
return provider.get_product_count(db, vendor_id)
return 0 # Graceful fallback
```
---
## Questions to Resolve Tomorrow
1. **What belongs where?**
- Does product creation in `tenancy/vendor_service.py` belong in catalog?
- Should `MarketplaceImportJob` relationships stay on User/Vendor or move entirely to marketplace?
2. **Provider patterns needed:**
- `MetricsProvider` (already proposed) - for dashboard stats
- `ProductCountProvider` - for billing tier limits
- `ImportJobProvider` - for marketplace import jobs?
3. **Migration strategy:**
- Move code first, or create protocols first?
- How to handle the User/Vendor ↔ MarketplaceImportJob relationship?
4. **Testing:**
- How to verify the app runs with optional modules removed?
- Integration test that imports only core modules?
---
## Module Classification Reference
### Core Modules (8)
| Module | Purpose |
|--------|---------|
| contracts | Protocol definitions |
| core | Dashboard, settings |
| tenancy | Platform, company, vendor, user management |
| cms | Content pages, media, themes |
| customers | Customer database |
| billing | Subscriptions, tier limits |
| payments | Payment gateways |
| messaging | Email, notifications |
### Optional Modules (8)
| Module | Purpose |
|--------|---------|
| analytics | Reports, dashboards |
| cart | Shopping cart |
| catalog | Product catalog |
| checkout | Order placement |
| inventory | Stock management |
| loyalty | Loyalty programs |
| marketplace | Letzshop integration |
| orders | Order management |
### Internal Modules (2)
| Module | Purpose |
|--------|---------|
| dev-tools | Component library |
| monitoring | Logs, tasks |
---
## Files to Review Tomorrow
1. `app/modules/tenancy/services/vendor_service.py` - Product creation code
2. `app/modules/tenancy/models/__init__.py` - MarketplaceImportJob import
3. `app/modules/billing/services/subscription_service.py` - Product count queries
4. `app/modules/contracts/` - Existing protocols to extend
---
## Next Steps
1. Design the provider protocols needed
2. Decide what code moves where
3. Implement the provider pattern for one case (e.g., ProductCountProvider)
4. Test that core modules can load without optional modules
5. Apply pattern to remaining violations

View File

@@ -0,0 +1,552 @@
# Implementation Plan: Decouple Core Modules from Optional Modules
## Executive Summary
This plan addresses the remaining architecture violations where core modules have hard dependencies on optional modules. The goal is to ensure the app can run even if optional modules are removed.
**Current Status:**
- ✅ Dashboard statistics (core → analytics) - FIXED via MetricsProvider pattern
- ❌ Tenancy → marketplace, catalog, billing, analytics - **6 violations**
- ❌ CMS → catalog, billing, messaging - **5 violations (1 misplaced service)**
- ❌ Customers → orders - **2 violations**
---
## Part 1: Tenancy Module Violations
### Violation T1: MarketplaceImportJob in models/__init__.py
**Current Code:**
```python
# tenancy/models/__init__.py:22
from app.modules.marketplace.models.marketplace_import_job import MarketplaceImportJob # noqa: F401
```
**Purpose:** SQLAlchemy relationship resolution for `User.marketplace_import_jobs` and `Vendor.marketplace_import_jobs`
**Solution: Remove relationships from core models**
The relationships `User.marketplace_import_jobs` and `Vendor.marketplace_import_jobs` should be defined ONLY in the MarketplaceImportJob model using `backref`, not on the User/Vendor models. This is a one-way relationship that optional modules add to core models.
**Implementation:**
1. Remove the import from `tenancy/models/__init__.py`
2. Remove `marketplace_import_jobs` relationship from User model
3. Remove `marketplace_import_jobs` relationship from Vendor model
4. Ensure MarketplaceImportJob already has the relationship defined with `backref`
5. Access pattern changes: `user.marketplace_import_jobs` → query `MarketplaceImportJob.filter(user_id=user.id)`
**Impact:** Low - This is internal data access, not a public API
---
### Violation T2: MarketplaceImportJob in admin_service.py
**Current Code:**
```python
# tenancy/services/admin_service.py:36,40
from app.modules.marketplace.models import MarketplaceImportJob
from app.modules.marketplace.schemas import MarketplaceImportJobResponse
```
**Used In:**
- `get_marketplace_import_jobs()` - Admin endpoint for listing import jobs
- `get_recent_import_jobs()` - Dashboard recent imports widget
- `_convert_job_to_response()` - Response conversion
**Solution: Move functionality to marketplace module**
These methods belong in the marketplace module, not tenancy. The admin dashboard should call marketplace service methods.
**Implementation:**
1. Create `app/modules/marketplace/services/import_job_service.py` (if not exists)
2. Move `get_marketplace_import_jobs()` logic to marketplace module
3. Move `get_recent_import_jobs()` logic to marketplace module
4. Update admin_service to use lazy imports with try/except:
```python
def get_recent_import_jobs(self, db: Session, limit: int = 10) -> list:
try:
from app.modules.marketplace.services import import_job_service
return import_job_service.get_recent_jobs(db, limit)
except ImportError:
return [] # Marketplace module not installed
```
5. Update admin dashboard route to handle empty list gracefully
**Impact:** Medium - Admin UI shows "No imports" if marketplace disabled
---
### Violation T3: Catalog/Marketplace in vendor_service.py
**Current Code:**
```python
# tenancy/services/vendor_service.py:18-30
from app.modules.catalog.exceptions import ProductAlreadyExistsException
from app.modules.marketplace.exceptions import MarketplaceProductNotFoundException
from app.modules.marketplace.models import MarketplaceProduct
from app.modules.catalog.models import Product
from app.modules.catalog.schemas import ProductCreate
```
**Used In:**
- `add_product_to_catalog()` - Adds marketplace product to vendor catalog
**Solution: Move product management to catalog module**
Product management is catalog functionality, not tenancy functionality. The `add_product_to_catalog()` method should live in the catalog module.
**Implementation:**
1. Create `app/modules/catalog/services/product_catalog_service.py`
2. Move `add_product_to_catalog()` to catalog module
3. Move helper methods `_get_product_by_id_or_raise()` and `_product_in_catalog()`
4. vendor_service delegates to catalog service with lazy import:
```python
def add_product_to_catalog(self, db: Session, vendor_id: int, product_data: dict):
try:
from app.modules.catalog.services import product_catalog_service
return product_catalog_service.add_product(db, vendor_id, product_data)
except ImportError:
raise ModuleNotEnabledException("catalog")
```
**Impact:** Medium - Feature requires both catalog and marketplace modules
---
### Violation T4: TierLimitExceededException in vendor_team_service.py
**Current Code:**
```python
# tenancy/services/vendor_team_service.py:34
from app.modules.billing.exceptions import TierLimitExceededException
# Line 78 (lazy import inside method)
from app.modules.billing.services import subscription_service
subscription_service.check_team_limit(db, vendor.id)
```
**Used In:**
- `invite_team_member()` - Validates team size against subscription tier
**Solution: Protocol-based limit checking**
Define a `TierLimitChecker` protocol in contracts module. Billing implements it, tenancy uses it optionally.
**Implementation:**
1. Add to `app/modules/contracts/billing.py`:
```python
@runtime_checkable
class TierLimitCheckerProtocol(Protocol):
def check_team_limit(self, db: Session, vendor_id: int) -> None:
"""Raises TierLimitExceededException if limit exceeded."""
...
```
2. Add generic exception to tenancy module:
```python
# tenancy/exceptions.py
class TeamSizeLimitExceededException(WizamartException):
"""Team size limit exceeded (billing module provides specific limits)."""
```
3. Update vendor_team_service.py:
```python
def invite_team_member(self, ...):
# Check tier limits if billing module available
try:
from app.modules.billing.services import subscription_service
subscription_service.check_team_limit(db, vendor.id)
except ImportError:
pass # No billing module - no tier limits
except Exception as e:
# Convert billing exception to tenancy exception
if "limit" in str(e).lower():
raise TeamSizeLimitExceededException(str(e))
raise
```
**Impact:** Low - Without billing, team size is unlimited
---
### Violation T5: Analytics/Marketplace in admin_vendors.py
**Current Code:**
```python
# tenancy/routes/api/admin_vendors.py:20,23
from app.modules.analytics.services.stats_service import stats_service
from app.modules.analytics.schemas import VendorStatsResponse
# Lines 348, 399 (lazy imports)
from app.modules.marketplace.services.letzshop_export_service import letzshop_export_service
```
**Used In:**
- `get_vendor_statistics()` - Returns vendor counts
- `export_vendor_products_letzshop()` - CSV export
- `export_vendor_products_letzshop_to_folder()` - Batch export
**Solution A (Analytics):** Use MetricsProvider pattern (already implemented!)
The stats endpoint should use our new `StatsAggregatorService`:
```python
@router.get("/vendors/stats")
def get_vendor_statistics(db: Session = Depends(get_db), ...):
from app.modules.core.services.stats_aggregator import stats_aggregator
metrics = stats_aggregator.get_admin_dashboard_stats(db, platform_id)
# Extract tenancy metrics
tenancy_metrics = metrics.get("tenancy", [])
return _build_vendor_stats_response(tenancy_metrics)
```
**Solution B (Marketplace Export):** Already uses lazy imports - wrap in try/except
```python
@router.get("/vendors/{vendor_id}/export/letzshop")
def export_vendor_products_letzshop(...):
try:
from app.modules.marketplace.services.letzshop_export_service import letzshop_export_service
return letzshop_export_service.export_vendor_products(...)
except ImportError:
raise ModuleNotEnabledException("marketplace")
```
**Impact:** Low - Features gracefully degrade
---
### Violation T6: Analytics in admin_platform_users.py
**Current Code:**
```python
# tenancy/routes/api/admin_platform_users.py:18
from app.modules.analytics.services.stats_service import stats_service
```
**Used In:**
- `get_user_statistics()` - Returns user counts
**Solution:** Same as T5 - use StatsAggregator
```python
@router.get("/users/stats")
def get_user_statistics(db: Session = Depends(get_db), ...):
from app.modules.core.services.stats_aggregator import stats_aggregator
metrics = stats_aggregator.get_admin_dashboard_stats(db, platform_id)
tenancy_metrics = metrics.get("tenancy", [])
return _build_user_stats_response(tenancy_metrics)
```
**Impact:** None - Already have user metrics in tenancy_metrics
---
## Part 2: CMS Module Violations
### Violation C1: ProductMedia in media.py and media_service.py
**Current Code:**
```python
# cms/models/media.py:77-80
product_associations = relationship("ProductMedia", back_populates="media", ...)
# cms/services/media_service.py:31
from app.modules.catalog.models import ProductMedia
```
**Used In:**
- `attach_to_product()` - Creates product-media association
- `detach_from_product()` - Removes product-media association
- `get_media_usage()` - Lists where media is used
**Solution: Move product-media logic to catalog module**
The CMS media service shouldn't know about products. Product-media associations are catalog concerns.
**Implementation:**
1. Create `app/modules/catalog/services/product_media_service.py`
2. Move `attach_to_product()` and `detach_from_product()` to catalog
3. CMS media_service uses lazy delegation:
```python
def attach_to_product(self, db: Session, media_id: int, product_id: int):
try:
from app.modules.catalog.services import product_media_service
return product_media_service.attach_media(db, media_id, product_id)
except ImportError:
raise ModuleNotEnabledException("catalog")
```
4. For `get_media_usage()`, optionally include product associations:
```python
def get_media_usage(self, db: Session, media_id: int) -> dict:
usage = {"pages": [...], "products": []}
try:
from app.modules.catalog.services import product_media_service
usage["products"] = product_media_service.get_media_products(db, media_id)
except ImportError:
pass # No catalog module
return usage
```
**Impact:** Medium - Product-media features require catalog module
---
### Violation C2: TIER_LIMITS in platform.py (Homepage pricing)
**Current Code:**
```python
# cms/routes/pages/platform.py:17
from app.modules.billing.models import TIER_LIMITS, TierCode
```
**Used In:**
- `_get_tiers_data()` - Builds pricing tier display for homepage
- Called by `homepage()` and `content_page()` handlers
**Solution: Use Context Provider pattern**
Billing module should provide tier data via context provider (already supported in module architecture).
**Implementation:**
1. Add context provider to billing module definition:
```python
# billing/definition.py
def _get_platform_context(request, db, platform) -> dict:
from app.modules.billing.models import TIER_LIMITS, TierCode
tiers = [...] # Build tiers data
return {"tiers": tiers, "has_billing": True}
billing_module = ModuleDefinition(
context_providers={
FrontendType.PLATFORM: _get_platform_context,
},
)
```
2. Remove `_get_tiers_data()` from platform.py
3. Template checks `{% if tiers %}` before showing pricing section
4. Homepage gracefully shows "Contact us for pricing" if billing disabled
**Impact:** Low - Pricing section hidden if billing disabled
---
### Violation C3 & C4: MISPLACED SERVICE - vendor_email_settings_service.py
**Current Code:**
```python
# cms/services/vendor_email_settings_service.py:28-33
from app.modules.messaging.models import VendorEmailSettings, EmailProvider, PREMIUM_EMAIL_PROVIDERS
from app.modules.billing.models import VendorSubscription, TierCode
```
**Critical Finding:** This service is in the WRONG MODULE!
The `vendor_email_settings_service.py` is a **messaging** service that manages email provider configuration. It belongs in the messaging module, not CMS.
**Solution: Move service to messaging module**
**Implementation:**
1. Move `cms/services/vendor_email_settings_service.py` → `messaging/services/vendor_email_settings_service.py`
2. Update all imports that reference it (search codebase)
3. For billing tier checks, use lazy import with graceful fallback:
```python
# messaging/services/vendor_email_settings_service.py
def _check_premium_tier(self, db: Session, vendor_id: int, provider: str) -> bool:
if provider not in PREMIUM_EMAIL_PROVIDERS:
return True # Non-premium providers always allowed
try:
from app.modules.billing.services import subscription_service
tier = subscription_service.get_vendor_tier(db, vendor_id)
return tier in {TierCode.BUSINESS, TierCode.ENTERPRISE}
except ImportError:
return True # No billing module - all providers allowed
```
4. Update `cms/services/__init__.py` to remove the export
5. Add backwards-compatibility alias if needed (deprecation warning)
**Impact:** Medium - Requires import path updates
---
### Violation C5: Dead Code - Vendor import
**Current Code:**
```python
# cms/services/vendor_email_settings_service.py:27
from app.modules.tenancy.models import Vendor # UNUSED
```
**Solution:** Remove the unused import when moving the service.
---
## Part 3: Customers Module Violations
### Violation CU1 & CU2: Order imports in customer_service.py
**Current Code:**
```python
# customers/services/customer_service.py:332-350 (lazy import)
from app.modules.orders.models import Order # in get_customer_orders()
# customers/services/customer_service.py:365-396 (lazy import)
from app.modules.orders.models import Order # in get_customer_statistics()
```
**Used In:**
- `get_customer_orders()` - Lists orders for a customer
- `get_customer_statistics()` - Calculates customer LTV metrics
**Solution: Protocol-based customer order service**
Define a `CustomerOrdersProtocol` in contracts. Orders module implements it.
**Implementation:**
1. Add to `app/modules/contracts/orders.py`:
```python
@runtime_checkable
class CustomerOrdersProtocol(Protocol):
def get_customer_orders(
self, db: Session, vendor_id: int, customer_id: int, skip: int, limit: int
) -> tuple[list, int]:
...
def get_customer_statistics(
self, db: Session, vendor_id: int, customer_id: int
) -> dict:
...
```
2. Create `app/modules/orders/services/customer_orders_service.py`:
```python
class CustomerOrdersService:
def get_customer_orders(self, db, vendor_id, customer_id, skip, limit):
from app.modules.orders.models import Order
# ... existing logic
def get_customer_statistics(self, db, vendor_id, customer_id):
from app.modules.orders.models import Order
# ... existing logic
customer_orders_service = CustomerOrdersService()
```
3. Update customer_service.py to use lazy service discovery:
```python
def get_customer_orders(self, db, vendor_id, customer_id, skip=0, limit=50):
try:
from app.modules.orders.services import customer_orders_service
return customer_orders_service.get_customer_orders(db, vendor_id, customer_id, skip, limit)
except ImportError:
return [], 0 # No orders module
def get_customer_statistics(self, db, vendor_id, customer_id):
customer = self.get_customer(db, vendor_id, customer_id)
stats = {
"customer_id": customer_id,
"member_since": customer.created_at,
"is_active": customer.is_active,
"total_orders": 0,
"total_spent": 0.0,
"average_order_value": 0.0,
"last_order_date": None,
}
try:
from app.modules.orders.services import customer_orders_service
order_stats = customer_orders_service.get_customer_statistics(db, vendor_id, customer_id)
stats.update(order_stats)
except ImportError:
pass # No orders module - return base stats
return stats
```
**Impact:** Low - Customer details show "No orders" if orders disabled
---
## Implementation Phases
### Phase 1: Quick Wins (Low Risk)
1. T6: Update admin_platform_users.py to use StatsAggregator
2. T5a: Update admin_vendors.py stats endpoint to use StatsAggregator
3. C5: Remove dead Vendor import
4. T4: Add try/except for billing tier check
### Phase 2: Service Relocation (Medium Risk)
1. C3/C4: Move vendor_email_settings_service to messaging module
2. T2: Move import job methods to marketplace module
3. C1: Move product-media logic to catalog module
### Phase 3: Model Relationship Cleanup (Medium Risk)
1. T1: Remove MarketplaceImportJob relationship from User/Vendor models
2. T3: Move product catalog methods to catalog module
### Phase 4: Protocol-Based Decoupling (Low Risk)
1. CU1/CU2: Create customer orders service in orders module
2. C2: Add billing context provider for pricing tiers
### Phase 5: Testing & Verification
1. Run full test suite with all modules enabled
2. Test with each optional module disabled individually
3. Update architecture validation script
4. Update documentation
---
## Summary Table
| ID | Module | Violation | Solution | Risk | Phase |
|----|--------|-----------|----------|------|-------|
| T1 | tenancy | MarketplaceImportJob in models | Remove relationship from core | Medium | 3 |
| T2 | tenancy | ImportJob in admin_service | Move to marketplace | Medium | 2 |
| T3 | tenancy | Products in vendor_service | Move to catalog | Medium | 3 |
| T4 | tenancy | TierLimit in team_service | Try/except wrapper | Low | 1 |
| T5a | tenancy | Stats in admin_vendors | Use StatsAggregator | Low | 1 |
| T5b | tenancy | Export in admin_vendors | Already lazy - add try/except | Low | 1 |
| T6 | tenancy | Stats in admin_users | Use StatsAggregator | Low | 1 |
| C1 | cms | ProductMedia in media | Move to catalog | Medium | 2 |
| C2 | cms | TIER_LIMITS in platform | Context provider | Low | 4 |
| C3/C4 | cms | MISPLACED email service | Move to messaging | Medium | 2 |
| C5 | cms | Dead Vendor import | Remove | None | 1 |
| CU1/CU2 | customers | Order in customer_service | Protocol + lazy | Low | 4 |
---
## Verification Commands
After implementation, run these to verify:
```bash
# Check no core→optional imports remain
grep -r "from app.modules.analytics" app/modules/core/ app/modules/tenancy/ app/modules/cms/ app/modules/customers/ 2>/dev/null | grep -v "# optional" || echo "Clean!"
grep -r "from app.modules.marketplace" app/modules/core/ app/modules/tenancy/ app/modules/cms/ app/modules/customers/ 2>/dev/null | grep -v "# optional" || echo "Clean!"
grep -r "from app.modules.billing" app/modules/core/ app/modules/tenancy/ app/modules/cms/ app/modules/customers/ 2>/dev/null | grep -v "# optional" || echo "Clean!"
grep -r "from app.modules.catalog" app/modules/core/ app/modules/tenancy/ app/modules/cms/ app/modules/customers/ 2>/dev/null | grep -v "# optional" || echo "Clean!"
grep -r "from app.modules.orders" app/modules/core/ app/modules/tenancy/ app/modules/cms/ app/modules/customers/ 2>/dev/null | grep -v "# optional" || echo "Clean!"
# Run tests with each optional module "disabled" (move temporarily)
# This would be a more advanced test
# Run architecture validator
python scripts/validate_architecture.py
```
---
## Expected Outcome
After all phases complete:
- ✅ App starts successfully even if optional modules are removed
- ✅ Core modules have NO hard imports from optional modules
- ✅ Features gracefully degrade when optional modules disabled
- ✅ All cross-module communication via protocols or lazy imports with try/except

View File

@@ -0,0 +1,917 @@
Analytics Dependency Status: ❌ NOT FIXED ─
The MetricsProvider pattern exists in contracts/metrics.py, but admin_vendors.py still has hard imports:
File: app/modules/tenancy/routes/api/admin_vendors.py
┌──────┬────────────────────────────────────────────────────────────────────────┬─────────────────────────────────────────────┐
│ Line │ Import │ Used In │
├──────┼────────────────────────────────────────────────────────────────────────┼─────────────────────────────────────────────┤
│ 20 │ from app.modules.analytics.services.stats_service import stats_service │ get_vendor_statistics_endpoint() (line 110) │
├──────┼────────────────────────────────────────────────────────────────────────┼─────────────────────────────────────────────┤
│ 23 │ from app.modules.analytics.schemas import VendorStatsResponse │ Return type (line 104) │
└──────┴────────────────────────────────────────────────────────────────────────┴─────────────────────────────────────────────┘
---
All Core → Optional Dependencies
1. tenancy → marketplace
File: app/modules/tenancy/models/__init__.py:22
from app.modules.marketplace.models.marketplace_import_job import MarketplaceImportJob
- Purpose: SQLAlchemy needs this to resolve User.marketplace_import_jobs and Vendor.marketplace_import_jobs relationships
File: app/modules/tenancy/services/vendor_service.py
┌──────┬─────────────────────────────────────┬───────────────────────────────────────────────────────────────────────┐
│ Line │ Import │ Functions Using It │
├──────┼─────────────────────────────────────┼───────────────────────────────────────────────────────────────────────┤
│ 19 │ MarketplaceProductNotFoundException │ add_product_to_catalog() :463, _get_product_by_id_or_raise() :571 │
├──────┼─────────────────────────────────────┼───────────────────────────────────────────────────────────────────────┤
│ 26 │ MarketplaceProduct model │ _get_product_by_id_or_raise() :565-566, add_product_to_catalog() :466 │
└──────┴─────────────────────────────────────┴───────────────────────────────────────────────────────────────────────┘
---
2. tenancy → catalog
File: app/modules/tenancy/services/vendor_service.py
┌──────┬───────────────────────────────┬────────────────────────────────────────────────────────────────────────────────┐
│ Line │ Import │ Functions Using It │
├──────┼───────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ 18 │ ProductAlreadyExistsException │ add_product_to_catalog() :472 │
├──────┼───────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ 27 │ Product model │ add_product_to_catalog() :477, get_products() :533, _product_in_catalog() :579 │
├──────┼───────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤
│ 30 │ ProductCreate schema │ add_product_to_catalog() parameter type :447 │
└──────┴───────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘
---
3. billing → catalog
File: app/modules/billing/services/subscription_service.py:48
┌───────────────────────┬─────────┬─────────────────────────────────────────────────────────────────────────┐
│ Function │ Lines │ What It Does │
├───────────────────────┼─────────┼─────────────────────────────────────────────────────────────────────────┤
│ get_usage() │ 371-376 │ db.query(func.count(Product.id)).filter(Product.vendor_id == vendor_id) │
├───────────────────────┼─────────┼─────────────────────────────────────────────────────────────────────────┤
│ get_usage_summary() │ 420-425 │ Same product count query │
├───────────────────────┼─────────┼─────────────────────────────────────────────────────────────────────────┤
│ can_add_product() │ 514-519 │ Same product count query │
├───────────────────────┼─────────┼─────────────────────────────────────────────────────────────────────────┤
│ check_product_limit() │ 532-537 │ Same product count query │
└───────────────────────┴─────────┴─────────────────────────────────────────────────────────────────────────┘
File: app/modules/billing/services/admin_subscription_service.py:30
┌───────────────────────────┬─────────┬─────────────────────────────────────────────────────────────────────────┐
│ Function │ Lines │ What It Does │
├───────────────────────────┼─────────┼─────────────────────────────────────────────────────────────────────────┤
│ get_vendor_usage_counts() │ 212-217 │ db.query(func.count(Product.id)).filter(Product.vendor_id == vendor_id) │
└───────────────────────────┴─────────┴─────────────────────────────────────────────────────────────────────────┘
File: app/modules/billing/services/capacity_forecast_service.py:19
┌──────────────────────────┬──────┬───────────────────────────────────────────┐
│ Function │ Line │ What It Does │
├──────────────────────────┼──────┼───────────────────────────────────────────┤
│ capture_daily_snapshot() │ 91 │ db.query(func.count(Product.id)).scalar() │
└──────────────────────────┴──────┴───────────────────────────────────────────┘
---
Summary: What to Move Where?
┌──────────────────────────────────────────┬───────────────────────────────────────────────────────────────────────────────────────────────────┐
│ Dependency │ Recommendation │
├──────────────────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ admin_vendors.py → analytics │ Use the existing MetricsProvider pattern via stats_aggregator │
├──────────────────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ vendor_service.py → marketplace/catalog │ Move add_product_to_catalog() and get_products() to catalog module - these are product operations │
├──────────────────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ tenancy/models/__init__.py → marketplace │ Remove MarketplaceImportJob import, move relationships to marketplace module │
├──────────────────────────────────────────┼───────────────────────────────────────────────────────────────────────────────────────────────────┤
│ billing/* → catalog (Product count) │ Create ProductCountProvider protocol in contracts │
└──────────────────────────────────────────┴───────────────────────────────────────────────────────────────────────────────────────────────────┘
Would you like me to start working on any of these?
--------------------
Modular Architecture Analysis
Executive Summary
This analysis answers two questions:
1. Are all modules self-contained and respecting architecture rules?
2. Would the app still run if non-core modules are removed?
Key Findings:
- ✅ Route registration is fully auto-discovered (no hardcoded module routes)
- ✅ Middleware has no module-specific dependencies
- ✅ API routers use dynamic discovery pattern
- ❌ CRITICAL: Core modules have hard dependencies on optional modules - the app would CRASH if optional modules are removed
---
Part 1: Architecture Rule Compliance
What's Working Well
┌─────────────────────────┬────────┬────────────────────────────────────────────────┐
│ Aspect │ Status │ Details │
├─────────────────────────┼────────┼────────────────────────────────────────────────┤
│ Module auto-discovery │ ✅ │ All modules discovered via definition.py │
├─────────────────────────┼────────┼────────────────────────────────────────────────┤
│ Route auto-registration │ ✅ │ Admin, vendor, storefront routes fully dynamic │
├─────────────────────────┼────────┼────────────────────────────────────────────────┤
│ Middleware stack │ ✅ │ No hardcoded module dependencies │
├─────────────────────────┼────────┼────────────────────────────────────────────────┤
│ Static file mounting │ ✅ │ Dynamic scanning of module directories │
├─────────────────────────┼────────┼────────────────────────────────────────────────┤
│ Module enablement │ ✅ │ Controlled via PlatformModule.is_enabled │
└─────────────────────────┴────────┴────────────────────────────────────────────────┘
Previous Issues Fixed (This Session)
┌───────────────────────────────────┬──────────┬─────────────────────────────────────────────────────────────┐
│ Issue │ Status │ Fix Applied │
├───────────────────────────────────┼──────────┼─────────────────────────────────────────────────────────────┤
│ Module definition variable names │ ✅ Fixed │ catalog, checkout, cart renamed to *_module │
├───────────────────────────────────┼──────────┼─────────────────────────────────────────────────────────────┤
│ Router attachment functions │ ✅ Fixed │ Added get_*_module_with_routers() functions │
├───────────────────────────────────┼──────────┼─────────────────────────────────────────────────────────────┤
│ Billing exceptions location │ ✅ Fixed │ Moved to exceptions.py with backwards-compat aliases │
├───────────────────────────────────┼──────────┼─────────────────────────────────────────────────────────────┤
│ VendorEmailSettingsService DI │ ✅ Fixed │ Changed to db-as-parameter pattern │
├───────────────────────────────────┼──────────┼─────────────────────────────────────────────────────────────┤
│ VendorDomainService exception bug │ ✅ Fixed │ Added proper re-raise for DomainVerificationFailedException │
├───────────────────────────────────┼──────────┼─────────────────────────────────────────────────────────────┤
│ VendorTeamService exception bug │ ✅ Fixed │ Fixed CannotRemoveOwnerException arguments │
└───────────────────────────────────┴──────────┴─────────────────────────────────────────────────────────────┘
---
Part 2: Critical Architecture Violations
❌ Core → Optional Module Dependencies
The app would NOT run if optional modules are removed. Core modules have hard imports from optional modules:
tenancy (core) → optional modules
tenancy/models/__init__.py:22
→ from app.modules.marketplace.models import MarketplaceImportJob
tenancy/services/admin_service.py:36,40
→ from app.modules.marketplace.models import MarketplaceImportJob
→ from app.modules.marketplace.schemas import MarketplaceImportJobResponse
tenancy/services/vendor_service.py:18,19,26,27,30
→ from app.modules.catalog.exceptions import ProductAlreadyExistsException
→ from app.modules.marketplace.exceptions import MarketplaceProductNotFoundException
→ from app.modules.marketplace.models import MarketplaceProduct
→ from app.modules.catalog.models import Product
→ from app.modules.catalog.schemas import ProductCreate
tenancy/services/vendor_team_service.py:34
→ from app.modules.billing.exceptions import TierLimitExceededException
tenancy/routes/api/admin_vendors.py:20,23,348,399
→ from app.modules.analytics.services.stats_service import stats_service
→ from app.modules.analytics.schemas import VendorStatsResponse
→ from app.modules.marketplace.services import letzshop_export_service
tenancy/routes/api/admin_platform_users.py:18
→ from app.modules.analytics.services.stats_service import stats_service
core (core) → analytics (optional)
core/routes/api/admin_dashboard.py:14,16
→ from app.modules.analytics.services.stats_service import stats_service
→ from app.modules.analytics.schemas import VendorStatsResponse, ...
core/routes/api/vendor_dashboard.py:17,20
→ from app.modules.analytics.services.stats_service import stats_service
→ from app.modules.analytics.schemas import VendorStatsResponse, ...
cms (core) → optional modules
cms/models/__init__.py:14, cms/models/media.py:9, cms/services/media_service.py:31
→ from app.modules.catalog.models import ProductMedia
cms/routes/pages/platform.py:17
→ from app.modules.billing.models import TIER_LIMITS, TierCode
cms/services/vendor_email_settings_service.py:33
→ from app.modules.billing.models import VendorSubscription, TierCode
customers (core) → orders (optional)
customers/services/customer_service.py:332,368
→ from app.modules.orders.models import Order # (lazy import but still hard dep)
---
Part 3: Module Categorization (UPDATED 2026-02-03)
Core Modules (8) - Always enabled
┌───────────┬───────────────────────────────────────────────────┬──────────────────────────────┐
│ Module │ Purpose │ Depends on Optional? │
├───────────┼───────────────────────────────────────────────────┼──────────────────────────────┤
│ contracts │ Cross-module Protocol interfaces │ No │
├───────────┼───────────────────────────────────────────────────┼──────────────────────────────┤
│ core │ Dashboard, settings, profile │ ❌ YES (analytics) │
├───────────┼───────────────────────────────────────────────────┼──────────────────────────────┤
│ cms │ Content pages, media, themes │ ❌ YES (catalog) │
├───────────┼───────────────────────────────────────────────────┼──────────────────────────────┤
│ customers │ Customer database, profiles │ ❌ YES (orders) │
├───────────┼───────────────────────────────────────────────────┼──────────────────────────────┤
│ tenancy │ Platforms, companies, vendors, users │ ❌ YES (marketplace,catalog, │
│ │ │ analytics) │
├───────────┼───────────────────────────────────────────────────┼──────────────────────────────┤
│ billing │ Subscriptions, tier limits, invoices │ No (depends on payments, │
│ │ │ which is also core) │
├───────────┼───────────────────────────────────────────────────┼──────────────────────────────┤
│ payments │ Payment gateways (Stripe, PayPal) │ No │
├───────────┼───────────────────────────────────────────────────┼──────────────────────────────┤
│ messaging │ Email, notifications, templates │ No │
└───────────┴───────────────────────────────────────────────────┴──────────────────────────────┘
Why billing, payments, messaging are core (decided 2026-02-03):
- billing: Tier limits affect team size, product limits, email providers - pervasive
- payments: Required by billing for subscription payment processing
- messaging: Email required for registration, password reset, team invitations
Optional Modules (8) - Can be disabled per platform
┌─────────────┬────────────────────────────┬───────────────────────────────────┐
│ Module │ Purpose │ Dependencies │
├─────────────┼────────────────────────────┼───────────────────────────────────┤
│ analytics │ Reports, dashboards │ (standalone) │
├─────────────┼────────────────────────────┼───────────────────────────────────┤
│ cart │ Shopping cart │ inventory │
├─────────────┼────────────────────────────┼───────────────────────────────────┤
│ catalog │ Product catalog │ inventory │
├─────────────┼────────────────────────────┼───────────────────────────────────┤
│ checkout │ Order placement │ cart, orders, customers │
├─────────────┼────────────────────────────┼───────────────────────────────────┤
│ inventory │ Stock management │ (standalone) │
├─────────────┼────────────────────────────┼───────────────────────────────────┤
│ loyalty │ Loyalty programs │ customers │
├─────────────┼────────────────────────────┼───────────────────────────────────┤
│ marketplace │ Letzshop integration │ inventory │
├─────────────┼────────────────────────────┼───────────────────────────────────┤
│ orders │ Order management │ (standalone) │
└─────────────┴────────────────────────────┴───────────────────────────────────┘
Internal Modules (2) - Admin-only
┌────────────┬─────────────────────────────────┐
│ Module │ Purpose │
├────────────┼─────────────────────────────────┤
│ dev-tools │ Component library, icon browser │
├────────────┼─────────────────────────────────┤
│ monitoring │ Logs, tasks, health checks │
└────────────┴─────────────────────────────────┘
---
Part 4: Answer to User Questions
Q1: Are all modules self-contained and respecting architecture rules?
NO. While the route registration and middleware are properly decoupled, the core modules have hard import dependencies on optional modules. This violates the documented rule that "core
modules cannot depend on optional modules."
Specific violations:
- tenancy depends on: marketplace, billing, catalog, analytics
- core depends on: analytics
- cms depends on: catalog, billing
- customers depends on: orders
Q2: Would the app still run if non-core modules are removed?
NO. The app would crash immediately on import with ImportError or ModuleNotFoundError because:
1. tenancy/models/__init__.py imports MarketplaceImportJob at module load time
2. core/routes/api/admin_dashboard.py imports stats_service at module load time
3. cms/models/__init__.py imports ProductMedia at module load time
Even if using lazy imports (like in customers/services/customer_service.py), the code paths that use these imports would still fail at runtime.
---
Part 5: Contracts Pattern Deep Dive (Option A)
How the Existing Contracts Module Works
The app/modules/contracts/ module already implements Protocol-based interfaces:
app/modules/contracts/
├── __init__.py # Exports all protocols
├── base.py # ServiceProtocol, CRUDServiceProtocol
├── cms.py # ContentServiceProtocol, MediaServiceProtocol
└── definition.py # Module definition (is_core=True)
Key Pattern:
# 1. Protocol definition in contracts (no implementation)
@runtime_checkable
class ContentServiceProtocol(Protocol):
def get_page_for_vendor(self, db: Session, ...) -> object | None: ...
# 2. Implementation in the module itself
class ContentPageService: # Implements the protocol implicitly (duck typing)
def get_page_for_vendor(self, db: Session, ...) -> ContentPage | None:
# actual implementation
# 3. Usage in other modules (depends on protocol, not implementation)
class OrderService:
def __init__(self, content: ContentServiceProtocol | None = None):
self._content = content
@property
def content(self) -> ContentServiceProtocol:
if self._content is None:
from app.modules.cms.services import content_page_service # Lazy load
self._content = content_page_service
return self._content
---
Part 6: Metrics Provider Pattern (Proposed)
Design Goal
Each module provides its own metrics/stats. The analytics module aggregates them but doesn't implement them.
Step 1: Define MetricsProvider Protocol
File: app/modules/contracts/metrics.py
from typing import Protocol, runtime_checkable, TYPE_CHECKING
from dataclasses import dataclass
if TYPE_CHECKING:
from sqlalchemy.orm import Session
@dataclass
class MetricValue:
"""Standard metric value with metadata."""
key: str # e.g., "orders.total_count"
value: int | float | str
label: str # Human-readable label
category: str # Grouping category
icon: str | None = None # Optional UI icon
@runtime_checkable
class MetricsProviderProtocol(Protocol):
"""
Protocol for modules that provide metrics/statistics.
Each module implements this to expose its own metrics.
The analytics module discovers and aggregates all providers.
"""
@property
def metrics_category(self) -> str:
"""Category name for this provider's metrics (e.g., 'orders', 'inventory')."""
...
def get_vendor_metrics(
self,
db: "Session",
vendor_id: int,
date_from: "datetime | None" = None,
date_to: "datetime | None" = None,
) -> list[MetricValue]:
"""Get metrics for a specific vendor."""
...
def get_platform_metrics(
self,
db: "Session",
platform_id: int,
date_from: "datetime | None" = None,
date_to: "datetime | None" = None,
) -> list[MetricValue]:
"""Get metrics aggregated for a platform."""
...
Step 2: Each Module Implements the Protocol
File: app/modules/orders/services/order_metrics.py
from app.modules.contracts.metrics import MetricsProviderProtocol, MetricValue
from sqlalchemy.orm import Session
class OrderMetricsProvider:
"""Metrics provider for orders module."""
@property
def metrics_category(self) -> str:
return "orders"
def get_vendor_metrics(
self, db: Session, vendor_id: int, date_from=None, date_to=None
) -> list[MetricValue]:
from app.modules.orders.models import Order
query = db.query(Order).filter(Order.vendor_id == vendor_id)
if date_from:
query = query.filter(Order.created_at >= date_from)
if date_to:
query = query.filter(Order.created_at <= date_to)
total_orders = query.count()
total_revenue = query.with_entities(func.sum(Order.total)).scalar() or 0
return [
MetricValue(
key="orders.total_count",
value=total_orders,
label="Total Orders",
category="orders",
icon="shopping-cart"
),
MetricValue(
key="orders.total_revenue",
value=float(total_revenue),
label="Total Revenue",
category="orders",
icon="currency-euro"
),
]
def get_platform_metrics(self, db: Session, platform_id: int, **kwargs):
# Aggregate across all vendors in platform
...
# Singleton instance
order_metrics_provider = OrderMetricsProvider()
Step 3: Register Provider in Module Definition
File: app/modules/orders/definition.py
from app.modules.base import ModuleDefinition
def _get_metrics_provider():
"""Lazy load metrics provider to avoid circular imports."""
from app.modules.orders.services.order_metrics import order_metrics_provider
return order_metrics_provider
orders_module = ModuleDefinition(
code="orders",
name="Order Management",
# ... existing config ...
metrics_provider=_get_metrics_provider, # NEW: Optional callable
)
Step 4: Analytics Module Discovers All Providers
File: app/modules/analytics/services/stats_aggregator.py
from app.modules.contracts.metrics import MetricsProviderProtocol, MetricValue
from app.modules.registry import MODULES
from sqlalchemy.orm import Session
class StatsAggregatorService:
"""Aggregates metrics from all module providers."""
def _get_enabled_providers(self, db: Session, platform_id: int) -> list[MetricsProviderProtocol]:
"""Get metrics providers from enabled modules."""
from app.modules.service import module_service
providers = []
for module in MODULES.values():
# Check if module is enabled for this platform
if not module_service.is_module_enabled(db, platform_id, module.code):
continue
# Check if module has a metrics provider
if hasattr(module, 'metrics_provider') and module.metrics_provider:
provider = module.metrics_provider() # Call the lazy getter
if provider:
providers.append(provider)
return providers
def get_vendor_stats(
self, db: Session, vendor_id: int, platform_id: int, **kwargs
) -> dict[str, list[MetricValue]]:
"""Get all metrics for a vendor, grouped by category."""
providers = self._get_enabled_providers(db, platform_id)
result = {}
for provider in providers:
metrics = provider.get_vendor_metrics(db, vendor_id, **kwargs)
result[provider.metrics_category] = metrics
return result
stats_aggregator = StatsAggregatorService()
Step 5: Dashboard Uses Protocol, Not Implementation
File: app/modules/core/routes/api/vendor_dashboard.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from app.core.database import get_db
from app.api.deps import get_current_vendor_api
from models.schema.auth import UserContext
router = APIRouter(prefix="/dashboard")
@router.get("/stats")
def get_dashboard_stats(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
"""Get aggregated stats for vendor dashboard."""
# Lazy import - only fails if analytics module removed AND this endpoint called
from app.modules.analytics.services.stats_aggregator import stats_aggregator
return stats_aggregator.get_vendor_stats(
db=db,
vendor_id=current_user.token_vendor_id,
platform_id=current_user.platform_id,
)
---
Part 7: Creating a New Module with Metrics
Checklist for New Module with Metrics Support
1. Create module structure:
app/modules/my_module/
├── __init__.py
├── definition.py # Include metrics_provider
├── models/
├── services/
│ ├── __init__.py
│ └── my_metrics.py # Implement MetricsProviderProtocol
├── routes/
└── schemas/
2. Implement metrics provider:
# app/modules/my_module/services/my_metrics.py
from app.modules.contracts.metrics import MetricsProviderProtocol, MetricValue
class MyModuleMetricsProvider:
@property
def metrics_category(self) -> str:
return "my_module"
def get_vendor_metrics(self, db, vendor_id, **kwargs):
return [
MetricValue(key="my_module.count", value=42, label="My Count", category="my_module")
]
def get_platform_metrics(self, db, platform_id, **kwargs):
return []
my_module_metrics = MyModuleMetricsProvider()
3. Register in definition:
# app/modules/my_module/definition.py
def _get_metrics_provider():
from app.modules.my_module.services.my_metrics import my_module_metrics
return my_module_metrics
my_module = ModuleDefinition(
code="my_module",
metrics_provider=_get_metrics_provider,
# ...
)
4. Metrics appear automatically in dashboards when module is enabled
---
Part 8: Benefits of This Architecture
┌───────────────────────────┬───────────────────────┬──────────────────────────────────┐
│ Aspect │ Before │ After │
├───────────────────────────┼───────────────────────┼──────────────────────────────────┤
│ Core depends on analytics │ ❌ Hard import │ ✅ Optional lazy import │
├───────────────────────────┼───────────────────────┼──────────────────────────────────┤
│ Adding new metrics │ Edit analytics module │ Just add provider to your module │
├───────────────────────────┼───────────────────────┼──────────────────────────────────┤
│ Module isolation │ ❌ Coupled │ ✅ Truly independent │
├───────────────────────────┼───────────────────────┼──────────────────────────────────┤
│ Testing │ Hard (need analytics) │ Easy (mock protocol) │
├───────────────────────────┼───────────────────────┼──────────────────────────────────┤
│ Disable analytics module │ ❌ App crashes │ ✅ Dashboard shows "no stats" │
└───────────────────────────┴───────────────────────┴──────────────────────────────────┘
---
Part 9: Core Module Metrics & Dashboard Architecture
Key Questions Answered
Q1: How do core modules implement MetricsProviderProtocol?
Core modules implement MetricsProviderProtocol the same way as optional modules. The difference is they're always enabled.
tenancy module metrics:
# app/modules/tenancy/services/tenancy_metrics.py
class TenancyMetricsProvider:
@property
def metrics_category(self) -> str:
return "tenancy"
def get_vendor_metrics(self, db, vendor_id, **kwargs):
# Vendor-specific: team members count, domains count
from app.modules.tenancy.models import VendorUser, VendorDomain
team_count = db.query(VendorUser).filter(
VendorUser.vendor_id == vendor_id, VendorUser.is_active == True
).count()
domains_count = db.query(VendorDomain).filter(
VendorDomain.vendor_id == vendor_id
).count()
return [
MetricValue(key="tenancy.team_members", value=team_count,
label="Team Members", category="tenancy", icon="users"),
MetricValue(key="tenancy.domains", value=domains_count,
label="Custom Domains", category="tenancy", icon="globe"),
]
def get_platform_metrics(self, db, platform_id, **kwargs):
# Platform-wide: total vendors, total users, active vendors
from app.modules.tenancy.models import Vendor, User
total_vendors = db.query(Vendor).filter(
Vendor.platform_id == platform_id
).count()
active_vendors = db.query(Vendor).filter(
Vendor.platform_id == platform_id, Vendor.is_active == True
).count()
return [
MetricValue(key="tenancy.total_vendors", value=total_vendors,
label="Total Vendors", category="tenancy", icon="store"),
MetricValue(key="tenancy.active_vendors", value=active_vendors,
label="Active Vendors", category="tenancy", icon="check-circle"),
]
customers module metrics:
# app/modules/customers/services/customer_metrics.py
class CustomerMetricsProvider:
@property
def metrics_category(self) -> str:
return "customers"
def get_vendor_metrics(self, db, vendor_id, date_from=None, date_to=None):
from app.modules.customers.models import Customer
query = db.query(Customer).filter(Customer.vendor_id == vendor_id)
total = query.count()
# New customers in period
if date_from:
new_query = query.filter(Customer.created_at >= date_from)
if date_to:
new_query = new_query.filter(Customer.created_at <= date_to)
new_count = new_query.count()
else:
new_count = 0
return [
MetricValue(key="customers.total", value=total,
label="Total Customers", category="customers", icon="users"),
MetricValue(key="customers.new", value=new_count,
label="New Customers", category="customers", icon="user-plus"),
]
def get_platform_metrics(self, db, platform_id, **kwargs):
# Platform admin sees aggregated customer stats
...
Q2: Should analytics be part of core?
Recommendation: Split analytics into two parts:
┌────────────────────────┬─────────────────────────────┬─────────────────────────────────────────────────────┐
│ Component │ Location │ Purpose │
├────────────────────────┼─────────────────────────────┼─────────────────────────────────────────────────────┤
│ StatsAggregatorService │ core module │ Basic metric discovery & aggregation for dashboards │
├────────────────────────┼─────────────────────────────┼─────────────────────────────────────────────────────┤
│ Advanced Analytics │ analytics module (optional) │ Reports, charts, trends, exports, historical data │
└────────────────────────┴─────────────────────────────┴─────────────────────────────────────────────────────┘
Why this split works:
- Dashboards always work (aggregator in core)
- Advanced analytics features can be disabled per platform
- No circular dependencies
- Clear separation of concerns
Implementation:
app/modules/core/services/
├── stats_aggregator.py # Discovers & aggregates MetricsProviders (CORE)
└── ...
app/modules/analytics/services/
├── reports_service.py # Advanced reports (OPTIONAL)
├── trends_service.py # Historical trends (OPTIONAL)
└── export_service.py # Data exports (OPTIONAL)
StatsAggregatorService in core:
# app/modules/core/services/stats_aggregator.py
from app.modules.contracts.metrics import MetricsProviderProtocol, MetricValue
from app.modules.registry import MODULES
class StatsAggregatorService:
"""
Core service that discovers and aggregates metrics from all modules.
Lives in core module so dashboards always work.
"""
def _get_providers(self, db, platform_id) -> list[MetricsProviderProtocol]:
"""Get all metrics providers from enabled modules."""
from app.modules.service import module_service
providers = []
for module in MODULES.values():
# Always include core modules, check others
if not module.is_core:
if not module_service.is_module_enabled(db, platform_id, module.code):
continue
if hasattr(module, 'metrics_provider') and module.metrics_provider:
provider = module.metrics_provider()
if provider:
providers.append(provider)
return providers
def get_vendor_dashboard_stats(self, db, vendor_id, platform_id, **kwargs):
"""For vendor dashboard - single vendor metrics."""
providers = self._get_providers(db, platform_id)
return {p.metrics_category: p.get_vendor_metrics(db, vendor_id, **kwargs)
for p in providers}
def get_admin_dashboard_stats(self, db, platform_id, **kwargs):
"""For admin dashboard - platform-wide metrics."""
providers = self._get_providers(db, platform_id)
return {p.metrics_category: p.get_platform_metrics(db, platform_id, **kwargs)
for p in providers}
stats_aggregator = StatsAggregatorService()
Q3: Should this be used by both admin and vendor dashboards?
YES. The protocol has two methods for this exact purpose:
┌───────────────────────────────────┬──────────────────┬───────────────────────────────┐
│ Method │ Used By │ Data Scope │
├───────────────────────────────────┼──────────────────┼───────────────────────────────┤
│ get_vendor_metrics(vendor_id) │ Vendor Dashboard │ Single vendor's data │
├───────────────────────────────────┼──────────────────┼───────────────────────────────┤
│ get_platform_metrics(platform_id) │ Admin Dashboard │ Aggregated across all vendors │
└───────────────────────────────────┴──────────────────┴───────────────────────────────┘
Vendor Dashboard:
# app/modules/core/routes/api/vendor_dashboard.py
@router.get("/stats")
def get_vendor_stats(
current_user: UserContext = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
):
from app.modules.core.services.stats_aggregator import stats_aggregator
return stats_aggregator.get_vendor_dashboard_stats(
db=db,
vendor_id=current_user.token_vendor_id,
platform_id=current_user.platform_id,
)
Admin Dashboard:
# app/modules/core/routes/api/admin_dashboard.py
@router.get("/stats")
def get_platform_stats(
current_user: UserContext = Depends(get_current_platform_admin_api),
db: Session = Depends(get_db),
):
from app.modules.core.services.stats_aggregator import stats_aggregator
return stats_aggregator.get_admin_dashboard_stats(
db=db,
platform_id=current_user.platform_id,
)
Module Categorization Update (UPDATED 2026-02-03)
With this architecture, module categorization becomes:
┌───────────┬──────────┬─────────────────┬───────────────────────────────────────┐
│ Module │ Type │ Has Metrics? │ Metrics Examples │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ core │ Core │ ✅ + Aggregator │ System stats, aggregator service │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ tenancy │ Core │ ✅ │ Vendors, users, team members, domains │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ customers │ Core │ ✅ │ Customer counts, new customers │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ cms │ Core │ ✅ │ Pages, media items │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ contracts │ Core │ ❌ │ Just protocols, no data │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ billing │ Core │ ✅ │ Subscription stats, tier usage │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ payments │ Core │ ✅ │ Transaction stats │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ messaging │ Core │ ✅ │ Email stats, notification counts │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ orders │ Optional │ ✅ │ Order counts, revenue │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ inventory │ Optional │ ✅ │ Stock levels, low stock │
├───────────┼──────────┼─────────────────┼───────────────────────────────────────┤
│ analytics │ Optional │ ❌ │ Advanced reports (uses aggregator) │
└───────────┴──────────┴─────────────────┴───────────────────────────────────────┘
---
Part 10: Implementation Steps
Phase 1: Create Metrics Contract (contracts module)
1. Create app/modules/contracts/metrics.py with:
- MetricValue dataclass
- MetricsProviderProtocol
2. Add metrics_provider: Callable | None field to ModuleDefinition base class
3. Export from contracts/__init__.py
Phase 2: Create Stats Aggregator (core module)
1. Create app/modules/core/services/stats_aggregator.py
- StatsAggregatorService that discovers all metrics providers
- get_vendor_dashboard_stats() method
- get_admin_dashboard_stats() method
2. Register in core module exports
Phase 3: Add Metrics Providers to Core Modules
1. tenancy → tenancy_metrics.py
- Vendor count, user count, team members, domains
2. customers → customer_metrics.py
- Customer count, new customers, active customers
3. cms → cms_metrics.py
- Page count, media count
Phase 4: Add Metrics Providers to Optional Modules
Priority order:
1. orders → order_metrics.py - order counts, revenue
2. inventory → inventory_metrics.py - stock levels, low stock alerts
3. billing → billing_metrics.py - subscription stats
4. marketplace → marketplace_metrics.py - import stats
Phase 5: Update Dashboard Routes
1. Update core/routes/api/vendor_dashboard.py to use aggregator
2. Update core/routes/api/admin_dashboard.py to use aggregator
3. Remove direct imports from analytics module
4. Handle graceful degradation when no metrics available
Phase 6: Refactor Analytics Module (Optional Features)
1. Keep advanced features in analytics module:
- reports_service.py - detailed reports
- trends_service.py - historical trends
- export_service.py - data exports
2. Analytics module can use stats_aggregator for base data
3. Remove duplicated metric calculations from analytics
Phase 7: Update Tests
1. Add unit tests for StatsAggregatorService
2. Add unit tests for each metrics provider
3. Update dashboard API tests
4. Test with analytics module disabled
---
Verification Commands
# Check module discovery
python -c "from app.modules.registry import MODULES; print(list(MODULES.keys()))"
# Test metrics provider discovery
python -c "
from app.modules.registry import MODULES
for m in MODULES.values():
if hasattr(m, 'metrics_provider') and m.metrics_provider:
print(f'{m.code}: has metrics provider')
"
# Test stats aggregator (after implementation)
python -c "
from app.modules.core.services.stats_aggregator import stats_aggregator
from app.core.database import SessionLocal
db = SessionLocal()
try:
# Test with platform_id=1
stats = stats_aggregator.get_admin_dashboard_stats(db, platform_id=1)
print('Categories:', list(stats.keys()))
finally:
db.close()
"
# Run metrics provider tests
pytest tests/unit/services/test_*_metrics.py -v
# Verify no core→optional imports remain
grep -r "from app.modules.analytics" app/modules/core/ app/modules/tenancy/ || echo "Clean!"
---
Summary: What Changes
Before (Current State)
core/routes/admin_dashboard.py
└── imports stats_service from analytics module ❌
└── analytics calculates ALL metrics internally
After (New Architecture)
core/routes/admin_dashboard.py
└── imports stats_aggregator from core module ✅
└── discovers MetricsProviders from ALL modules
├── tenancy_metrics (core)
├── customer_metrics (core)
├── order_metrics (optional, if enabled)
├── inventory_metrics (optional, if enabled)
└── ...
analytics module (optional)
└── Advanced features: reports, trends, exports
└── can use stats_aggregator for base data
Key Benefits
1. Dashboards always work - aggregator is in core
2. Each module owns its metrics - no cross-module coupling
3. Optional modules truly optional - can be removed without breaking app
4. Easy to add new metrics - just implement protocol in your module
5. Both dashboards supported - vendor (per-vendor) and admin (platform-wide)

View File

@@ -112,8 +112,8 @@
| Verify vendor | `/api/v1/admin/vendors/{vendor_identifier}/verification` | PUT | Working | | Verify vendor | `/api/v1/admin/vendors/{vendor_identifier}/verification` | PUT | Working |
| Toggle vendor status | `/api/v1/admin/vendors/{vendor_identifier}/status` | PUT | Working | | Toggle vendor status | `/api/v1/admin/vendors/{vendor_identifier}/status` | PUT | Working |
| Delete vendor (confirm) | `/api/v1/admin/vendors/{vendor_identifier}?confirm=true` | DELETE | Working | | Delete vendor (confirm) | `/api/v1/admin/vendors/{vendor_identifier}?confirm=true` | DELETE | Working |
| Export Letzshop CSV (download) | `/api/v1/admin/vendors/{vendor_identifier}/export/letzshop` | GET | Working | | Export Letzshop CSV (download) | `/api/v1/admin/letzshop/vendors/{vendor_id}/export` | GET | Working |
| Export Letzshop CSV (to folder) | `/api/v1/admin/vendors/{vendor_identifier}/export/letzshop` | POST | Working | | Export Letzshop CSV (to folder) | `/api/v1/admin/letzshop/vendors/{vendor_id}/export` | POST | Working |
**Create Vendor Form Fields:** **Create Vendor Form Fields:**
- [ ] name (required) - [ ] name (required)

View File

@@ -370,7 +370,7 @@ class TestAdminLetzshopExportAPI:
def test_export_vendor_products_empty(self, client, admin_headers, test_vendor): def test_export_vendor_products_empty(self, client, admin_headers, test_vendor):
"""Test exporting products when vendor has no products.""" """Test exporting products when vendor has no products."""
response = client.get( response = client.get(
f"/api/v1/admin/vendors/{test_vendor.id}/export/letzshop", f"/api/v1/admin/letzshop/vendors/{test_vendor.id}/export",
headers=admin_headers, headers=admin_headers,
) )
@@ -422,7 +422,7 @@ class TestAdminLetzshopExportAPI:
db.commit() db.commit()
response = client.get( response = client.get(
f"/api/v1/admin/vendors/{test_vendor.id}/export/letzshop", f"/api/v1/admin/letzshop/vendors/{test_vendor.id}/export",
headers=admin_headers, headers=admin_headers,
) )
@@ -468,7 +468,7 @@ class TestAdminLetzshopExportAPI:
db.commit() db.commit()
response = client.get( response = client.get(
f"/api/v1/admin/vendors/{test_vendor.id}/export/letzshop?language=fr", f"/api/v1/admin/letzshop/vendors/{test_vendor.id}/export?language=fr",
headers=admin_headers, headers=admin_headers,
) )
@@ -512,24 +512,24 @@ class TestAdminLetzshopExportAPI:
# Without include_inactive # Without include_inactive
response = client.get( response = client.get(
f"/api/v1/admin/vendors/{test_vendor.id}/export/letzshop", f"/api/v1/admin/letzshop/vendors/{test_vendor.id}/export",
headers=admin_headers, headers=admin_headers,
) )
assert "INACTIVE-001" not in response.text assert "INACTIVE-001" not in response.text
# With include_inactive # With include_inactive
response = client.get( response = client.get(
f"/api/v1/admin/vendors/{test_vendor.id}/export/letzshop?include_inactive=true", f"/api/v1/admin/letzshop/vendors/{test_vendor.id}/export?include_inactive=true",
headers=admin_headers, headers=admin_headers,
) )
assert "INACTIVE-001" in response.text assert "INACTIVE-001" in response.text
def test_export_vendor_products_by_vendor_code( def test_export_vendor_products_by_vendor_id(
self, client, admin_headers, test_vendor self, client, admin_headers, test_vendor
): ):
"""Test exporting products using vendor_code identifier.""" """Test exporting products using vendor ID."""
response = client.get( response = client.get(
f"/api/v1/admin/vendors/{test_vendor.vendor_code}/export/letzshop", f"/api/v1/admin/letzshop/vendors/{test_vendor.id}/export",
headers=admin_headers, headers=admin_headers,
) )
@@ -539,7 +539,7 @@ class TestAdminLetzshopExportAPI:
def test_export_vendor_not_found(self, client, admin_headers): def test_export_vendor_not_found(self, client, admin_headers):
"""Test exporting for non-existent vendor.""" """Test exporting for non-existent vendor."""
response = client.get( response = client.get(
"/api/v1/admin/vendors/999999/export/letzshop", "/api/v1/admin/letzshop/vendors/999999/export",
headers=admin_headers, headers=admin_headers,
) )

View File

@@ -177,65 +177,8 @@ class TestAdminService:
exception = exc_info.value exception = exc_info.value
assert exception.error_code == "VENDOR_NOT_FOUND" assert exception.error_code == "VENDOR_NOT_FOUND"
# Marketplace Import Jobs Tests # NOTE: Marketplace Import Jobs tests have been moved to the marketplace module.
def test_get_marketplace_import_jobs_no_filters( # See tests/unit/services/test_marketplace_import_job_service.py
self, db, test_marketplace_import_job
):
"""Test getting marketplace import jobs without filters"""
result = self.service.get_marketplace_import_jobs(db, skip=0, limit=10)
assert len(result) >= 1
# Find our test job in the results
test_job = next(
(job for job in result if job.job_id == test_marketplace_import_job.id),
None,
)
assert test_job is not None
assert (
test_job.marketplace.lower()
== test_marketplace_import_job.marketplace.lower()
)
assert test_job.status == test_marketplace_import_job.status
def test_get_marketplace_import_jobs_with_marketplace_filter(
self, db, test_marketplace_import_job
):
"""Test filtering marketplace import jobs by marketplace"""
result = self.service.get_marketplace_import_jobs(
db, marketplace=test_marketplace_import_job.marketplace, skip=0, limit=10
)
assert len(result) >= 1
for job in result:
assert (
test_marketplace_import_job.marketplace.lower()
in job.marketplace.lower()
)
def test_get_marketplace_import_jobs_with_status_filter(
self, db, test_marketplace_import_job
):
"""Test filtering marketplace import jobs by status"""
result = self.service.get_marketplace_import_jobs(
db, status=test_marketplace_import_job.status, skip=0, limit=10
)
assert len(result) >= 1
for job in result:
assert job.status == test_marketplace_import_job.status
def test_get_marketplace_import_jobs_pagination(
self, db, test_marketplace_import_job
):
"""Test marketplace import jobs pagination"""
result_page1 = self.service.get_marketplace_import_jobs(db, skip=0, limit=1)
result_page2 = self.service.get_marketplace_import_jobs(db, skip=1, limit=1)
assert len(result_page1) >= 0
assert len(result_page2) >= 0
if len(result_page1) > 0 and len(result_page2) > 0:
assert result_page1[0].job_id != result_page2[0].job_id
# Statistics Tests # Statistics Tests
def test_get_user_statistics(self, db, test_user, test_admin): def test_get_user_statistics(self, db, test_user, test_admin):

View File

@@ -0,0 +1,355 @@
# tests/unit/services/test_company_service.py
"""Unit tests for CompanyService."""
import uuid
import pytest
from app.modules.tenancy.exceptions import CompanyNotFoundException, UserNotFoundException
from app.modules.tenancy.services.company_service import company_service
from app.modules.tenancy.models import Company
from app.modules.tenancy.schemas.company import (
CompanyCreate,
CompanyUpdate,
CompanyTransferOwnership,
)
# =============================================================================
# FIXTURES
# =============================================================================
@pytest.fixture
def unverified_company(db, test_user):
"""Create an unverified test company."""
unique_id = str(uuid.uuid4())[:8]
company = Company(
name=f"Unverified Company {unique_id}",
owner_user_id=test_user.id,
contact_email=f"unverified{unique_id}@company.com",
is_active=True,
is_verified=False,
)
db.add(company)
db.commit()
db.refresh(company)
return company
@pytest.fixture
def inactive_company(db, test_user):
"""Create an inactive test company."""
unique_id = str(uuid.uuid4())[:8]
company = Company(
name=f"Inactive Company {unique_id}",
owner_user_id=test_user.id,
contact_email=f"inactive{unique_id}@company.com",
is_active=False,
is_verified=True,
)
db.add(company)
db.commit()
db.refresh(company)
return company
# =============================================================================
# CREATE TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestCompanyServiceCreate:
"""Test suite for company creation."""
def test_create_company_with_new_owner(self, db):
"""Test creating a company with a new owner user."""
unique_id = str(uuid.uuid4())[:8]
company_data = CompanyCreate(
name=f"New Company {unique_id}",
owner_email=f"newowner{unique_id}@example.com",
contact_email=f"contact{unique_id}@company.com",
description="A new test company",
)
company, owner, temp_password = company_service.create_company_with_owner(
db, company_data
)
db.commit()
assert company is not None
assert company.name == f"New Company {unique_id}"
assert company.is_active is True
assert company.is_verified is False
assert owner is not None
assert owner.email == f"newowner{unique_id}@example.com"
assert temp_password is not None # New user gets temp password
def test_create_company_with_existing_owner(self, db, test_user):
"""Test creating a company with an existing user as owner."""
unique_id = str(uuid.uuid4())[:8]
company_data = CompanyCreate(
name=f"Existing Owner Company {unique_id}",
owner_email=test_user.email,
contact_email=f"contact{unique_id}@company.com",
)
company, owner, temp_password = company_service.create_company_with_owner(
db, company_data
)
db.commit()
assert company is not None
assert owner.id == test_user.id
assert temp_password is None # Existing user doesn't get new password
# =============================================================================
# READ TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestCompanyServiceRead:
"""Test suite for reading companies."""
def test_get_company_by_id_success(self, db, test_company):
"""Test getting a company by ID."""
company = company_service.get_company_by_id(db, test_company.id)
assert company is not None
assert company.id == test_company.id
assert company.name == test_company.name
def test_get_company_by_id_not_found(self, db):
"""Test getting a non-existent company raises exception."""
with pytest.raises(CompanyNotFoundException) as exc_info:
company_service.get_company_by_id(db, 99999)
assert "99999" in str(exc_info.value)
def test_get_companies_paginated(self, db, test_company, other_company):
"""Test getting paginated list of companies."""
companies, total = company_service.get_companies(db, skip=0, limit=10)
assert len(companies) >= 2
assert total >= 2
def test_get_companies_with_search(self, db, test_company):
"""Test searching companies by name."""
# Get the unique part of the company name
search_term = test_company.name.split()[0] # "Test"
companies, total = company_service.get_companies(
db, skip=0, limit=100, search=search_term
)
assert len(companies) >= 1
assert any(c.id == test_company.id for c in companies)
def test_get_companies_filter_by_active(self, db, test_company, inactive_company):
"""Test filtering companies by active status."""
active_companies, _ = company_service.get_companies(
db, skip=0, limit=100, is_active=True
)
inactive_companies, _ = company_service.get_companies(
db, skip=0, limit=100, is_active=False
)
assert all(c.is_active for c in active_companies)
assert all(not c.is_active for c in inactive_companies)
def test_get_companies_filter_by_verified(self, db, test_company, unverified_company):
"""Test filtering companies by verified status."""
verified_companies, _ = company_service.get_companies(
db, skip=0, limit=100, is_verified=True
)
unverified_companies, _ = company_service.get_companies(
db, skip=0, limit=100, is_verified=False
)
assert all(c.is_verified for c in verified_companies)
assert all(not c.is_verified for c in unverified_companies)
# =============================================================================
# UPDATE TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestCompanyServiceUpdate:
"""Test suite for updating companies."""
def test_update_company_success(self, db, test_company):
"""Test updating company information."""
unique_id = str(uuid.uuid4())[:8]
update_data = CompanyUpdate(
name=f"Updated Company {unique_id}",
description="Updated description",
)
updated = company_service.update_company(db, test_company.id, update_data)
db.commit()
assert updated.name == f"Updated Company {unique_id}"
assert updated.description == "Updated description"
def test_update_company_partial(self, db, test_company):
"""Test partial update of company."""
original_name = test_company.name
update_data = CompanyUpdate(description="Only description updated")
updated = company_service.update_company(db, test_company.id, update_data)
db.commit()
assert updated.name == original_name # Name unchanged
assert updated.description == "Only description updated"
def test_update_company_not_found(self, db):
"""Test updating non-existent company raises exception."""
update_data = CompanyUpdate(name="New Name")
with pytest.raises(CompanyNotFoundException):
company_service.update_company(db, 99999, update_data)
# =============================================================================
# DELETE TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestCompanyServiceDelete:
"""Test suite for deleting companies."""
def test_delete_company_success(self, db, test_user):
"""Test deleting a company."""
# Create a company to delete
unique_id = str(uuid.uuid4())[:8]
company = Company(
name=f"To Delete {unique_id}",
owner_user_id=test_user.id,
contact_email=f"delete{unique_id}@company.com",
is_active=True,
)
db.add(company)
db.commit()
company_id = company.id
# Delete it
company_service.delete_company(db, company_id)
db.commit()
# Verify it's gone
with pytest.raises(CompanyNotFoundException):
company_service.get_company_by_id(db, company_id)
def test_delete_company_not_found(self, db):
"""Test deleting non-existent company raises exception."""
with pytest.raises(CompanyNotFoundException):
company_service.delete_company(db, 99999)
# =============================================================================
# TOGGLE TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestCompanyServiceToggle:
"""Test suite for toggling company status."""
def test_toggle_verification_on(self, db, unverified_company):
"""Test setting verification to True."""
result = company_service.toggle_verification(db, unverified_company.id, True)
db.commit()
assert result.is_verified is True
def test_toggle_verification_off(self, db, test_company):
"""Test setting verification to False."""
result = company_service.toggle_verification(db, test_company.id, False)
db.commit()
assert result.is_verified is False
def test_toggle_active_on(self, db, inactive_company):
"""Test setting active status to True."""
result = company_service.toggle_active(db, inactive_company.id, True)
db.commit()
assert result.is_active is True
def test_toggle_active_off(self, db, test_company):
"""Test setting active status to False."""
result = company_service.toggle_active(db, test_company.id, False)
db.commit()
assert result.is_active is False
# =============================================================================
# OWNERSHIP TRANSFER TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestCompanyServiceOwnershipTransfer:
"""Test suite for company ownership transfer."""
def test_transfer_ownership_success(self, db, test_company, other_user):
"""Test successful ownership transfer."""
original_owner_id = test_company.owner_user_id
transfer_data = CompanyTransferOwnership(
new_owner_user_id=other_user.id,
transfer_reason="Testing ownership transfer",
)
company, old_owner, new_owner = company_service.transfer_ownership(
db, test_company.id, transfer_data
)
db.commit()
assert company.owner_user_id == other_user.id
assert old_owner.id == original_owner_id
assert new_owner.id == other_user.id
def test_transfer_ownership_to_same_owner_fails(self, db, test_company, test_user):
"""Test transfer to same owner raises error."""
transfer_data = CompanyTransferOwnership(
new_owner_user_id=test_user.id,
transfer_reason="This should fail",
)
with pytest.raises(ValueError) as exc_info:
company_service.transfer_ownership(db, test_company.id, transfer_data)
assert "current owner" in str(exc_info.value).lower()
def test_transfer_ownership_to_nonexistent_user_fails(self, db, test_company):
"""Test transfer to non-existent user raises exception."""
transfer_data = CompanyTransferOwnership(
new_owner_user_id=99999,
transfer_reason="This should fail",
)
with pytest.raises(UserNotFoundException):
company_service.transfer_ownership(db, test_company.id, transfer_data)
def test_transfer_ownership_nonexistent_company_fails(self, db, other_user):
"""Test transfer on non-existent company raises exception."""
transfer_data = CompanyTransferOwnership(
new_owner_user_id=other_user.id,
transfer_reason="This should fail",
)
with pytest.raises(CompanyNotFoundException):
company_service.transfer_ownership(db, 99999, transfer_data)

View File

@@ -0,0 +1,297 @@
# tests/unit/services/test_platform_service.py
"""Unit tests for PlatformService."""
import uuid
import pytest
from app.modules.tenancy.exceptions import PlatformNotFoundException
from app.modules.tenancy.services.platform_service import platform_service, PlatformStats
from app.modules.tenancy.models import Platform, VendorPlatform
from app.modules.cms.models import ContentPage
# =============================================================================
# FIXTURES
# =============================================================================
@pytest.fixture
def inactive_platform(db):
"""Create an inactive test platform."""
unique_id = str(uuid.uuid4())[:8]
platform = Platform(
code=f"inactive_{unique_id}",
name=f"Inactive Platform {unique_id}",
description="An inactive test platform",
path_prefix=f"inactive{unique_id}",
is_active=False,
is_public=False,
default_language="en",
supported_languages=["en"],
)
db.add(platform)
db.commit()
db.refresh(platform)
return platform
@pytest.fixture
def platform_with_vendor(db, test_platform, test_vendor):
"""Create a vendor-platform assignment."""
vendor_platform = VendorPlatform(
vendor_id=test_vendor.id,
platform_id=test_platform.id,
is_active=True,
)
db.add(vendor_platform)
db.commit()
return test_platform
@pytest.fixture
def platform_with_pages(db, test_platform):
"""Create a platform with content pages."""
unique_id = str(uuid.uuid4())[:8]
# Platform marketing page (published)
platform_page = ContentPage(
platform_id=test_platform.id,
vendor_id=None,
slug=f"platform-page-{unique_id}",
page_type="marketing",
is_platform_page=True,
is_published=True,
)
db.add(platform_page)
# Vendor default page (draft)
default_page = ContentPage(
platform_id=test_platform.id,
vendor_id=None,
slug=f"vendor-default-{unique_id}",
page_type="about",
is_platform_page=False,
is_published=False,
)
db.add(default_page)
db.commit()
return test_platform
# =============================================================================
# GET PLATFORM TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestPlatformServiceGet:
"""Test suite for getting platforms."""
def test_get_platform_by_code_success(self, db, test_platform):
"""Test getting a platform by code."""
platform = platform_service.get_platform_by_code(db, test_platform.code)
assert platform is not None
assert platform.id == test_platform.id
assert platform.code == test_platform.code
def test_get_platform_by_code_not_found(self, db):
"""Test getting a non-existent platform raises exception."""
with pytest.raises(PlatformNotFoundException) as exc_info:
platform_service.get_platform_by_code(db, "nonexistent_platform")
assert "nonexistent_platform" in str(exc_info.value)
def test_get_platform_by_code_optional_found(self, db, test_platform):
"""Test optional get returns platform when found."""
platform = platform_service.get_platform_by_code_optional(db, test_platform.code)
assert platform is not None
assert platform.id == test_platform.id
def test_get_platform_by_code_optional_not_found(self, db):
"""Test optional get returns None when not found."""
platform = platform_service.get_platform_by_code_optional(db, "nonexistent")
assert platform is None
def test_get_platform_by_id_success(self, db, test_platform):
"""Test getting a platform by ID."""
platform = platform_service.get_platform_by_id(db, test_platform.id)
assert platform is not None
assert platform.code == test_platform.code
def test_get_platform_by_id_not_found(self, db):
"""Test getting a non-existent platform by ID raises exception."""
with pytest.raises(PlatformNotFoundException):
platform_service.get_platform_by_id(db, 99999)
# =============================================================================
# LIST PLATFORMS TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestPlatformServiceList:
"""Test suite for listing platforms."""
def test_list_platforms_active_only(self, db, test_platform, inactive_platform):
"""Test listing only active platforms."""
platforms = platform_service.list_platforms(db, include_inactive=False)
platform_ids = [p.id for p in platforms]
assert test_platform.id in platform_ids
assert inactive_platform.id not in platform_ids
def test_list_platforms_include_inactive(self, db, test_platform, inactive_platform):
"""Test listing all platforms including inactive."""
platforms = platform_service.list_platforms(db, include_inactive=True)
platform_ids = [p.id for p in platforms]
assert test_platform.id in platform_ids
assert inactive_platform.id in platform_ids
# =============================================================================
# COUNT TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestPlatformServiceCounts:
"""Test suite for platform counts."""
def test_get_vendor_count_zero(self, db, test_platform):
"""Test vendor count is zero when no vendors assigned."""
count = platform_service.get_vendor_count(db, test_platform.id)
assert count == 0
def test_get_vendor_count_with_vendors(self, db, platform_with_vendor):
"""Test vendor count with assigned vendors."""
count = platform_service.get_vendor_count(db, platform_with_vendor.id)
assert count >= 1
def test_get_platform_pages_count(self, db, platform_with_pages):
"""Test platform pages count."""
count = platform_service.get_platform_pages_count(db, platform_with_pages.id)
assert count >= 1
def test_get_vendor_defaults_count(self, db, platform_with_pages):
"""Test vendor defaults count."""
count = platform_service.get_vendor_defaults_count(db, platform_with_pages.id)
assert count >= 1
def test_get_published_pages_count(self, db, platform_with_pages):
"""Test published pages count."""
count = platform_service.get_published_pages_count(db, platform_with_pages.id)
assert count >= 1
def test_get_draft_pages_count(self, db, platform_with_pages):
"""Test draft pages count."""
count = platform_service.get_draft_pages_count(db, platform_with_pages.id)
assert count >= 1
# =============================================================================
# PLATFORM STATS TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestPlatformServiceStats:
"""Test suite for platform statistics."""
def test_get_platform_stats(self, db, platform_with_pages, test_vendor):
"""Test getting comprehensive platform statistics."""
# Add a vendor to the platform
vendor_platform = VendorPlatform(
vendor_id=test_vendor.id,
platform_id=platform_with_pages.id,
is_active=True,
)
db.add(vendor_platform)
db.commit()
stats = platform_service.get_platform_stats(db, platform_with_pages)
assert isinstance(stats, PlatformStats)
assert stats.platform_id == platform_with_pages.id
assert stats.platform_code == platform_with_pages.code
assert stats.platform_name == platform_with_pages.name
assert stats.vendor_count >= 1
assert stats.platform_pages_count >= 1
assert stats.vendor_defaults_count >= 1
def test_get_platform_stats_empty_platform(self, db, test_platform):
"""Test stats for a platform with no content."""
stats = platform_service.get_platform_stats(db, test_platform)
assert stats.vendor_count == 0
assert stats.platform_pages_count == 0
assert stats.vendor_defaults_count == 0
# =============================================================================
# UPDATE TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestPlatformServiceUpdate:
"""Test suite for updating platforms."""
def test_update_platform_name(self, db, test_platform):
"""Test updating platform name."""
unique_id = str(uuid.uuid4())[:8]
new_name = f"Updated Platform {unique_id}"
updated = platform_service.update_platform(
db, test_platform, {"name": new_name}
)
db.commit()
assert updated.name == new_name
def test_update_platform_multiple_fields(self, db, test_platform):
"""Test updating multiple platform fields."""
unique_id = str(uuid.uuid4())[:8]
update_data = {
"name": f"Multi Update {unique_id}",
"description": "Updated description",
"is_public": False,
}
updated = platform_service.update_platform(db, test_platform, update_data)
db.commit()
assert updated.name == f"Multi Update {unique_id}"
assert updated.description == "Updated description"
assert updated.is_public is False
def test_update_platform_ignores_invalid_fields(self, db, test_platform):
"""Test that invalid fields are ignored during update."""
original_name = test_platform.name
update_data = {
"nonexistent_field": "value",
"name": original_name, # Keep same name
}
# Should not raise an error
updated = platform_service.update_platform(db, test_platform, update_data)
assert updated.name == original_name

View File

@@ -0,0 +1,420 @@
# tests/unit/services/test_vendor_domain_service.py
"""Unit tests for VendorDomainService."""
import uuid
from datetime import UTC, datetime
from unittest.mock import MagicMock, patch
import pytest
from pydantic import ValidationError
from app.modules.tenancy.exceptions import (
DNSVerificationException,
DomainAlreadyVerifiedException,
DomainNotVerifiedException,
DomainVerificationFailedException,
MaxDomainsReachedException,
VendorDomainAlreadyExistsException,
VendorDomainNotFoundException,
VendorNotFoundException,
)
from app.modules.tenancy.models import VendorDomain
from app.modules.tenancy.schemas.vendor_domain import VendorDomainCreate, VendorDomainUpdate
from app.modules.tenancy.services.vendor_domain_service import vendor_domain_service
# =============================================================================
# FIXTURES
# =============================================================================
@pytest.fixture
def test_domain(db, test_vendor):
"""Create a test domain for a vendor."""
unique_id = str(uuid.uuid4())[:8]
domain = VendorDomain(
vendor_id=test_vendor.id,
domain=f"test{unique_id}.example.com",
is_primary=False,
is_active=False,
is_verified=False,
verification_token=f"token_{unique_id}",
ssl_status="pending",
)
db.add(domain)
db.commit()
db.refresh(domain)
return domain
@pytest.fixture
def verified_domain(db, test_vendor):
"""Create a verified domain for a vendor."""
unique_id = str(uuid.uuid4())[:8]
domain = VendorDomain(
vendor_id=test_vendor.id,
domain=f"verified{unique_id}.example.com",
is_primary=False,
is_active=True,
is_verified=True,
verified_at=datetime.now(UTC),
verification_token=f"verified_token_{unique_id}",
ssl_status="active",
)
db.add(domain)
db.commit()
db.refresh(domain)
return domain
@pytest.fixture
def primary_domain(db, test_vendor):
"""Create a primary domain for a vendor."""
unique_id = str(uuid.uuid4())[:8]
domain = VendorDomain(
vendor_id=test_vendor.id,
domain=f"primary{unique_id}.example.com",
is_primary=True,
is_active=True,
is_verified=True,
verified_at=datetime.now(UTC),
verification_token=f"primary_token_{unique_id}",
ssl_status="active",
)
db.add(domain)
db.commit()
db.refresh(domain)
return domain
# =============================================================================
# ADD DOMAIN TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorDomainServiceAdd:
"""Test suite for adding vendor domains."""
def test_add_domain_success(self, db, test_vendor):
"""Test successfully adding a domain to a vendor."""
unique_id = str(uuid.uuid4())[:8]
domain_data = VendorDomainCreate(
domain=f"newdomain{unique_id}.example.com",
is_primary=False,
)
result = vendor_domain_service.add_domain(db, test_vendor.id, domain_data)
db.commit()
assert result is not None
assert result.vendor_id == test_vendor.id
assert result.domain == f"newdomain{unique_id}.example.com"
assert result.is_primary is False
assert result.is_verified is False
assert result.is_active is False
assert result.verification_token is not None
def test_add_domain_as_primary(self, db, test_vendor, primary_domain):
"""Test adding a domain as primary unsets other primary domains."""
unique_id = str(uuid.uuid4())[:8]
domain_data = VendorDomainCreate(
domain=f"newprimary{unique_id}.example.com",
is_primary=True,
)
result = vendor_domain_service.add_domain(db, test_vendor.id, domain_data)
db.commit()
db.refresh(primary_domain)
assert result.is_primary is True
assert primary_domain.is_primary is False
def test_add_domain_vendor_not_found(self, db):
"""Test adding domain to non-existent vendor raises exception."""
domain_data = VendorDomainCreate(
domain="test.example.com",
is_primary=False,
)
with pytest.raises(VendorNotFoundException):
vendor_domain_service.add_domain(db, 99999, domain_data)
def test_add_domain_already_exists(self, db, test_vendor, test_domain):
"""Test adding a domain that already exists raises exception."""
domain_data = VendorDomainCreate(
domain=test_domain.domain,
is_primary=False,
)
with pytest.raises(VendorDomainAlreadyExistsException):
vendor_domain_service.add_domain(db, test_vendor.id, domain_data)
def test_add_domain_max_limit_reached(self, db, test_vendor):
"""Test adding domain when max limit reached raises exception."""
# Create max domains
for i in range(vendor_domain_service.max_domains_per_vendor):
domain = VendorDomain(
vendor_id=test_vendor.id,
domain=f"domain{i}_{uuid.uuid4().hex[:6]}.example.com",
verification_token=f"token_{i}_{uuid.uuid4().hex[:6]}",
)
db.add(domain)
db.commit()
domain_data = VendorDomainCreate(
domain="onemore.example.com",
is_primary=False,
)
with pytest.raises(MaxDomainsReachedException):
vendor_domain_service.add_domain(db, test_vendor.id, domain_data)
def test_add_domain_reserved_subdomain(self, db, test_vendor):
"""Test adding a domain with reserved subdomain raises exception.
Note: Reserved subdomain validation happens in Pydantic schema first.
"""
with pytest.raises(ValidationError) as exc_info:
VendorDomainCreate(
domain="admin.example.com",
is_primary=False,
)
assert "reserved subdomain" in str(exc_info.value).lower()
# =============================================================================
# GET DOMAINS TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorDomainServiceGet:
"""Test suite for getting vendor domains."""
def test_get_vendor_domains_success(self, db, test_vendor, test_domain, verified_domain):
"""Test getting all domains for a vendor."""
domains = vendor_domain_service.get_vendor_domains(db, test_vendor.id)
assert len(domains) >= 2
domain_ids = [d.id for d in domains]
assert test_domain.id in domain_ids
assert verified_domain.id in domain_ids
def test_get_vendor_domains_empty(self, db, test_vendor):
"""Test getting domains for vendor with no domains."""
domains = vendor_domain_service.get_vendor_domains(db, test_vendor.id)
# May have domains from other fixtures, so just check it returns a list
assert isinstance(domains, list)
def test_get_vendor_domains_vendor_not_found(self, db):
"""Test getting domains for non-existent vendor raises exception."""
with pytest.raises(VendorNotFoundException):
vendor_domain_service.get_vendor_domains(db, 99999)
def test_get_domain_by_id_success(self, db, test_domain):
"""Test getting a domain by ID."""
domain = vendor_domain_service.get_domain_by_id(db, test_domain.id)
assert domain is not None
assert domain.id == test_domain.id
assert domain.domain == test_domain.domain
def test_get_domain_by_id_not_found(self, db):
"""Test getting non-existent domain raises exception."""
with pytest.raises(VendorDomainNotFoundException):
vendor_domain_service.get_domain_by_id(db, 99999)
# =============================================================================
# UPDATE DOMAIN TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorDomainServiceUpdate:
"""Test suite for updating vendor domains."""
def test_update_domain_set_primary(self, db, test_domain, primary_domain):
"""Test setting a domain as primary."""
update_data = VendorDomainUpdate(is_primary=True)
# First verify the domain (required for activation)
test_domain.is_verified = True
db.commit()
result = vendor_domain_service.update_domain(db, test_domain.id, update_data)
db.commit()
db.refresh(primary_domain)
assert result.is_primary is True
assert primary_domain.is_primary is False
def test_update_domain_activate_verified(self, db, verified_domain):
"""Test activating a verified domain."""
verified_domain.is_active = False
db.commit()
update_data = VendorDomainUpdate(is_active=True)
result = vendor_domain_service.update_domain(db, verified_domain.id, update_data)
db.commit()
assert result.is_active is True
def test_update_domain_activate_unverified_fails(self, db, test_domain):
"""Test activating an unverified domain raises exception."""
update_data = VendorDomainUpdate(is_active=True)
with pytest.raises(DomainNotVerifiedException):
vendor_domain_service.update_domain(db, test_domain.id, update_data)
def test_update_domain_not_found(self, db):
"""Test updating non-existent domain raises exception."""
update_data = VendorDomainUpdate(is_primary=True)
with pytest.raises(VendorDomainNotFoundException):
vendor_domain_service.update_domain(db, 99999, update_data)
def test_update_domain_deactivate(self, db, verified_domain):
"""Test deactivating a domain."""
update_data = VendorDomainUpdate(is_active=False)
result = vendor_domain_service.update_domain(db, verified_domain.id, update_data)
db.commit()
assert result.is_active is False
# =============================================================================
# DELETE DOMAIN TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorDomainServiceDelete:
"""Test suite for deleting vendor domains."""
def test_delete_domain_success(self, db, test_vendor):
"""Test successfully deleting a domain."""
# Create a domain to delete
unique_id = str(uuid.uuid4())[:8]
domain = VendorDomain(
vendor_id=test_vendor.id,
domain=f"todelete{unique_id}.example.com",
verification_token=f"delete_token_{unique_id}",
)
db.add(domain)
db.commit()
domain_id = domain.id
domain_name = domain.domain
result = vendor_domain_service.delete_domain(db, domain_id)
db.commit()
assert domain_name in result
# Verify it's gone
with pytest.raises(VendorDomainNotFoundException):
vendor_domain_service.get_domain_by_id(db, domain_id)
def test_delete_domain_not_found(self, db):
"""Test deleting non-existent domain raises exception."""
with pytest.raises(VendorDomainNotFoundException):
vendor_domain_service.delete_domain(db, 99999)
# =============================================================================
# VERIFY DOMAIN TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorDomainServiceVerify:
"""Test suite for domain verification."""
@patch("dns.resolver.resolve")
def test_verify_domain_success(self, mock_resolve, db, test_domain):
"""Test successful domain verification."""
# Mock DNS response
mock_txt = MagicMock()
mock_txt.to_text.return_value = f'"{test_domain.verification_token}"'
mock_resolve.return_value = [mock_txt]
domain, message = vendor_domain_service.verify_domain(db, test_domain.id)
db.commit()
assert domain.is_verified is True
assert domain.verified_at is not None
assert "verified successfully" in message.lower()
def test_verify_domain_already_verified(self, db, verified_domain):
"""Test verifying already verified domain raises exception."""
with pytest.raises(DomainAlreadyVerifiedException):
vendor_domain_service.verify_domain(db, verified_domain.id)
def test_verify_domain_not_found(self, db):
"""Test verifying non-existent domain raises exception."""
with pytest.raises(VendorDomainNotFoundException):
vendor_domain_service.verify_domain(db, 99999)
@patch("dns.resolver.resolve")
def test_verify_domain_token_not_found(self, mock_resolve, db, test_domain):
"""Test verification fails when token not found in DNS."""
# Mock DNS response with wrong token
mock_txt = MagicMock()
mock_txt.to_text.return_value = '"wrong_token"'
mock_resolve.return_value = [mock_txt]
with pytest.raises(DomainVerificationFailedException) as exc_info:
vendor_domain_service.verify_domain(db, test_domain.id)
assert "token not found" in str(exc_info.value).lower()
@patch("dns.resolver.resolve")
def test_verify_domain_dns_nxdomain(self, mock_resolve, db, test_domain):
"""Test verification fails when DNS record doesn't exist."""
import dns.resolver
mock_resolve.side_effect = dns.resolver.NXDOMAIN()
with pytest.raises(DomainVerificationFailedException):
vendor_domain_service.verify_domain(db, test_domain.id)
# =============================================================================
# VERIFICATION INSTRUCTIONS TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorDomainServiceInstructions:
"""Test suite for verification instructions."""
def test_get_verification_instructions(self, db, test_domain):
"""Test getting verification instructions."""
instructions = vendor_domain_service.get_verification_instructions(
db, test_domain.id
)
assert instructions["domain"] == test_domain.domain
assert instructions["verification_token"] == test_domain.verification_token
assert "instructions" in instructions
assert "txt_record" in instructions
assert instructions["txt_record"]["type"] == "TXT"
assert instructions["txt_record"]["name"] == "_wizamart-verify"
assert "common_registrars" in instructions
def test_get_verification_instructions_not_found(self, db):
"""Test getting instructions for non-existent domain raises exception."""
with pytest.raises(VendorDomainNotFoundException):
vendor_domain_service.get_verification_instructions(db, 99999)

View File

@@ -1,5 +1,9 @@
# tests/unit/services/test_vendor_service.py # tests/unit/services/test_vendor_service.py
"""Unit tests for VendorService following the application's exception patterns.""" """Unit tests for VendorService following the application's exception patterns.
Note: Product catalog operations (add_product_to_catalog, get_products) have been
moved to app.modules.catalog.services. See test_product_service.py for those tests.
"""
import uuid import uuid
@@ -12,12 +16,9 @@ from app.modules.tenancy.exceptions import (
VendorAlreadyExistsException, VendorAlreadyExistsException,
VendorNotFoundException, VendorNotFoundException,
) )
from app.modules.marketplace.exceptions import MarketplaceProductNotFoundException
from app.modules.catalog.exceptions import ProductAlreadyExistsException
from app.modules.tenancy.services.vendor_service import VendorService from app.modules.tenancy.services.vendor_service import VendorService
from app.modules.tenancy.models import Company from app.modules.tenancy.models import Company
from app.modules.tenancy.models import Vendor from app.modules.tenancy.models import Vendor
from app.modules.catalog.schemas import ProductCreate
from app.modules.tenancy.schemas.vendor import VendorCreate from app.modules.tenancy.schemas.vendor import VendorCreate
@@ -354,98 +355,8 @@ class TestVendorService:
assert vendor.is_active is False assert vendor.is_active is False
# ==================== add_product_to_catalog Tests ==================== # NOTE: add_product_to_catalog and get_products tests have been moved to
# test_product_service.py since those methods are now in the catalog module.
def test_add_product_to_vendor_success(self, db, test_vendor, unique_product):
"""Test successfully adding product to vendor."""
from app.modules.marketplace.models import MarketplaceProduct
# Re-query objects to avoid session issues
vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
mp = (
db.query(MarketplaceProduct)
.filter(MarketplaceProduct.id == unique_product.id)
.first()
)
product_data = ProductCreate(
marketplace_product_id=mp.id,
price=15.99,
is_featured=True,
)
product = self.service.add_product_to_catalog(db, vendor, product_data)
db.commit()
assert product is not None
assert product.vendor_id == vendor.id
assert product.marketplace_product_id == mp.id
def test_add_product_to_vendor_product_not_found(self, db, test_vendor):
"""Test adding non-existent product to vendor fails."""
product_data = ProductCreate(
marketplace_product_id=99999, # Non-existent ID
price=15.99,
)
with pytest.raises(MarketplaceProductNotFoundException) as exc_info:
self.service.add_product_to_catalog(db, test_vendor, product_data)
exception = exc_info.value
assert exception.status_code == 404
assert exception.error_code == "PRODUCT_NOT_FOUND"
def test_add_product_to_vendor_already_exists(self, db, test_vendor, test_product):
"""Test adding product that's already in vendor fails."""
# Re-query to get fresh instances
from app.modules.catalog.models import Product
vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
product = db.query(Product).filter(Product.id == test_product.id).first()
product_data = ProductCreate(
marketplace_product_id=product.marketplace_product_id,
price=15.99,
)
with pytest.raises(ProductAlreadyExistsException) as exc_info:
self.service.add_product_to_catalog(db, vendor, product_data)
exception = exc_info.value
assert exception.status_code == 409
assert exception.error_code == "PRODUCT_ALREADY_EXISTS"
# ==================== get_products Tests ====================
def test_get_products_owner_access(self, db, test_user, test_vendor, test_product):
"""Test vendor owner can get vendor products."""
# Re-query vendor to get fresh instance
vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
products, total = self.service.get_products(db, vendor, test_user)
assert total >= 1
assert len(products) >= 1
def test_get_products_access_denied(self, db, test_user, inactive_vendor):
"""Test non-owner cannot access unverified vendor products."""
# Re-query vendor to get fresh instance
vendor = db.query(Vendor).filter(Vendor.id == inactive_vendor.id).first()
with pytest.raises(UnauthorizedVendorAccessException) as exc_info:
self.service.get_products(db, vendor, test_user)
exception = exc_info.value
assert exception.status_code == 403
assert exception.error_code == "UNAUTHORIZED_VENDOR_ACCESS"
def test_get_products_with_filters(self, db, test_user, test_vendor, test_product):
"""Test getting vendor products with various filters."""
# Re-query vendor to get fresh instance
vendor = db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
# Test active only filter
products, total = self.service.get_products(
db, vendor, test_user, active_only=True
)
assert all(p.is_active for p in products)
# ==================== Helper Method Tests ==================== # ==================== Helper Method Tests ====================

View File

@@ -0,0 +1,506 @@
# tests/unit/services/test_vendor_team_service.py
"""Unit tests for VendorTeamService."""
import uuid
from datetime import datetime, timedelta
from unittest.mock import patch
import pytest
from app.modules.tenancy.exceptions import (
CannotRemoveOwnerException,
InvalidInvitationTokenException,
TeamInvitationAlreadyAcceptedException,
TeamMemberAlreadyExistsException,
UserNotFoundException,
)
from app.modules.tenancy.models import Role, User, Vendor, VendorUser, VendorUserType
from app.modules.tenancy.services.vendor_team_service import vendor_team_service
# =============================================================================
# FIXTURES
# =============================================================================
@pytest.fixture
def team_vendor(db, test_company):
"""Create a vendor for team tests."""
unique_id = str(uuid.uuid4())[:8]
vendor = Vendor(
company_id=test_company.id,
vendor_code=f"TEAMVENDOR_{unique_id.upper()}",
subdomain=f"teamvendor{unique_id.lower()}",
name=f"Team Vendor {unique_id}",
is_active=True,
is_verified=True,
)
db.add(vendor)
db.commit()
db.refresh(vendor)
return vendor
@pytest.fixture
def vendor_owner(db, team_vendor, test_user):
"""Create an owner for the team vendor."""
vendor_user = VendorUser(
vendor_id=team_vendor.id,
user_id=test_user.id,
user_type=VendorUserType.OWNER.value,
is_active=True,
)
db.add(vendor_user)
db.commit()
db.refresh(vendor_user)
return vendor_user
@pytest.fixture
def team_member(db, team_vendor, other_user, auth_manager):
"""Create a team member for the vendor."""
# Create a role first
role = Role(
vendor_id=team_vendor.id,
name="staff",
permissions=["orders.view", "products.view"],
)
db.add(role)
db.commit()
vendor_user = VendorUser(
vendor_id=team_vendor.id,
user_id=other_user.id,
user_type=VendorUserType.TEAM_MEMBER.value,
role_id=role.id,
is_active=True,
invitation_accepted_at=datetime.utcnow(),
)
db.add(vendor_user)
db.commit()
db.refresh(vendor_user)
return vendor_user
@pytest.fixture
def pending_invitation(db, team_vendor, test_user, auth_manager):
"""Create a pending invitation."""
unique_id = str(uuid.uuid4())[:8]
# Create new user for invitation
new_user = User(
email=f"pending_{unique_id}@example.com",
username=f"pending_{unique_id}",
hashed_password=auth_manager.hash_password("temppass"),
role="vendor",
is_active=False,
)
db.add(new_user)
db.commit()
# Create role
role = Role(
vendor_id=team_vendor.id,
name="support",
permissions=["support.view"],
)
db.add(role)
db.commit()
# Create pending vendor user
vendor_user = VendorUser(
vendor_id=team_vendor.id,
user_id=new_user.id,
user_type=VendorUserType.TEAM_MEMBER.value,
role_id=role.id,
invited_by=test_user.id,
invitation_token=f"pending_token_{unique_id}",
invitation_sent_at=datetime.utcnow(),
is_active=False,
)
db.add(vendor_user)
db.commit()
db.refresh(vendor_user)
return vendor_user
@pytest.fixture
def expired_invitation(db, team_vendor, test_user, auth_manager):
"""Create an expired invitation."""
unique_id = str(uuid.uuid4())[:8]
# Create new user for invitation
new_user = User(
email=f"expired_{unique_id}@example.com",
username=f"expired_{unique_id}",
hashed_password=auth_manager.hash_password("temppass"),
role="vendor",
is_active=False,
)
db.add(new_user)
db.commit()
# Create role
role = Role(
vendor_id=team_vendor.id,
name="viewer",
permissions=["read_only"],
)
db.add(role)
db.commit()
# Create expired vendor user (sent 8 days ago, expires in 7)
vendor_user = VendorUser(
vendor_id=team_vendor.id,
user_id=new_user.id,
user_type=VendorUserType.TEAM_MEMBER.value,
role_id=role.id,
invited_by=test_user.id,
invitation_token=f"expired_token_{unique_id}",
invitation_sent_at=datetime.utcnow() - timedelta(days=8),
is_active=False,
)
db.add(vendor_user)
db.commit()
db.refresh(vendor_user)
return vendor_user
# =============================================================================
# INVITE TEAM MEMBER TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorTeamServiceInvite:
"""Test suite for inviting team members."""
@patch("app.modules.billing.services.subscription_service.SubscriptionService.check_team_limit")
def test_invite_new_user(self, mock_check_limit, db, team_vendor, test_user):
"""Test inviting a new user to the team."""
mock_check_limit.return_value = None # No limit reached
unique_id = str(uuid.uuid4())[:8]
email = f"newmember_{unique_id}@example.com"
result = vendor_team_service.invite_team_member(
db=db,
vendor=team_vendor,
inviter=test_user,
email=email,
role_name="staff",
)
db.commit()
assert result is not None
assert result["email"] == email
assert result["invitation_token"] is not None
assert result["role"] == "staff"
assert result["existing_user"] is False
@patch("app.modules.billing.services.subscription_service.SubscriptionService.check_team_limit")
def test_invite_existing_user(self, mock_check_limit, db, team_vendor, test_user, other_user):
"""Test inviting an existing user to the team."""
mock_check_limit.return_value = None
result = vendor_team_service.invite_team_member(
db=db,
vendor=team_vendor,
inviter=test_user,
email=other_user.email,
role_name="manager",
)
db.commit()
assert result is not None
assert result["email"] == other_user.email
assert result["invitation_token"] is not None
@patch("app.modules.billing.services.subscription_service.SubscriptionService.check_team_limit")
def test_invite_already_member_raises_error(
self, mock_check_limit, db, team_vendor, test_user, team_member
):
"""Test inviting an existing member raises exception."""
mock_check_limit.return_value = None
with pytest.raises(TeamMemberAlreadyExistsException):
vendor_team_service.invite_team_member(
db=db,
vendor=team_vendor,
inviter=test_user,
email=team_member.user.email,
role_name="staff",
)
@patch("app.modules.billing.services.subscription_service.SubscriptionService.check_team_limit")
def test_invite_with_custom_permissions(self, mock_check_limit, db, team_vendor, test_user):
"""Test inviting with custom permissions."""
mock_check_limit.return_value = None
unique_id = str(uuid.uuid4())[:8]
email = f"custom_{unique_id}@example.com"
custom_perms = ["orders.view", "orders.edit", "products.view"]
result = vendor_team_service.invite_team_member(
db=db,
vendor=team_vendor,
inviter=test_user,
email=email,
role_name="custom_role",
custom_permissions=custom_perms,
)
db.commit()
assert result is not None
assert result["role"] == "custom_role"
# =============================================================================
# ACCEPT INVITATION TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorTeamServiceAccept:
"""Test suite for accepting invitations."""
def test_accept_invitation_success(self, db, pending_invitation):
"""Test accepting a valid invitation."""
result = vendor_team_service.accept_invitation(
db=db,
invitation_token=pending_invitation.invitation_token,
password="newpassword123",
first_name="John",
last_name="Doe",
)
db.commit()
assert result is not None
assert result["user"].is_active is True
assert result["user"].first_name == "John"
assert result["user"].last_name == "Doe"
def test_accept_invitation_invalid_token(self, db):
"""Test accepting with invalid token raises exception."""
with pytest.raises(InvalidInvitationTokenException):
vendor_team_service.accept_invitation(
db=db,
invitation_token="invalid_token_12345",
password="password123",
)
def test_accept_invitation_already_accepted(self, db, team_member):
"""Test accepting an already accepted invitation raises exception."""
# team_member already has invitation_accepted_at set
with pytest.raises(InvalidInvitationTokenException):
vendor_team_service.accept_invitation(
db=db,
invitation_token="some_token", # team_member has no token
password="password123",
)
def test_accept_invitation_expired(self, db, expired_invitation):
"""Test accepting an expired invitation raises exception."""
with pytest.raises(InvalidInvitationTokenException) as exc_info:
vendor_team_service.accept_invitation(
db=db,
invitation_token=expired_invitation.invitation_token,
password="password123",
)
assert "expired" in str(exc_info.value).lower()
# =============================================================================
# REMOVE TEAM MEMBER TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorTeamServiceRemove:
"""Test suite for removing team members."""
def test_remove_team_member_success(self, db, team_vendor, team_member):
"""Test removing a team member."""
result = vendor_team_service.remove_team_member(
db=db,
vendor=team_vendor,
user_id=team_member.user_id,
)
db.commit()
db.refresh(team_member)
assert result is True
assert team_member.is_active is False
def test_remove_owner_raises_error(self, db, team_vendor, vendor_owner):
"""Test removing owner raises exception."""
with pytest.raises(CannotRemoveOwnerException):
vendor_team_service.remove_team_member(
db=db,
vendor=team_vendor,
user_id=vendor_owner.user_id,
)
def test_remove_nonexistent_user_raises_error(self, db, team_vendor):
"""Test removing non-existent user raises exception."""
with pytest.raises(UserNotFoundException):
vendor_team_service.remove_team_member(
db=db,
vendor=team_vendor,
user_id=99999,
)
# =============================================================================
# UPDATE MEMBER ROLE TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorTeamServiceUpdateRole:
"""Test suite for updating member roles."""
def test_update_role_success(self, db, team_vendor, team_member):
"""Test updating a member's role."""
result = vendor_team_service.update_member_role(
db=db,
vendor=team_vendor,
user_id=team_member.user_id,
new_role_name="manager",
)
db.commit()
assert result is not None
assert result.role.name == "manager"
def test_update_owner_role_raises_error(self, db, team_vendor, vendor_owner):
"""Test updating owner's role raises exception."""
with pytest.raises(CannotRemoveOwnerException):
vendor_team_service.update_member_role(
db=db,
vendor=team_vendor,
user_id=vendor_owner.user_id,
new_role_name="staff",
)
def test_update_role_with_custom_permissions(self, db, team_vendor, team_member):
"""Test updating role with custom permissions."""
custom_perms = ["orders.view", "orders.edit", "reports.view"]
result = vendor_team_service.update_member_role(
db=db,
vendor=team_vendor,
user_id=team_member.user_id,
new_role_name="analyst",
custom_permissions=custom_perms,
)
db.commit()
assert result.role.name == "analyst"
assert result.role.permissions == custom_perms
def test_update_nonexistent_user_raises_error(self, db, team_vendor):
"""Test updating non-existent user raises exception."""
with pytest.raises(UserNotFoundException):
vendor_team_service.update_member_role(
db=db,
vendor=team_vendor,
user_id=99999,
new_role_name="staff",
)
# =============================================================================
# GET TEAM MEMBERS TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorTeamServiceGetMembers:
"""Test suite for getting team members."""
def test_get_team_members(self, db, team_vendor, vendor_owner, team_member):
"""Test getting all team members."""
members = vendor_team_service.get_team_members(db, team_vendor)
assert len(members) >= 2
user_ids = [m["id"] for m in members]
assert vendor_owner.user_id in user_ids
assert team_member.user_id in user_ids
def test_get_team_members_excludes_inactive(
self, db, team_vendor, vendor_owner, team_member
):
"""Test getting only active team members."""
# Deactivate team_member
team_member.is_active = False
db.commit()
members = vendor_team_service.get_team_members(
db, team_vendor, include_inactive=False
)
user_ids = [m["id"] for m in members]
assert vendor_owner.user_id in user_ids
assert team_member.user_id not in user_ids
def test_get_team_members_includes_inactive(
self, db, team_vendor, vendor_owner, team_member
):
"""Test getting all members including inactive."""
# Deactivate team_member
team_member.is_active = False
db.commit()
members = vendor_team_service.get_team_members(
db, team_vendor, include_inactive=True
)
user_ids = [m["id"] for m in members]
assert vendor_owner.user_id in user_ids
assert team_member.user_id in user_ids
# =============================================================================
# GET VENDOR ROLES TESTS
# =============================================================================
@pytest.mark.unit
@pytest.mark.tenancy
class TestVendorTeamServiceGetRoles:
"""Test suite for getting vendor roles."""
def test_get_vendor_roles_existing(self, db, team_vendor, team_member):
"""Test getting roles when they exist."""
# team_member fixture creates a role
roles = vendor_team_service.get_vendor_roles(db, team_vendor.id)
assert len(roles) >= 1
role_names = [r["name"] for r in roles]
assert "staff" in role_names
def test_get_vendor_roles_creates_defaults(self, db, team_vendor):
"""Test default roles are created if none exist."""
roles = vendor_team_service.get_vendor_roles(db, team_vendor.id)
db.commit()
assert len(roles) >= 5 # Default roles
role_names = [r["name"] for r in roles]
assert "manager" in role_names
assert "staff" in role_names
assert "support" in role_names
assert "viewer" in role_names
assert "marketing" in role_names
def test_get_vendor_roles_returns_permissions(self, db, team_vendor):
"""Test roles include permissions."""
roles = vendor_team_service.get_vendor_roles(db, team_vendor.id)
for role in roles:
assert "permissions" in role
assert isinstance(role["permissions"], list)