Refactoring code for modular approach

This commit is contained in:
2025-09-11 21:16:18 +02:00
parent 900229d452
commit f9ed3bdf11
18 changed files with 684 additions and 228 deletions

View File

@@ -1,5 +1,4 @@
from sqlalchemy.orm import Session
from sqlalchemy import func
from fastapi import HTTPException
from datetime import datetime
import logging

View File

@@ -0,0 +1,118 @@
from sqlalchemy.orm import Session
from fastapi import HTTPException
import logging
from typing import Optional, Dict, Any
from models.database_models import User
from models.api_models import UserRegister, UserLogin
from middleware.auth import AuthManager
logger = logging.getLogger(__name__)
class AuthService:
"""Service class for authentication operations following the application's service pattern"""
def __init__(self):
self.auth_manager = AuthManager()
def register_user(self, db: Session, user_data: UserRegister) -> User:
"""
Register a new user
Args:
db: Database session
user_data: User registration data
Returns:
Created user object
Raises:
HTTPException: If email or username already exists
"""
# Check if email already exists
existing_email = db.query(User).filter(User.email == user_data.email).first()
if existing_email:
raise HTTPException(status_code=400, detail="Email already registered")
# Check if username already exists
existing_username = db.query(User).filter(User.username == user_data.username).first()
if existing_username:
raise HTTPException(status_code=400, detail="Username already taken")
# Hash password and create user
hashed_password = self.auth_manager.hash_password(user_data.password)
new_user = User(
email=user_data.email,
username=user_data.username,
hashed_password=hashed_password,
role="user",
is_active=True
)
db.add(new_user)
db.commit()
db.refresh(new_user)
logger.info(f"New user registered: {new_user.username}")
return new_user
def login_user(self, db: Session, user_credentials: UserLogin) -> Dict[str, Any]:
"""
Login user and return JWT token with user data
Args:
db: Database session
user_credentials: User login credentials
Returns:
Dictionary containing access token data and user object
Raises:
HTTPException: If authentication fails
"""
user = self.auth_manager.authenticate_user(db, user_credentials.username, user_credentials.password)
if not user:
raise HTTPException(status_code=401, detail="Incorrect username or password")
# Create access token
token_data = self.auth_manager.create_access_token(user)
logger.info(f"User logged in: {user.username}")
return {
"token_data": token_data,
"user": user
}
def get_user_by_email(self, db: Session, email: str) -> Optional[User]:
"""Get user by email"""
return db.query(User).filter(User.email == email).first()
def get_user_by_username(self, db: Session, username: str) -> Optional[User]:
"""Get user by username"""
return db.query(User).filter(User.username == username).first()
def email_exists(self, db: Session, email: str) -> bool:
"""Check if email already exists"""
return db.query(User).filter(User.email == email).first() is not None
def username_exists(self, db: Session, username: str) -> bool:
"""Check if username already exists"""
return db.query(User).filter(User.username == username).first() is not None
def authenticate_user(self, db: Session, username: str, password: str) -> Optional[User]:
"""Authenticate user with username/password"""
return self.auth_manager.authenticate_user(db, username, password)
def create_access_token(self, user: User) -> Dict[str, Any]:
"""Create access token for user"""
return self.auth_manager.create_access_token(user)
def hash_password(self, password: str) -> str:
"""Hash password"""
return self.auth_manager.hash_password(password)
# Create service instance following the same pattern as admin_service
auth_service = AuthService()

View File

@@ -0,0 +1,248 @@
from sqlalchemy.orm import Session
from fastapi import HTTPException
from datetime import datetime
import logging
from typing import List, Optional, Tuple, Dict, Any
from models.database_models import User, Shop, Product, ShopProduct
from models.api_models import ShopCreate, ShopProductCreate
logger = logging.getLogger(__name__)
class ShopService:
"""Service class for shop operations following the application's service pattern"""
def create_shop(self, db: Session, shop_data: ShopCreate, current_user: User) -> Shop:
"""
Create a new shop
Args:
db: Database session
shop_data: Shop creation data
current_user: User creating the shop
Returns:
Created shop object
Raises:
HTTPException: If shop code already exists
"""
# Check if shop code already exists
existing_shop = db.query(Shop).filter(Shop.shop_code == shop_data.shop_code).first()
if existing_shop:
raise HTTPException(status_code=400, detail="Shop code already exists")
# Create shop
new_shop = Shop(
**shop_data.dict(),
owner_id=current_user.id,
is_active=True,
is_verified=(current_user.role == "admin") # Auto-verify if admin creates shop
)
db.add(new_shop)
db.commit()
db.refresh(new_shop)
logger.info(f"New shop created: {new_shop.shop_code} by {current_user.username}")
return new_shop
def get_shops(
self,
db: Session,
current_user: User,
skip: int = 0,
limit: int = 100,
active_only: bool = True,
verified_only: bool = False
) -> Tuple[List[Shop], int]:
"""
Get shops with filtering
Args:
db: Database session
current_user: Current user requesting shops
skip: Number of records to skip
limit: Maximum number of records to return
active_only: Filter for active shops only
verified_only: Filter for verified shops only
Returns:
Tuple of (shops_list, total_count)
"""
query = db.query(Shop)
# Non-admin users can only see active and verified shops, plus their own
if current_user.role != "admin":
query = query.filter(
(Shop.is_active == True) &
((Shop.is_verified == True) | (Shop.owner_id == current_user.id))
)
else:
# Admin can apply filters
if active_only:
query = query.filter(Shop.is_active == True)
if verified_only:
query = query.filter(Shop.is_verified == True)
total = query.count()
shops = query.offset(skip).limit(limit).all()
return shops, total
def get_shop_by_code(self, db: Session, shop_code: str, current_user: User) -> Shop:
"""
Get shop by shop code with access control
Args:
db: Database session
shop_code: Shop code to find
current_user: Current user requesting the shop
Returns:
Shop object
Raises:
HTTPException: If shop not found or access denied
"""
shop = db.query(Shop).filter(Shop.shop_code == shop_code.upper()).first()
if not shop:
raise HTTPException(status_code=404, detail="Shop not found")
# Non-admin users can only see active verified shops or their own shops
if current_user.role != "admin":
if not shop.is_active or (not shop.is_verified and shop.owner_id != current_user.id):
raise HTTPException(status_code=404, detail="Shop not found")
return shop
def add_product_to_shop(
self,
db: Session,
shop: Shop,
shop_product: ShopProductCreate
) -> ShopProduct:
"""
Add existing product to shop catalog with shop-specific settings
Args:
db: Database session
shop: Shop to add product to
shop_product: Shop product data
Returns:
Created ShopProduct object
Raises:
HTTPException: If product not found or already in shop
"""
# Check if product exists
product = db.query(Product).filter(Product.product_id == shop_product.product_id).first()
if not product:
raise HTTPException(status_code=404, detail="Product not found in marketplace catalog")
# Check if product already in shop
existing_shop_product = db.query(ShopProduct).filter(
ShopProduct.shop_id == shop.id,
ShopProduct.product_id == product.id
).first()
if existing_shop_product:
raise HTTPException(status_code=400, detail="Product already in shop catalog")
# Create shop-product association
new_shop_product = ShopProduct(
shop_id=shop.id,
product_id=product.id,
**shop_product.dict(exclude={'product_id'})
)
db.add(new_shop_product)
db.commit()
db.refresh(new_shop_product)
# Load the product relationship
db.refresh(new_shop_product)
logger.info(f"Product {shop_product.product_id} added to shop {shop.shop_code}")
return new_shop_product
def get_shop_products(
self,
db: Session,
shop: Shop,
current_user: User,
skip: int = 0,
limit: int = 100,
active_only: bool = True,
featured_only: bool = False
) -> Tuple[List[ShopProduct], int]:
"""
Get products in shop catalog with filtering
Args:
db: Database session
shop: Shop to get products from
current_user: Current user requesting products
skip: Number of records to skip
limit: Maximum number of records to return
active_only: Filter for active products only
featured_only: Filter for featured products only
Returns:
Tuple of (shop_products_list, total_count)
Raises:
HTTPException: If shop access denied
"""
# Non-owners can only see active verified shops
if current_user.role != "admin" and shop.owner_id != current_user.id:
if not shop.is_active or not shop.is_verified:
raise HTTPException(status_code=404, detail="Shop not found")
# Query shop products
query = db.query(ShopProduct).filter(ShopProduct.shop_id == shop.id)
if active_only:
query = query.filter(ShopProduct.is_active == True)
if featured_only:
query = query.filter(ShopProduct.is_featured == True)
total = query.count()
shop_products = query.offset(skip).limit(limit).all()
return shop_products, total
def get_shop_by_id(self, db: Session, shop_id: int) -> Optional[Shop]:
"""Get shop by ID"""
return db.query(Shop).filter(Shop.id == shop_id).first()
def shop_code_exists(self, db: Session, shop_code: str) -> bool:
"""Check if shop code already exists"""
return db.query(Shop).filter(Shop.shop_code == shop_code).first() is not None
def get_product_by_id(self, db: Session, product_id: str) -> Optional[Product]:
"""Get product by product_id"""
return db.query(Product).filter(Product.product_id == product_id).first()
def product_in_shop(self, db: Session, shop_id: int, product_id: int) -> bool:
"""Check if product is already in shop"""
return db.query(ShopProduct).filter(
ShopProduct.shop_id == shop_id,
ShopProduct.product_id == product_id
).first() is not None
def is_shop_owner(self, shop: Shop, user: User) -> bool:
"""Check if user is shop owner"""
return shop.owner_id == user.id
def can_view_shop(self, shop: Shop, user: User) -> bool:
"""Check if user can view shop"""
if user.role == "admin" or self.is_shop_owner(shop, user):
return True
return shop.is_active and shop.is_verified
# Create service instance following the same pattern as other services
shop_service = ShopService()

View File

@@ -0,0 +1,172 @@
from sqlalchemy import func
from sqlalchemy.orm import Session
import logging
from typing import List, Dict, Any
from models.database_models import User, Product, Stock
from models.api_models import StatsResponse, MarketplaceStatsResponse
logger = logging.getLogger(__name__)
class StatsService:
"""Service class for statistics operations following the application's service pattern"""
def get_comprehensive_stats(self, db: Session) -> Dict[str, Any]:
"""
Get comprehensive statistics with marketplace data
Args:
db: Database session
Returns:
Dictionary containing all statistics data
"""
# Use more efficient queries with proper indexes
total_products = db.query(Product).count()
unique_brands = db.query(Product.brand).filter(
Product.brand.isnot(None),
Product.brand != ""
).distinct().count()
unique_categories = db.query(Product.google_product_category).filter(
Product.google_product_category.isnot(None),
Product.google_product_category != ""
).distinct().count()
# New marketplace statistics
unique_marketplaces = db.query(Product.marketplace).filter(
Product.marketplace.isnot(None),
Product.marketplace != ""
).distinct().count()
unique_shops = db.query(Product.shop_name).filter(
Product.shop_name.isnot(None),
Product.shop_name != ""
).distinct().count()
# Stock statistics
total_stock_entries = db.query(Stock).count()
total_inventory = db.query(func.sum(Stock.quantity)).scalar() or 0
stats_data = {
"total_products": total_products,
"unique_brands": unique_brands,
"unique_categories": unique_categories,
"unique_marketplaces": unique_marketplaces,
"unique_shops": unique_shops,
"total_stock_entries": total_stock_entries,
"total_inventory_quantity": total_inventory
}
logger.info(f"Generated comprehensive stats: {total_products} products, {unique_marketplaces} marketplaces")
return stats_data
def get_marketplace_breakdown_stats(self, db: Session) -> List[Dict[str, Any]]:
"""
Get statistics broken down by marketplace
Args:
db: Database session
Returns:
List of dictionaries containing marketplace statistics
"""
# Query to get stats per marketplace
marketplace_stats = db.query(
Product.marketplace,
func.count(Product.id).label('total_products'),
func.count(func.distinct(Product.shop_name)).label('unique_shops'),
func.count(func.distinct(Product.brand)).label('unique_brands')
).filter(
Product.marketplace.isnot(None)
).group_by(Product.marketplace).all()
stats_list = [
{
"marketplace": stat.marketplace,
"total_products": stat.total_products,
"unique_shops": stat.unique_shops,
"unique_brands": stat.unique_brands
} for stat in marketplace_stats
]
logger.info(f"Generated marketplace breakdown stats for {len(stats_list)} marketplaces")
return stats_list
def get_product_count(self, db: Session) -> int:
"""Get total product count"""
return db.query(Product).count()
def get_unique_brands_count(self, db: Session) -> int:
"""Get count of unique brands"""
return db.query(Product.brand).filter(
Product.brand.isnot(None),
Product.brand != ""
).distinct().count()
def get_unique_categories_count(self, db: Session) -> int:
"""Get count of unique categories"""
return db.query(Product.google_product_category).filter(
Product.google_product_category.isnot(None),
Product.google_product_category != ""
).distinct().count()
def get_unique_marketplaces_count(self, db: Session) -> int:
"""Get count of unique marketplaces"""
return db.query(Product.marketplace).filter(
Product.marketplace.isnot(None),
Product.marketplace != ""
).distinct().count()
def get_unique_shops_count(self, db: Session) -> int:
"""Get count of unique shops"""
return db.query(Product.shop_name).filter(
Product.shop_name.isnot(None),
Product.shop_name != ""
).distinct().count()
def get_stock_statistics(self, db: Session) -> Dict[str, int]:
"""
Get stock-related statistics
Args:
db: Database session
Returns:
Dictionary containing stock statistics
"""
total_stock_entries = db.query(Stock).count()
total_inventory = db.query(func.sum(Stock.quantity)).scalar() or 0
return {
"total_stock_entries": total_stock_entries,
"total_inventory_quantity": total_inventory
}
def get_brands_by_marketplace(self, db: Session, marketplace: str) -> List[str]:
"""Get unique brands for a specific marketplace"""
brands = db.query(Product.brand).filter(
Product.marketplace == marketplace,
Product.brand.isnot(None),
Product.brand != ""
).distinct().all()
return [brand[0] for brand in brands]
def get_shops_by_marketplace(self, db: Session, marketplace: str) -> List[str]:
"""Get unique shops for a specific marketplace"""
shops = db.query(Product.shop_name).filter(
Product.marketplace == marketplace,
Product.shop_name.isnot(None),
Product.shop_name != ""
).distinct().all()
return [shop[0] for shop in shops]
def get_products_by_marketplace(self, db: Session, marketplace: str) -> int:
"""Get product count for a specific marketplace"""
return db.query(Product).filter(Product.marketplace == marketplace).count()
# Create service instance following the same pattern as other services
stats_service = StatsService()