# app/modules/tenancy/services/merchant_store_service.py
"""
Merchant store service for store CRUD operations from the merchant portal.

Handles store management operations that merchant owners can perform:
- View store details (with ownership validation)
- Update store settings (name, description, contact info)
- Create new stores (with subscription limit checking)

Follows the service layer pattern — all DB operations go through here.
"""

import logging

from sqlalchemy import func
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.orm import Session

from app.modules.tenancy.exceptions import (
    MaxStoresReachedException,
    MerchantNotFoundException,
    StoreAlreadyExistsException,
    StoreNotFoundException,
    StoreValidationException,
    UserNotFoundException,
)
from app.modules.tenancy.models.merchant import Merchant
from app.modules.tenancy.models.platform import Platform
from app.modules.tenancy.models.store import Role, Store, StoreUser
from app.modules.tenancy.models.store_platform import StorePlatform
from app.modules.tenancy.models.user import User

logger = logging.getLogger(__name__)


class MerchantStoreService:
    """Service for merchant-initiated store operations."""

    # Fields a merchant may change through update_store(); any other key in
    # the update payload is ignored.
    _MERCHANT_UPDATABLE_FIELDS = frozenset(
        {
            "name",
            "description",
            "contact_email",
            "contact_phone",
            "website",
            "business_address",
            "tax_number",
        }
    )

    # ------------------------------------------------------------------
    # Store detail / update / create
    # ------------------------------------------------------------------

    def get_store_detail(
        self,
        db: Session,
        merchant_id: int,
        store_id: int,
    ) -> dict:
        """
        Get store detail with ownership validation.

        Args:
            db: Database session
            merchant_id: Merchant ID (for ownership check)
            store_id: Store ID

        Returns:
            Dict with store details and platform assignments

        Raises:
            StoreNotFoundException: If store not found or not owned by merchant
        """
        store = self.validate_store_ownership(db, merchant_id, store_id)
        return self._build_store_detail(db, store)

    def update_store(
        self,
        db: Session,
        merchant_id: int,
        store_id: int,
        update_data: dict,
    ) -> dict:
        """
        Update store fields (merchant-allowed fields only).

        Keys of ``update_data`` that are not merchant-updatable are ignored.
        Changes are flushed, not committed.

        Args:
            db: Database session
            merchant_id: Merchant ID (for ownership check)
            store_id: Store ID
            update_data: Dict of fields to update

        Returns:
            Updated store detail dict

        Raises:
            StoreNotFoundException: If store not found or not owned by merchant
        """
        store = self.validate_store_ownership(db, merchant_id, store_id)

        # Only the allow-listed fields are applied; track them so the log
        # reflects what actually changed rather than what was requested.
        applied_fields = [
            field
            for field in update_data
            if field in self._MERCHANT_UPDATABLE_FIELDS
        ]
        for field in applied_fields:
            setattr(store, field, update_data[field])

        db.flush()

        logger.info(
            f"Merchant {merchant_id} updated store {store.store_code}: "
            f"{applied_fields}"
        )

        return self._build_store_detail(db, store)

    def create_store(
        self,
        db: Session,
        merchant_id: int,
        store_data: dict,
    ) -> dict:
        """
        Create a new store under the merchant.

        The store is created active but unverified, gets the default roles,
        and is linked to every requested platform that exists. Changes are
        flushed, not committed.

        Args:
            db: Database session
            merchant_id: Merchant ID
            store_data: Store creation data (name, store_code, subdomain,
                description, platform_ids)

        Returns:
            Created store detail dict

        Raises:
            MaxStoresReachedException: If store limit reached
            MerchantNotFoundException: If merchant not found
            StoreAlreadyExistsException: If store code already exists
            StoreValidationException: If subdomain taken or validation fails
        """
        # Check store creation limits
        can_create, message = self.can_create_store(db, merchant_id)
        if not can_create:
            logger.warning(
                f"Merchant {merchant_id} cannot create store: {message}"
            )
            # NOTE(review): max_stores=0 is passed as in the original code;
            # the real limit is not available here — confirm whether the
            # exception should carry the actual limit.
            raise MaxStoresReachedException(max_stores=0)

        # Validate merchant exists
        merchant = db.query(Merchant).filter(Merchant.id == merchant_id).first()
        if not merchant:
            raise MerchantNotFoundException(merchant_id, identifier_type="id")

        # Store codes are stored upper-case, subdomains lower-case.
        store_code = store_data["store_code"].upper()
        subdomain = store_data["subdomain"].lower()

        # Check store code uniqueness (case-insensitive)
        code_taken = (
            db.query(Store)
            .filter(func.upper(Store.store_code) == store_code)
            .first()
        )
        if code_taken:
            raise StoreAlreadyExistsException(store_code)

        # Check subdomain uniqueness (case-insensitive)
        subdomain_taken = (
            db.query(Store)
            .filter(func.lower(Store.subdomain) == subdomain)
            .first()
        )
        if subdomain_taken:
            raise StoreValidationException(
                f"Subdomain '{subdomain}' is already taken",
                field="subdomain",
            )

        # A missing key and an explicit None both mean "no platforms".
        platform_ids = store_data.get("platform_ids") or []

        try:
            store = Store(
                merchant_id=merchant_id,
                store_code=store_code,
                subdomain=subdomain,
                name=store_data["name"],
                description=store_data.get("description"),
                is_active=True,
                is_verified=False,  # Pending admin verification
            )
            db.add(store)
            db.flush()  # populate store.id for the dependent rows below

            # Create default roles
            self._create_default_roles(db, store.id)

            # Assign to platforms if provided. One IN query resolves which of
            # the requested platforms exist; unknown ids are skipped and
            # repeated ids yield a single assignment.
            if platform_ids:
                existing_platform_rows = (
                    db.query(Platform.id)
                    .filter(Platform.id.in_(list(platform_ids)))
                    .all()
                )
                store_platforms = [
                    StorePlatform(
                        store_id=store.id,
                        platform_id=row[0],
                        is_active=True,
                    )
                    for row in existing_platform_rows
                ]
                if store_platforms:
                    db.add_all(store_platforms)

            db.flush()
            db.refresh(store)

            logger.info(
                f"Merchant {merchant_id} created store {store.store_code} "
                f"(ID: {store.id}, platforms: {platform_ids})"
            )

            return self._build_store_detail(db, store)

        except (
            StoreAlreadyExistsException,
            MerchantNotFoundException,
            StoreValidationException,
        ):
            # Domain errors propagate untouched.
            raise
        except SQLAlchemyError as e:
            logger.error(f"Failed to create store for merchant {merchant_id}: {e}")
            # Chain the cause so the original database error is not lost.
            raise StoreValidationException(
                f"Failed to create store: {e}",
                field="store",
            ) from e

    def can_create_store(
        self,
        db: Session,
        merchant_id: int,
    ) -> tuple[bool, str | None]:
        """
        Check if merchant can create a new store based on subscription limits.

        Fails open: if the billing feature service cannot be imported or
        raises, creation is allowed and the failure is logged.

        Returns:
            Tuple of (allowed, message). message explains why if not allowed.
        """
        try:
            from app.modules.billing.services.feature_service import feature_service

            return feature_service.check_resource_limit(
                db,
                feature_code="stores_limit",
                merchant_id=merchant_id,
            )
        except Exception:  # noqa: EXC-003
            # If billing module not available, allow creation — but leave a
            # trace so a genuinely broken limit check is not invisible.
            logger.warning(
                f"Store limit check unavailable for merchant {merchant_id}; "
                "allowing store creation",
                exc_info=True,
            )
            return True, None

    def get_subscribed_platform_ids(
        self,
        db: Session,
        merchant_id: int,
    ) -> list[dict]:
        """
        Get platforms the merchant has active subscriptions on.

        If the billing subscription service cannot be imported or raises, the
        failure is logged and an empty list is returned.

        Returns:
            List of platform dicts with id, code, name
        """
        try:
            from app.modules.billing.services.subscription_service import (
                subscription_service,
            )

            platform_ids = subscription_service.get_active_subscription_platform_ids(
                db, merchant_id
            )
        except Exception:  # noqa: EXC-003
            logger.warning(
                f"Could not load subscribed platforms for merchant {merchant_id}",
                exc_info=True,
            )
            platform_ids = []

        platform_ids = list(platform_ids or [])
        if not platform_ids:
            return []

        # Single IN query instead of one lookup per id.
        platforms_by_id = {
            platform.id: platform
            for platform in db.query(Platform)
            .filter(Platform.id.in_(platform_ids))
            .all()
        }

        # Keep the order (and any repeats) of the ids as returned by the
        # subscription service; ids without a platform row are skipped.
        return [
            {
                "id": platforms_by_id[pid].id,
                "code": platforms_by_id[pid].code,
                "name": platforms_by_id[pid].name,
            }
            for pid in platform_ids
            if pid in platforms_by_id
        ]

    # ------------------------------------------------------------------
    # Team views
    # ------------------------------------------------------------------

    def get_merchant_team_overview(self, db: Session, merchant_id: int) -> dict:
        """
        Get team members across all stores owned by the merchant.

        Returns a list of stores (ordered by name) with their team members
        grouped by store.

        Raises:
            MerchantNotFoundException: If merchant not found
        """
        merchant = db.query(Merchant).filter(Merchant.id == merchant_id).first()
        if not merchant:
            raise MerchantNotFoundException(merchant_id)

        stores = (
            db.query(Store)
            .filter(Store.merchant_id == merchant_id)
            .order_by(Store.name)
            .all()
        )
        members_by_store = self._store_users_by_store(db, [s.id for s in stores])

        result = []
        for store in stores:
            members = members_by_store.get(store.id, [])
            result.append(
                {
                    "store_id": store.id,
                    "store_name": store.name,
                    "store_code": store.store_code,
                    "is_active": store.is_active,
                    "members": [
                        {
                            "id": m.id,
                            "user_id": m.user_id,
                            "email": m.user.email if m.user else None,
                            "first_name": m.user.first_name if m.user else None,
                            "last_name": m.user.last_name if m.user else None,
                            "role_name": m.role.name if m.role else None,
                            "is_active": m.is_active,
                            "invitation_accepted_at": (
                                m.invitation_accepted_at.isoformat()
                                if m.invitation_accepted_at
                                else None
                            ),
                            "created_at": (
                                m.created_at.isoformat() if m.created_at else None
                            ),
                        }
                        for m in members
                    ],
                    "member_count": len(members),
                }
            )

        return {
            "merchant_name": merchant.name,
            "owner_email": merchant.owner.email if merchant.owner else None,
            "stores": result,
            "total_members": sum(s["member_count"] for s in result),
        }

    def get_user(self, db: Session, user_id: int) -> User | None:
        """Get a User ORM object by ID, or None if no such user exists."""
        return db.query(User).filter(User.id == user_id).first()

    def validate_store_ownership(
        self, db: Session, merchant_id: int, store_id: int
    ) -> Store:
        """
        Validate that a store belongs to the merchant.

        Returns the Store object if valid, raises exception otherwise.

        Raises:
            StoreNotFoundException: If store not found or not owned by merchant
        """
        store = (
            db.query(Store)
            .filter(Store.id == store_id, Store.merchant_id == merchant_id)
            .first()
        )
        if not store:
            raise StoreNotFoundException(store_id, identifier_type="id")
        return store

    def update_team_member_profile(
        self,
        db: Session,
        merchant_id: int,
        user_id: int,
        update_data: dict,
    ) -> None:
        """
        Update a team member's profile (first_name, last_name, email).

        Validates that the user is a team member of one of the merchant's
        stores, or is the merchant's owner. An empty/None email is ignored.
        Changes are flushed, not committed.

        Raises:
            UserNotFoundException: If the user is neither a team member nor
                the owner of this merchant, or does not exist
        """
        # Verify user is a team member in at least one of the merchant's stores
        membership = (
            db.query(StoreUser)
            .join(Store, StoreUser.store_id == Store.id)
            .filter(
                Store.merchant_id == merchant_id,
                StoreUser.user_id == user_id,
            )
            .first()
        )

        if not membership:
            # Also allow updating the owner
            merchant = db.query(Merchant).filter(Merchant.id == merchant_id).first()
            if not merchant or merchant.owner_user_id != user_id:
                raise UserNotFoundException(str(user_id))

        user = db.query(User).filter(User.id == user_id).first()
        if not user:
            raise UserNotFoundException(str(user_id))

        if "first_name" in update_data:
            user.first_name = update_data["first_name"]
        if "last_name" in update_data:
            user.last_name = update_data["last_name"]
        if update_data.get("email"):
            user.email = update_data["email"]

        db.flush()

    def get_merchant_team_members(self, db: Session, merchant_id: int) -> dict:
        """
        Get team members across all merchant stores in a member-centric view.

        Deduplicates users across stores and aggregates per-store role info.
        Members are sorted owner first, then by full name (case-insensitive).

        Raises:
            MerchantNotFoundException: If merchant not found
        """
        merchant = db.query(Merchant).filter(Merchant.id == merchant_id).first()
        if not merchant:
            raise MerchantNotFoundException(merchant_id)

        stores = (
            db.query(Store)
            .filter(Store.merchant_id == merchant_id)
            .order_by(Store.name)
            .all()
        )
        store_users_by_store = self._store_users_by_store(
            db, [s.id for s in stores]
        )

        # Build member-centric view: keyed by user_id
        members_map: dict[int, dict] = {}
        store_list = []

        for store in stores:
            store_list.append(
                {
                    "id": store.id,
                    "name": store.name,
                    "code": store.store_code,
                }
            )

            for su in store_users_by_store.get(store.id, []):
                user = su.user
                if not user:
                    continue

                uid = user.id
                # Pending = invited (token issued) but not yet accepted.
                is_pending = (
                    su.invitation_accepted_at is None
                    and su.invitation_token is not None
                )

                if uid not in members_map:
                    members_map[uid] = {
                        "user_id": uid,
                        "email": user.email,
                        "username": user.username,
                        "first_name": user.first_name,
                        "last_name": user.last_name,
                        "full_name": (
                            f"{user.first_name or ''} {user.last_name or ''}".strip()
                            or user.email
                        ),
                        "role": user.role,
                        "is_active": user.is_active,
                        "is_email_verified": user.is_email_verified,
                        "last_login": (
                            user.last_login.isoformat() if user.last_login else None
                        ),
                        "created_at": (
                            user.created_at.isoformat() if user.created_at else None
                        ),
                        "stores": [],
                        "is_owner": uid == merchant.owner_user_id,
                    }

                members_map[uid]["stores"].append(
                    {
                        "store_id": store.id,
                        "store_name": store.name,
                        "store_code": store.store_code,
                        "role_name": su.role.name if su.role else None,
                        "role_id": su.role_id,
                        "is_active": su.is_active,
                        "is_pending": is_pending,
                    }
                )

        members = list(members_map.values())
        # Owner first, then alphabetical. full_name falls back to the email,
        # which may itself be empty — guard so sorting cannot fail on None.
        members.sort(
            key=lambda m: (not m["is_owner"], (m["full_name"] or "").lower())
        )

        total_active = sum(
            1
            for m in members
            if any(s["is_active"] and not s["is_pending"] for s in m["stores"])
        )
        total_pending = sum(
            1 for m in members if any(s["is_pending"] for s in m["stores"])
        )

        return {
            "merchant_name": merchant.name,
            "stores": store_list,
            "members": members,
            "total_members": len(members),
            "total_active": total_active,
            "total_pending": total_pending,
        }

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _build_store_detail(self, db: Session, store: Store) -> dict:
        """Serialize an already-validated store plus its platform assignments."""
        # Fetch each assignment together with its platform in one joined
        # query; the inner join drops assignments whose platform row is gone.
        assignments = (
            db.query(StorePlatform, Platform)
            .join(Platform, StorePlatform.platform_id == Platform.id)
            .filter(StorePlatform.store_id == store.id)
            .all()
        )
        platforms = [
            {
                "id": platform.id,
                "code": platform.code,
                "name": platform.name,
                "is_active": store_platform.is_active,
            }
            for store_platform, platform in assignments
        ]

        return {
            "id": store.id,
            "store_code": store.store_code,
            "subdomain": store.subdomain,
            "name": store.name,
            "description": store.description,
            "is_active": store.is_active,
            "is_verified": store.is_verified,
            "contact_email": store.contact_email,
            "contact_phone": store.contact_phone,
            "website": store.website,
            "business_address": store.business_address,
            "tax_number": store.tax_number,
            "default_language": store.default_language,
            "created_at": store.created_at.isoformat() if store.created_at else None,
            "platforms": platforms,
        }

    def _store_users_by_store(
        self, db: Session, store_ids: list[int]
    ) -> dict[int, list[StoreUser]]:
        """Load StoreUser rows for the given stores in one query, grouped by store_id."""
        grouped: dict[int, list[StoreUser]] = {}
        if not store_ids:
            return grouped

        store_users = (
            db.query(StoreUser).filter(StoreUser.store_id.in_(store_ids)).all()
        )
        for store_user in store_users:
            grouped.setdefault(store_user.store_id, []).append(store_user)
        return grouped

    def _create_default_roles(self, db: Session, store_id: int) -> None:
        """Create default roles (Owner, Manager, Editor, Viewer) for a new store."""
        default_roles = [
            {"name": "Owner", "permissions": ["*"]},
            {
                "name": "Manager",
                "permissions": [
                    "products.*",
                    "orders.*",
                    "customers.view",
                    "inventory.*",
                    "team.view",
                ],
            },
            {
                "name": "Editor",
                "permissions": [
                    "products.view",
                    "products.edit",
                    "orders.view",
                    "inventory.view",
                ],
            },
            {
                "name": "Viewer",
                "permissions": [
                    "products.view",
                    "orders.view",
                    "customers.view",
                    "inventory.view",
                ],
            },
        ]

        # Roles are only added to the session; the caller flushes.
        db.add_all(
            Role(
                store_id=store_id,
                name=role_data["name"],
                permissions=role_data["permissions"],
            )
            for role_data in default_roles
        )


# Singleton instance
merchant_store_service = MerchantStoreService()

__all__ = ["MerchantStoreService", "merchant_store_service"]