Files
orion/app/modules/tenancy/services/store_service.py
Samir Boulahtit 86e85a98b8
Some checks failed
CI / ruff (push) Successful in 9s
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / pytest (push) Has been cancelled
refactor(arch): eliminate all cross-module model imports in service layer
Enforce MOD-025/MOD-026 rules: zero top-level cross-module model imports
remain in any service file. All 66 files migrated using deferred import
patterns (method-body, _get_model() helpers, instance-cached self._Model)
and new cross-module service methods in tenancy. Documentation updated
with Pattern 6 (deferred imports), migration plan marked complete, and
violations status reflects 84→0 service-layer violations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 06:13:15 +01:00

694 lines
22 KiB
Python

# app/modules/tenancy/services/store_service.py
"""
Store service for managing store operations.
This module provides classes and functions for:
- Store creation and management
- Store access control and validation
- Store filtering and search
Note: Product catalog operations have been moved to app.modules.catalog.services.
"""
import logging
from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session
from app.modules.tenancy.exceptions import (
InvalidStoreDataException,
StoreAlreadyExistsException,
StoreNotFoundException,
StoreValidationException,
UnauthorizedStoreAccessException,
)
from app.modules.tenancy.models import Store, User
from app.modules.tenancy.schemas.store import StoreCreate
logger = logging.getLogger(__name__)
class StoreService:
"""Service class for store operations following the application's service pattern."""
def create_store(
self, db: Session, store_data: StoreCreate, current_user: User
) -> Store:
"""
Create a new store under a merchant.
DEPRECATED: This method is for self-service store creation by merchant owners.
For admin operations, use admin_service.create_store() instead.
The new architecture:
- Merchants are the business entities with owners and contact info
- Stores are storefronts/brands under merchants
- The merchant_id is required in store_data
Args:
db: Database session
store_data: Store creation data (must include merchant_id)
current_user: User creating the store (must be merchant owner or admin)
Returns:
Created store object
Raises:
StoreAlreadyExistsException: If store code already exists
UnauthorizedStoreAccessException: If user is not merchant owner
InvalidStoreDataException: If store data is invalid
"""
from app.modules.tenancy.models import Merchant
try:
# Validate merchant_id is provided
if not hasattr(store_data, "merchant_id") or not store_data.merchant_id:
raise InvalidStoreDataException(
"merchant_id is required to create a store", field="merchant_id"
)
# Get merchant and verify ownership
merchant = (
db.query(Merchant).filter(Merchant.id == store_data.merchant_id).first()
)
if not merchant:
raise InvalidStoreDataException(
f"Merchant with ID {store_data.merchant_id} not found",
field="merchant_id",
)
# Check if user is merchant owner or admin
if (
not current_user.is_admin
and merchant.owner_user_id != current_user.id
):
raise UnauthorizedStoreAccessException(
f"merchant-{store_data.merchant_id}", current_user.id
)
# Normalize store code to uppercase
normalized_store_code = store_data.store_code.upper()
# Check if store code already exists (case-insensitive check)
if self._store_code_exists(db, normalized_store_code):
raise StoreAlreadyExistsException(normalized_store_code)
# Create store linked to merchant
new_store = Store(
merchant_id=merchant.id,
store_code=normalized_store_code,
subdomain=store_data.subdomain.lower(),
name=store_data.name,
description=store_data.description,
letzshop_csv_url_fr=store_data.letzshop_csv_url_fr,
letzshop_csv_url_en=store_data.letzshop_csv_url_en,
letzshop_csv_url_de=store_data.letzshop_csv_url_de,
is_active=True,
is_verified=current_user.is_admin,
)
db.add(new_store)
db.flush() # Get ID without committing - endpoint handles commit
logger.info(
f"New store created: {new_store.store_code} under merchant {merchant.name} by {current_user.username}"
)
return new_store
except (
StoreAlreadyExistsException,
UnauthorizedStoreAccessException,
InvalidStoreDataException,
):
raise # Re-raise custom exceptions - endpoint handles rollback
except SQLAlchemyError as e:
logger.error(f"Error creating store: {str(e)}")
raise StoreValidationException("Failed to create store")
def get_stores(
self,
db: Session,
current_user: User,
skip: int = 0,
limit: int = 100,
active_only: bool = True,
verified_only: bool = False,
) -> tuple[list[Store], int]:
"""
Get stores with filtering.
Args:
db: Database session
current_user: Current user requesting stores
skip: Number of records to skip
limit: Maximum number of records to return
active_only: Filter for active stores only
verified_only: Filter for verified stores only
Returns:
Tuple of (stores_list, total_count)
"""
try:
query = db.query(Store)
# Non-admin users can only see active and verified stores, plus their own
if not current_user.is_admin:
# Get store IDs the user owns through merchants
from app.modules.tenancy.models import Merchant
owned_store_ids = (
db.query(Store.id)
.join(Merchant)
.filter(Merchant.owner_user_id == current_user.id)
.subquery()
)
query = query.filter(
(Store.is_active == True)
& ((Store.is_verified == True) | (Store.id.in_(owned_store_ids)))
)
else:
# Admin can apply filters
if active_only:
query = query.filter(Store.is_active == True)
if verified_only:
query = query.filter(Store.is_verified == True)
total = query.count()
stores = query.offset(skip).limit(limit).all()
return stores, total
except SQLAlchemyError as e:
logger.error(f"Error getting stores: {str(e)}")
raise StoreValidationException("Failed to retrieve stores")
def get_store_by_code(
self, db: Session, store_code: str, current_user: User
) -> Store:
"""
Get store by store code with access control.
Args:
db: Database session
store_code: Store code to find
current_user: Current user requesting the store
Returns:
Store object
Raises:
StoreNotFoundException: If store not found
UnauthorizedStoreAccessException: If access denied
"""
try:
store = (
db.query(Store)
.filter(func.upper(Store.store_code) == store_code.upper())
.first()
)
if not store:
raise StoreNotFoundException(store_code)
# Check access permissions
if not self._can_access_store(store, current_user):
raise UnauthorizedStoreAccessException(store_code, current_user.id)
return store
except (StoreNotFoundException, UnauthorizedStoreAccessException):
raise # Re-raise custom exceptions
except SQLAlchemyError as e:
logger.error(f"Error getting store {store_code}: {str(e)}")
raise StoreValidationException("Failed to retrieve store")
def get_store_by_id(self, db: Session, store_id: int) -> Store:
"""
Get store by ID (admin use - no access control).
Args:
db: Database session
store_id: Store ID to find
Returns:
Store object with merchant and owner loaded
Raises:
StoreNotFoundException: If store not found
"""
from sqlalchemy.orm import joinedload
from app.modules.tenancy.models import Merchant
store = (
db.query(Store)
.options(joinedload(Store.merchant).joinedload(Merchant.owner))
.filter(Store.id == store_id)
.first()
)
if not store:
raise StoreNotFoundException(str(store_id), identifier_type="id")
return store
def get_store_by_id_optional(self, db: Session, store_id: int) -> Store | None:
"""
Get store by ID, returns None if not found.
Args:
db: Database session
store_id: Store ID to find
Returns:
Store object or None if not found
"""
return db.query(Store).filter(Store.id == store_id).first()
def get_active_store_by_code(self, db: Session, store_code: str) -> Store:
"""
Get active store by store_code for public access (no auth required).
This method is specifically designed for public endpoints where:
- No authentication is required
- Only active stores should be returned
- Inactive/disabled stores are hidden
Args:
db: Database session
store_code: Store code (case-insensitive)
Returns:
Store object with merchant and owner loaded
Raises:
StoreNotFoundException: If store not found or inactive
"""
from sqlalchemy.orm import joinedload
from app.modules.tenancy.models import Merchant
store = (
db.query(Store)
.options(joinedload(Store.merchant).joinedload(Merchant.owner))
.filter(
func.upper(Store.store_code) == store_code.upper(),
Store.is_active == True,
)
.first()
)
if not store:
logger.warning(f"Store not found or inactive: {store_code}")
raise StoreNotFoundException(store_code, identifier_type="code")
return store
def get_store_by_identifier(self, db: Session, identifier: str) -> Store:
"""
Get store by ID or store_code (admin use - no access control).
Args:
db: Database session
identifier: Either store ID (int as string) or store_code (string)
Returns:
Store object with merchant and owner loaded
Raises:
StoreNotFoundException: If store not found
"""
from sqlalchemy.orm import joinedload
from app.modules.tenancy.models import Merchant
# Try as integer ID first
try:
store_id = int(identifier)
return self.get_store_by_id(db, store_id)
except (ValueError, TypeError):
pass # Not an integer, treat as store_code
except StoreNotFoundException:
pass # ID not found, try as store_code
# Try as store_code (case-insensitive)
store = (
db.query(Store)
.options(joinedload(Store.merchant).joinedload(Merchant.owner))
.filter(func.upper(Store.store_code) == identifier.upper())
.first()
)
if not store:
raise StoreNotFoundException(identifier, identifier_type="code")
return store
def toggle_verification(self, db: Session, store_id: int) -> tuple[Store, str]:
"""
Toggle store verification status.
Args:
db: Database session
store_id: Store ID
Returns:
Tuple of (updated store, status message)
Raises:
StoreNotFoundException: If store not found
"""
store = self.get_store_by_id(db, store_id)
store.is_verified = not store.is_verified
# No commit here - endpoint handles transaction
status = "verified" if store.is_verified else "unverified"
logger.info(f"Store {store.store_code} {status}")
return store, f"Store {store.store_code} is now {status}"
def set_verification(
self, db: Session, store_id: int, is_verified: bool
) -> tuple[Store, str]:
"""
Set store verification status to specific value.
Args:
db: Database session
store_id: Store ID
is_verified: Target verification status
Returns:
Tuple of (updated store, status message)
Raises:
StoreNotFoundException: If store not found
"""
store = self.get_store_by_id(db, store_id)
store.is_verified = is_verified
# No commit here - endpoint handles transaction
status = "verified" if is_verified else "unverified"
logger.info(f"Store {store.store_code} set to {status}")
return store, f"Store {store.store_code} is now {status}"
def toggle_status(self, db: Session, store_id: int) -> tuple[Store, str]:
"""
Toggle store active status.
Args:
db: Database session
store_id: Store ID
Returns:
Tuple of (updated store, status message)
Raises:
StoreNotFoundException: If store not found
"""
store = self.get_store_by_id(db, store_id)
store.is_active = not store.is_active
# No commit here - endpoint handles transaction
status = "active" if store.is_active else "inactive"
logger.info(f"Store {store.store_code} {status}")
return store, f"Store {store.store_code} is now {status}"
def set_status(
self, db: Session, store_id: int, is_active: bool
) -> tuple[Store, str]:
"""
Set store active status to specific value.
Args:
db: Database session
store_id: Store ID
is_active: Target active status
Returns:
Tuple of (updated store, status message)
Raises:
StoreNotFoundException: If store not found
"""
store = self.get_store_by_id(db, store_id)
store.is_active = is_active
# No commit here - endpoint handles transaction
status = "active" if is_active else "inactive"
logger.info(f"Store {store.store_code} set to {status}")
return store, f"Store {store.store_code} is now {status}"
# ========================================================================
# Cross-module public API methods
# ========================================================================
def get_stores_by_merchant_id(
self, db: Session, merchant_id: int, active_only: bool = False
) -> list[Store]:
"""
Get all stores for a merchant.
Args:
db: Database session
merchant_id: Merchant ID
active_only: Only return active stores
Returns:
List of Store objects
"""
query = db.query(Store).filter(Store.merchant_id == merchant_id)
if active_only:
query = query.filter(Store.is_active == True) # noqa: E712
return query.order_by(Store.id).all()
def get_store_by_code_or_subdomain(
self, db: Session, code: str
) -> Store | None:
"""
Get store by store_code or subdomain.
Args:
db: Database session
code: Store code or subdomain
Returns:
Store object or None
"""
return (
db.query(Store)
.filter(
(func.upper(Store.store_code) == code.upper())
| (func.lower(Store.subdomain) == code.lower())
)
.first()
)
def get_total_store_count(
self, db: Session, active_only: bool = False
) -> int:
"""
Get total count of stores.
Args:
db: Database session
active_only: Only count active stores
Returns:
Store count
"""
query = db.query(func.count(Store.id))
if active_only:
query = query.filter(Store.is_active == True) # noqa: E712
return query.scalar() or 0
def get_store_count_by_status(
self,
db: Session,
active: bool | None = None,
verified: bool | None = None,
) -> int:
"""
Count stores filtered by active/verified status.
Args:
db: Database session
active: Filter by active status
verified: Filter by verified status
Returns:
Store count matching filters
"""
query = db.query(func.count(Store.id))
if active is not None:
query = query.filter(Store.is_active == active)
if verified is not None:
query = query.filter(Store.is_verified == verified)
return query.scalar() or 0
def list_all_stores(self, db: Session, active_only: bool = False) -> list[Store]:
"""Get all stores, optionally filtering by active status."""
query = db.query(Store)
if active_only:
query = query.filter(Store.is_active == True) # noqa: E712
return query.order_by(Store.id).all()
def is_letzshop_slug_claimed(self, db: Session, letzshop_slug: str) -> bool:
"""Check if a Letzshop store slug is already claimed."""
return (
db.query(Store)
.filter(
Store.letzshop_store_slug == letzshop_slug,
Store.is_active == True, # noqa: E712
)
.first()
is not None
)
def is_store_code_taken(self, db: Session, store_code: str) -> bool:
"""Check if a store code already exists."""
return (
db.query(Store)
.filter(Store.store_code == store_code)
.first()
is not None
)
def is_subdomain_taken(self, db: Session, subdomain: str) -> bool:
"""Check if a subdomain already exists."""
return (
db.query(Store)
.filter(Store.subdomain == subdomain)
.first()
is not None
)
# Private helper methods
def _store_code_exists(self, db: Session, store_code: str) -> bool:
"""Check if store code already exists (case-insensitive)."""
return (
db.query(Store)
.filter(func.upper(Store.store_code) == store_code.upper())
.first()
is not None
)
def _can_access_store(self, store: Store, user: User) -> bool:
"""Check if user can access store."""
# Admins can always access
if user.is_admin:
return True
# Merchant owners can access their stores
if store.merchant and store.merchant.owner_user_id == user.id:
return True
# Others can only access active and verified stores
return store.is_active and store.is_verified
def _is_store_owner(self, store: Store, user: User) -> bool:
"""Check if user is store owner (via merchant ownership)."""
return store.merchant and store.merchant.owner_user_id == user.id
def can_update_store(self, store: Store, user: User) -> bool:
"""
Check if user has permission to update store settings.
Permission granted to:
- Admins (always)
- Store owners (merchant owner)
- Team members with appropriate role (owner role in StoreUser)
"""
# Admins can always update
if user.is_admin:
return True
# Check if user is store owner via merchant
if self._is_store_owner(store, user):
return True
# Check if user is owner via StoreUser relationship
return bool(user.is_owner_of(store.id))
def update_store(
self,
db: Session,
store_id: int,
store_update,
current_user: User,
) -> "Store":
"""
Update store profile with permission checking.
Raises:
StoreNotFoundException: If store not found
InsufficientPermissionsException: If user lacks permission
"""
from app.modules.tenancy.exceptions import InsufficientPermissionsException
store = self.get_store_by_id(db, store_id)
# Check permissions in service layer
if not self.can_update_store(store, current_user):
raise InsufficientPermissionsException(
required_permission="store:profile:update"
)
# Apply updates
update_data = store_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
if hasattr(store, field):
setattr(store, field, value)
db.add(store)
db.flush()
db.refresh(store)
return store
def update_marketplace_settings(
self,
db: Session,
store_id: int,
marketplace_config: dict,
current_user: User,
) -> dict:
"""
Update marketplace integration settings with permission checking.
Raises:
StoreNotFoundException: If store not found
InsufficientPermissionsException: If user lacks permission
"""
from app.modules.tenancy.exceptions import InsufficientPermissionsException
store = self.get_store_by_id(db, store_id)
# Check permissions in service layer
if not self.can_update_store(store, current_user):
raise InsufficientPermissionsException(
required_permission="store:settings:update"
)
# Update Letzshop URLs
if "letzshop_csv_url_fr" in marketplace_config:
store.letzshop_csv_url_fr = marketplace_config["letzshop_csv_url_fr"]
if "letzshop_csv_url_en" in marketplace_config:
store.letzshop_csv_url_en = marketplace_config["letzshop_csv_url_en"]
if "letzshop_csv_url_de" in marketplace_config:
store.letzshop_csv_url_de = marketplace_config["letzshop_csv_url_de"]
db.add(store)
db.flush()
db.refresh(store)
return {
"message": "Marketplace settings updated successfully",
"letzshop_csv_url_fr": store.letzshop_csv_url_fr,
"letzshop_csv_url_en": store.letzshop_csv_url_en,
"letzshop_csv_url_de": store.letzshop_csv_url_de,
}
# Create service instance following the same pattern as other services
store_service = StoreService()