# app/services/auth_service.py """Summary description .... This module provides classes and functions for: - .... - .... - .... """ import logging from typing import Any, Dict, Optional from fastapi import HTTPException from sqlalchemy.orm import Session from middleware.auth import AuthManager from models.api.auth import UserLogin, UserRegister from models.database.user import User logger = logging.getLogger(__name__) class AuthService: """Service class for authentication operations following the application's service pattern.""" def __init__(self): """Class constructor.""" self.auth_manager = AuthManager() def register_user(self, db: Session, user_data: UserRegister) -> User: """ Register a new user. Args: db: Database session user_data: User registration data Returns: Created user object Raises: HTTPException: If email or username already exists """ # Check if email already exists existing_email = db.query(User).filter(User.email == user_data.email).first() if existing_email: raise HTTPException(status_code=400, detail="Email already registered") # Check if username already exists existing_username = ( db.query(User).filter(User.username == user_data.username).first() ) if existing_username: raise HTTPException(status_code=400, detail="Username already taken") # Hash password and create user hashed_password = self.auth_manager.hash_password(user_data.password) new_user = User( email=user_data.email, username=user_data.username, hashed_password=hashed_password, role="user", is_active=True, ) db.add(new_user) db.commit() db.refresh(new_user) logger.info(f"New user registered: {new_user.username}") return new_user def login_user(self, db: Session, user_credentials: UserLogin) -> Dict[str, Any]: """ Login user and return JWT token with user data. Args: db: Database session user_credentials: User login credentials Returns: Dictionary containing access token data and user object Raises: HTTPException: If authentication fails """ user = self.auth_manager.authenticate_user( db, user_credentials.username, user_credentials.password ) if not user: raise HTTPException( status_code=401, detail="Incorrect username or password" ) # Create access token token_data = self.auth_manager.create_access_token(user) logger.info(f"User logged in: {user.username}") return {"token_data": token_data, "user": user} def get_user_by_email(self, db: Session, email: str) -> Optional[User]: """Get user by email.""" return db.query(User).filter(User.email == email).first() def get_user_by_username(self, db: Session, username: str) -> Optional[User]: """Get user by username.""" return db.query(User).filter(User.username == username).first() def email_exists(self, db: Session, email: str) -> bool: """Check if email already exists.""" return db.query(User).filter(User.email == email).first() is not None def username_exists(self, db: Session, username: str) -> bool: """Check if username already exists.""" return db.query(User).filter(User.username == username).first() is not None def authenticate_user( self, db: Session, username: str, password: str ) -> Optional[User]: """Authenticate user with username/password.""" return self.auth_manager.authenticate_user(db, username, password) def create_access_token(self, user: User) -> Dict[str, Any]: """Create access token for user.""" return self.auth_manager.create_access_token(user) def hash_password(self, password: str) -> str: """Hash password.""" return self.auth_manager.hash_password(password) # Create service instance following the same pattern as admin_service auth_service = AuthService()