# app/services/shop_service.py
"""Shop service: business logic for shops and their product catalogs.

This module provides classes and functions for:
- Creating shops and looking them up by shop code or ID
- Listing shops and shop products with role-based visibility rules
- Adding existing marketplace products to a shop's catalog
"""

import logging
from typing import List, Optional, Tuple

from fastapi import HTTPException
from sqlalchemy import func
from sqlalchemy.orm import Session

from models.api.shop import ShopCreate, ShopProductCreate
from models.database.product import Product
from models.database.shop import Shop, ShopProduct
from models.database.user import User

logger = logging.getLogger(__name__)


class ShopService:
    """Service class for shop operations following the application's service pattern."""

    def create_shop(
        self, db: Session, shop_data: ShopCreate, current_user: User
    ) -> Shop:
        """
        Create a new shop.

        The shop code is stored in uppercase, and uniqueness is checked
        case-insensitively. Shops created by a user whose role is "admin"
        are marked verified immediately; all others start unverified.

        Args:
            db: Database session
            shop_data: Shop creation data
            current_user: User creating the shop

        Returns:
            Created shop object

        Raises:
            HTTPException: 400 if the shop code already exists
        """
        # Normalize shop code to uppercase
        normalized_shop_code = shop_data.shop_code.upper()

        # Case-insensitive check against existing data
        if self.shop_code_exists(db, normalized_shop_code):
            raise HTTPException(status_code=400, detail="Shop code already exists")

        # Create shop with uppercase code
        shop_dict = shop_data.model_dump()
        shop_dict["shop_code"] = normalized_shop_code  # Store as uppercase

        new_shop = Shop(
            **shop_dict,
            owner_id=current_user.id,
            is_active=True,
            is_verified=(current_user.role == "admin"),
        )

        db.add(new_shop)
        db.commit()
        db.refresh(new_shop)

        logger.info(
            f"New shop created: {new_shop.shop_code} by {current_user.username}"
        )
        return new_shop

    def get_shops(
        self,
        db: Session,
        current_user: User,
        skip: int = 0,
        limit: int = 100,
        active_only: bool = True,
        verified_only: bool = False,
    ) -> Tuple[List[Shop], int]:
        """
        Get shops with filtering.

        Non-admin users always get active shops that are either verified or
        owned by them; ``active_only`` and ``verified_only`` are ignored for
        them. Admins see every shop, narrowed only by those two flags.

        Args:
            db: Database session
            current_user: Current user requesting shops
            skip: Number of records to skip
            limit: Maximum number of records to return
            active_only: Filter for active shops only (admin requests only)
            verified_only: Filter for verified shops only (admin requests only)

        Returns:
            Tuple of (shops_list, total_count), where total_count is the
            number of matching shops before pagination
        """
        query = db.query(Shop)

        # NOTE: `== True` is intentional below. SQLAlchemy overloads `==` on
        # columns to build SQL; Python's `is True` would not produce a filter.
        if current_user.role != "admin":
            # Non-admin users can only see active and verified shops, plus their own
            query = query.filter(
                (Shop.is_active == True)  # noqa: E712
                & (
                    (Shop.is_verified == True)  # noqa: E712
                    | (Shop.owner_id == current_user.id)
                )
            )
        else:
            # Admin can apply filters
            if active_only:
                query = query.filter(Shop.is_active == True)  # noqa: E712
            if verified_only:
                query = query.filter(Shop.is_verified == True)  # noqa: E712

        total = query.count()
        # Order explicitly so that offset/limit pages are stable between calls.
        shops = query.order_by(Shop.id).offset(skip).limit(limit).all()

        return shops, total

    def get_shop_by_code(self, db: Session, shop_code: str, current_user: User) -> Shop:
        """
        Get shop by shop code with access control.

        The lookup is case-insensitive. For non-admin users the shop must be
        active, and must be either verified or owned by the requesting user.
        Access denial is reported as 404, the same as a missing shop.

        Args:
            db: Database session
            shop_code: Shop code to find
            current_user: Current user requesting the shop

        Returns:
            Shop object

        Raises:
            HTTPException: 404 if shop not found or access denied
        """
        # Explicit type hint to help type checker
        shop: Optional[Shop] = (
            db.query(Shop)
            .filter(func.upper(Shop.shop_code) == shop_code.upper())
            .first()
        )

        if not shop:
            raise HTTPException(status_code=404, detail="Shop not found")

        # Non-admin users can only see active verified shops or their own shops.
        # Unlike can_view_shop, an owner is also denied when the shop is
        # inactive; this mirrors the filter used in get_shops.
        if current_user.role != "admin":
            if not shop.is_active or (
                not shop.is_verified and shop.owner_id != current_user.id
            ):
                raise HTTPException(status_code=404, detail="Shop not found")

        return shop

    def add_product_to_shop(
        self, db: Session, shop: Shop, shop_product: ShopProductCreate
    ) -> ShopProduct:
        """
        Add existing product to shop catalog with shop-specific settings.

        ``shop_product.product_id`` is matched against ``Product.product_id``;
        the association row then stores the product's database ``id``.

        Args:
            db: Database session
            shop: Shop to add product to
            shop_product: Shop product data

        Returns:
            Created ShopProduct object

        Raises:
            HTTPException: 404 if the product does not exist,
                400 if it is already in the shop catalog
        """
        # Check if product exists
        product = self.get_product_by_id(db, shop_product.product_id)
        if not product:
            raise HTTPException(
                status_code=404, detail="Product not found in marketplace catalog"
            )

        # Check if product already in shop
        if self.product_in_shop(db, shop.id, product.id):
            raise HTTPException(
                status_code=400, detail="Product already in shop catalog"
            )

        # Create shop-product association; product_id is excluded from the
        # dump because the row needs the product's database id instead.
        new_shop_product = ShopProduct(
            shop_id=shop.id,
            product_id=product.id,
            **shop_product.model_dump(exclude={"product_id"}),
        )

        db.add(new_shop_product)
        db.commit()
        # Reload the committed row's attributes from the database
        db.refresh(new_shop_product)

        logger.info(f"Product {shop_product.product_id} added to shop {shop.shop_code}")
        return new_shop_product

    def get_shop_products(
        self,
        db: Session,
        shop: Shop,
        current_user: User,
        skip: int = 0,
        limit: int = 100,
        active_only: bool = True,
        featured_only: bool = False,
    ) -> Tuple[List[ShopProduct], int]:
        """
        Get products in shop catalog with filtering.

        Args:
            db: Database session
            shop: Shop to get products from
            current_user: Current user requesting products
            skip: Number of records to skip
            limit: Maximum number of records to return
            active_only: Filter for active products only
            featured_only: Filter for featured products only

        Returns:
            Tuple of (shop_products_list, total_count), where total_count is
            the number of matching products before pagination

        Raises:
            HTTPException: 404 if shop access denied
        """
        # Admins and the owner always have access; everyone else can only
        # see active verified shops
        if not self.can_view_shop(shop, current_user):
            raise HTTPException(status_code=404, detail="Shop not found")

        # Query shop products
        query = db.query(ShopProduct).filter(ShopProduct.shop_id == shop.id)

        # `== True` is intentional: it builds a SQL comparison on the column.
        if active_only:
            query = query.filter(ShopProduct.is_active == True)  # noqa: E712
        if featured_only:
            query = query.filter(ShopProduct.is_featured == True)  # noqa: E712

        total = query.count()
        # NOTE(review): no ORDER BY here, so page contents depend on the
        # database's default row order - consider ordering by the primary key.
        shop_products = query.offset(skip).limit(limit).all()

        return shop_products, total

    def get_shop_by_id(self, db: Session, shop_id: int) -> Optional[Shop]:
        """Get shop by ID, or None if no shop has that ID."""
        return db.query(Shop).filter(Shop.id == shop_id).first()

    def shop_code_exists(self, db: Session, shop_code: str) -> bool:
        """Check if shop code already exists (case-insensitive)."""
        return (
            db.query(Shop)
            .filter(func.upper(Shop.shop_code) == shop_code.upper())
            .first()
            is not None
        )

    def get_product_by_id(self, db: Session, product_id: str) -> Optional[Product]:
        """Get product by its product_id field, or None if not found."""
        return db.query(Product).filter(Product.product_id == product_id).first()

    def product_in_shop(self, db: Session, shop_id: int, product_id: int) -> bool:
        """Check if product is already in shop."""
        return (
            db.query(ShopProduct)
            .filter(
                ShopProduct.shop_id == shop_id, ShopProduct.product_id == product_id
            )
            .first()
            is not None
        )

    def is_shop_owner(self, shop: Shop, user: User) -> bool:
        """Check if user is shop owner."""
        return shop.owner_id == user.id

    def can_view_shop(self, shop: Shop, user: User) -> bool:
        """Check if user can view shop.

        Admins and the shop owner can always view it; any other user can
        view it only when it is both active and verified.
        """
        if user.role == "admin" or self.is_shop_owner(shop, user):
            return True
        # bool() keeps the declared return type even if a flag is None
        return bool(shop.is_active and shop.is_verified)


# Create service instance following the same pattern as other services
shop_service = ShopService()