refactor(arch): eliminate all cross-module model imports in service layer
Some checks failed
CI / ruff (push) Successful in 9s
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / pytest (push) Has been cancelled

Enforce MOD-025/MOD-026 rules: zero top-level cross-module model imports
remain in any service file. All 66 files migrated using deferred import
patterns (method-body, _get_model() helpers, instance-cached self._Model)
and new cross-module service methods in tenancy. Documentation updated
with Pattern 6 (deferred imports), migration plan marked complete, and
violations status reflects 84→0 service-layer violations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-27 06:13:15 +01:00
parent e3a52f6536
commit 86e85a98b8
66 changed files with 2242 additions and 1295 deletions

View File

@@ -802,6 +802,14 @@ class AdminService:
"""
return db.query(User).filter(User.id == user_id).first()
def get_user_by_email(self, db: Session, email: str) -> User | None:
"""Get user by email."""
return db.query(User).filter(User.email == email).first()
def get_user_by_username(self, db: Session, username: str) -> User | None:
"""Get user by username."""
return db.query(User).filter(User.username == username).first()
def _get_user_by_id_or_raise(self, db: Session, user_id: int) -> User:
"""Get user by ID or raise UserNotFoundException."""
user = db.query(User).filter(User.id == user_id).first()
@@ -871,5 +879,40 @@ class AdminService:
db.add_all(roles)
def get_user_statistics(self, db: Session) -> dict:
"""
Get user statistics for dashboards.
Returns:
Dict with total_users, active_users, inactive_users, admin_users, activation_rate
"""
from sqlalchemy import func
total_users = db.query(func.count(User.id)).scalar() or 0
active_users = (
db.query(func.count(User.id))
.filter(User.is_active == True) # noqa: E712
.scalar()
or 0
)
inactive_users = total_users - active_users
admin_users = (
db.query(func.count(User.id))
.filter(User.role.in_(["super_admin", "platform_admin"]))
.scalar()
or 0
)
return {
"total_users": total_users,
"active_users": active_users,
"inactive_users": inactive_users,
"admin_users": admin_users,
"activation_rate": (
(active_users / total_users * 100) if total_users > 0 else 0
),
}
# Create service instance
admin_service = AdminService()

View File

@@ -125,6 +125,21 @@ class MerchantService:
return merchant
def get_merchant_by_id_optional(
self, db: Session, merchant_id: int
) -> Merchant | None:
"""
Get merchant by ID, returns None if not found.
Args:
db: Database session
merchant_id: Merchant ID
Returns:
Merchant object or None
"""
return db.query(Merchant).filter(Merchant.id == merchant_id).first()
def get_merchants(
self,
db: Session,

View File

@@ -17,7 +17,6 @@ from dataclasses import dataclass
from sqlalchemy import func
from sqlalchemy.orm import Session
from app.modules.cms.models import ContentPage
from app.modules.tenancy.exceptions import (
PlatformNotFoundException,
)
@@ -102,6 +101,11 @@ class PlatformService:
return platform
@staticmethod
def get_default_platform(db: Session) -> Platform | None:
"""Get the first/default platform."""
return db.query(Platform).first()
@staticmethod
def list_platforms(
db: Session, include_inactive: bool = False
@@ -167,6 +171,13 @@ class PlatformService:
or 0
)
@staticmethod
def _get_content_page_model():
"""Deferred import for CMS ContentPage model."""
from app.modules.cms.models import ContentPage
return ContentPage
@staticmethod
def get_platform_pages_count(db: Session, platform_id: int) -> int:
"""
@@ -179,6 +190,7 @@ class PlatformService:
Returns:
Platform pages count
"""
ContentPage = PlatformService._get_content_page_model()
return (
db.query(func.count(ContentPage.id))
.filter(
@@ -202,6 +214,7 @@ class PlatformService:
Returns:
Store defaults count
"""
ContentPage = PlatformService._get_content_page_model()
return (
db.query(func.count(ContentPage.id))
.filter(
@@ -225,6 +238,7 @@ class PlatformService:
Returns:
Store overrides count
"""
ContentPage = PlatformService._get_content_page_model()
return (
db.query(func.count(ContentPage.id))
.filter(
@@ -247,6 +261,7 @@ class PlatformService:
Returns:
Published pages count
"""
ContentPage = PlatformService._get_content_page_model()
return (
db.query(func.count(ContentPage.id))
.filter(
@@ -269,6 +284,7 @@ class PlatformService:
Returns:
Draft pages count
"""
ContentPage = PlatformService._get_content_page_model()
return (
db.query(func.count(ContentPage.id))
.filter(
@@ -303,6 +319,187 @@ class PlatformService:
draft_pages_count=cls.get_draft_pages_count(db, platform.id),
)
# ========================================================================
# StorePlatform cross-module public API methods
# ========================================================================
@staticmethod
def get_primary_platform_id_for_store(db: Session, store_id: int) -> int | None:
"""
Get the primary platform ID for a store.
Args:
db: Database session
store_id: Store ID
Returns:
Platform ID or None if no platform assigned
"""
result = (
db.query(StorePlatform.platform_id)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.is_active == True, # noqa: E712
)
.order_by(StorePlatform.is_primary.desc())
.first()
)
return result[0] if result else None
@staticmethod
def get_active_platform_ids_for_store(db: Session, store_id: int) -> list[int]:
"""
Get all active platform IDs for a store.
Args:
db: Database session
store_id: Store ID
Returns:
List of platform IDs
"""
results = (
db.query(StorePlatform.platform_id)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.is_active == True, # noqa: E712
)
.order_by(StorePlatform.is_primary.desc())
.all()
)
return [r[0] for r in results]
@staticmethod
def get_store_platform_entry(
db: Session, store_id: int, platform_id: int
) -> StorePlatform | None:
"""
Get a specific StorePlatform entry.
Args:
db: Database session
store_id: Store ID
platform_id: Platform ID
Returns:
StorePlatform object or None
"""
return (
db.query(StorePlatform)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.platform_id == platform_id,
)
.first()
)
@staticmethod
def get_primary_store_platform_entry(
db: Session, store_id: int
) -> StorePlatform | None:
"""
Get the primary StorePlatform entry for a store.
Args:
db: Database session
store_id: Store ID
Returns:
StorePlatform object or None
"""
return (
db.query(StorePlatform)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.is_primary.is_(True),
)
.first()
)
@staticmethod
def get_store_ids_for_platform(
db: Session, platform_id: int, active_only: bool = True
) -> list[int]:
"""
Get store IDs subscribed to a platform.
Args:
db: Database session
platform_id: Platform ID
active_only: Only return active store-platform links
Returns:
List of store IDs
"""
query = db.query(StorePlatform.store_id).filter(
StorePlatform.platform_id == platform_id,
)
if active_only:
query = query.filter(StorePlatform.is_active == True) # noqa: E712
return [r[0] for r in query.all()]
@staticmethod
def ensure_store_platform(
db: Session,
store_id: int,
platform_id: int,
is_active: bool,
tier_id: int | None = None,
) -> StorePlatform | None:
"""
Upsert a StorePlatform entry.
If the entry exists, update is_active (and tier_id if provided).
If missing and is_active=True, create it (set is_primary if store has none).
If missing and is_active=False, no-op.
Args:
db: Database session
store_id: Store ID
platform_id: Platform ID
is_active: Whether the store-platform link is active
tier_id: Optional subscription tier ID
Returns:
The StorePlatform entry, or None if no-op
"""
existing = (
db.query(StorePlatform)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.platform_id == platform_id,
)
.first()
)
if existing:
existing.is_active = is_active
if tier_id is not None:
existing.tier_id = tier_id
return existing
if is_active:
has_primary = (
db.query(StorePlatform)
.filter(
StorePlatform.store_id == store_id,
StorePlatform.is_primary.is_(True),
)
.first()
) is not None
sp = StorePlatform(
store_id=store_id,
platform_id=platform_id,
is_active=True,
is_primary=not has_primary,
tier_id=tier_id,
)
db.add(sp)
return sp
return None
@staticmethod
def update_platform(
db: Session, platform: Platform, update_data: dict

View File

@@ -439,10 +439,129 @@ class StoreService:
logger.info(f"Store {store.store_code} set to {status}")
return store, f"Store {store.store_code} is now {status}"
# NOTE: Product catalog operations have been moved to catalog module.
# Use app.modules.catalog.services.product_service instead.
# - add_product_to_catalog -> product_service.create_product
# - get_products -> product_service.get_store_products
# ========================================================================
# Cross-module public API methods
# ========================================================================
def get_stores_by_merchant_id(
self, db: Session, merchant_id: int, active_only: bool = False
) -> list[Store]:
"""
Get all stores for a merchant.
Args:
db: Database session
merchant_id: Merchant ID
active_only: Only return active stores
Returns:
List of Store objects
"""
query = db.query(Store).filter(Store.merchant_id == merchant_id)
if active_only:
query = query.filter(Store.is_active == True) # noqa: E712
return query.order_by(Store.id).all()
def get_store_by_code_or_subdomain(
self, db: Session, code: str
) -> Store | None:
"""
Get store by store_code or subdomain.
Args:
db: Database session
code: Store code or subdomain
Returns:
Store object or None
"""
return (
db.query(Store)
.filter(
(func.upper(Store.store_code) == code.upper())
| (func.lower(Store.subdomain) == code.lower())
)
.first()
)
def get_total_store_count(
self, db: Session, active_only: bool = False
) -> int:
"""
Get total count of stores.
Args:
db: Database session
active_only: Only count active stores
Returns:
Store count
"""
query = db.query(func.count(Store.id))
if active_only:
query = query.filter(Store.is_active == True) # noqa: E712
return query.scalar() or 0
def get_store_count_by_status(
self,
db: Session,
active: bool | None = None,
verified: bool | None = None,
) -> int:
"""
Count stores filtered by active/verified status.
Args:
db: Database session
active: Filter by active status
verified: Filter by verified status
Returns:
Store count matching filters
"""
query = db.query(func.count(Store.id))
if active is not None:
query = query.filter(Store.is_active == active)
if verified is not None:
query = query.filter(Store.is_verified == verified)
return query.scalar() or 0
def list_all_stores(self, db: Session, active_only: bool = False) -> list[Store]:
"""Get all stores, optionally filtering by active status."""
query = db.query(Store)
if active_only:
query = query.filter(Store.is_active == True) # noqa: E712
return query.order_by(Store.id).all()
def is_letzshop_slug_claimed(self, db: Session, letzshop_slug: str) -> bool:
"""Check if a Letzshop store slug is already claimed."""
return (
db.query(Store)
.filter(
Store.letzshop_store_slug == letzshop_slug,
Store.is_active == True, # noqa: E712
)
.first()
is not None
)
def is_store_code_taken(self, db: Session, store_code: str) -> bool:
"""Check if a store code already exists."""
return (
db.query(Store)
.filter(Store.store_code == store_code)
.first()
is not None
)
def is_subdomain_taken(self, db: Session, subdomain: str) -> bool:
"""Check if a subdomain already exists."""
return (
db.query(Store)
.filter(Store.subdomain == subdomain)
.first()
is not None
)
# Private helper methods
def _store_code_exists(self, db: Session, store_code: str) -> bool:

View File

@@ -12,6 +12,7 @@ import logging
from datetime import UTC, datetime
from typing import Any
from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
@@ -188,6 +189,74 @@ class TeamService:
logger.error(f"Error removing team member: {str(e)}")
raise TeamValidationException("Failed to remove team member")
# ========================================================================
# Cross-module public API methods
# ========================================================================
def get_store_owner(self, db: Session, store_id: int) -> StoreUser | None:
"""
Get the owner StoreUser for a store.
Args:
db: Database session
store_id: Store ID
Returns:
StoreUser with is_owner=True, or None
"""
return (
db.query(StoreUser)
.filter(
StoreUser.store_id == store_id,
StoreUser.is_owner == True, # noqa: E712
)
.first()
)
def get_active_team_member_count(self, db: Session, store_id: int) -> int:
"""
Count active team members for a store.
Args:
db: Database session
store_id: Store ID
Returns:
Number of active team members
"""
return (
db.query(func.count(StoreUser.id))
.filter(
StoreUser.store_id == store_id,
StoreUser.is_active == True, # noqa: E712
)
.scalar()
or 0
)
def get_store_users_with_user(
self, db: Session, store_id: int, active_only: bool = True
) -> list[tuple[User, StoreUser]]:
"""
Get User and StoreUser pairs for a store.
Args:
db: Database session
store_id: Store ID
active_only: Only active users
Returns:
List of (User, StoreUser) tuples
"""
query = (
db.query(User, StoreUser)
.join(StoreUser, User.id == StoreUser.user_id)
.filter(StoreUser.store_id == store_id)
)
if active_only:
query = query.filter(User.is_active == True) # noqa: E712
return query.all()
def get_store_roles(self, db: Session, store_id: int) -> list[dict[str, Any]]:
"""
Get available roles for store.
@@ -216,5 +285,20 @@ class TeamService:
raise TeamValidationException("Failed to retrieve roles")
def get_total_active_team_member_count(self, db: Session) -> int:
"""
Count active team members across all stores.
Returns:
Total number of active team members platform-wide
"""
return (
db.query(func.count(StoreUser.id))
.filter(StoreUser.is_active == True) # noqa: E712
.scalar()
or 0
)
# Create service instance
team_service = TeamService()