# app/services/auth_service.py
"""
Authentication service for user registration and login.

This module provides classes and functions for:
- User registration with validation
- User authentication and JWT token generation
- Password management and security
"""

import logging
from datetime import UTC, datetime, timedelta
from typing import Any

from sqlalchemy.orm import Session

from app.exceptions import (
    InvalidCredentialsException,
    UserAlreadyExistsException,
    UserNotActiveException,
    ValidationException,
)
from middleware.auth import AuthManager
from models.database.user import User
from models.database.vendor import Vendor, VendorUser
from models.schema.auth import UserLogin, UserRegister

logger = logging.getLogger(__name__)


class AuthService:
    """Service class for authentication operations following the application's service pattern."""

    def __init__(self):
        """Create the service with its own AuthManager instance."""
        self.auth_manager = AuthManager()

    def register_user(self, db: Session, user_data: UserRegister) -> User:
        """
        Register a new user.

        Args:
            db: Database session
            user_data: User registration data

        Returns:
            Created user object

        Raises:
            UserAlreadyExistsException: If email or username already exists
            ValidationException: If any other error occurs while creating the
                user (the session is rolled back first; the original error is
                chained as the cause)
        """
        try:
            # Check if email already exists
            if self._email_exists(db, user_data.email):
                raise UserAlreadyExistsException(
                    "Email already registered", field="email"
                )

            # Check if username already exists
            if self._username_exists(db, user_data.username):
                raise UserAlreadyExistsException(
                    "Username already taken", field="username"
                )

            # Hash password and create user
            hashed_password = self.auth_manager.hash_password(user_data.password)
            new_user = User(
                email=user_data.email,
                username=user_data.username,
                hashed_password=hashed_password,
                role="user",
                is_active=True,
            )

            db.add(new_user)
            db.commit()
            db.refresh(new_user)

            logger.info("New user registered: %s", new_user.username)
            return new_user

        except UserAlreadyExistsException:
            raise  # Re-raise custom exceptions
        except Exception as e:
            db.rollback()
            logger.error("Error registering user: %s", e)
            # Chain the cause so the real failure stays visible in tracebacks.
            raise ValidationException("Registration failed") from e

    def login_user(self, db: Session, user_credentials: UserLogin) -> dict[str, Any]:
        """
        Login user and return JWT token with user data.

        Args:
            db: Database session
            user_credentials: User login credentials

        Returns:
            Dictionary with keys "token_data" (result of the auth manager's
            create_access_token) and "user" (the authenticated user object)

        Raises:
            InvalidCredentialsException: If authentication fails, or if any
                unexpected error occurs during login (the original error is
                chained as the cause)
            UserNotActiveException: If user account is not active
        """
        try:
            user = self.auth_manager.authenticate_user(
                db, user_credentials.email_or_username, user_credentials.password
            )
            if not user:
                raise InvalidCredentialsException("Incorrect username or password")

            # Check if user is active
            if not user.is_active:
                raise UserNotActiveException("User account is not active")

            # Create access token
            token_data = self.auth_manager.create_access_token(user)

            logger.info("User logged in: %s", user.username)
            return {"token_data": token_data, "user": user}

        except (InvalidCredentialsException, UserNotActiveException):
            raise  # Re-raise custom exceptions
        except Exception as e:
            logger.error("Error during login: %s", e)
            # Unexpected failures are reported to the caller as a credentials
            # error; the chained cause keeps the real error debuggable.
            raise InvalidCredentialsException() from e

    def get_user_by_email(self, db: Session, email: str) -> User | None:
        """Get user by email; returns None if not found or if the query fails."""
        try:
            return db.query(User).filter(User.email == email).first()
        except Exception as e:
            # Best-effort lookup: errors are logged and reported as "not found".
            logger.error("Error getting user by email: %s", e)
            return None

    def get_user_by_username(self, db: Session, username: str) -> User | None:
        """Get user by username; returns None if not found or if the query fails."""
        try:
            return db.query(User).filter(User.username == username).first()
        except Exception as e:
            # Best-effort lookup: errors are logged and reported as "not found".
            logger.error("Error getting user by username: %s", e)
            return None

    def authenticate_user(
        self, db: Session, username: str, password: str
    ) -> User | None:
        """
        Authenticate user with username/password.

        Delegates to the auth manager. Any exception it raises is logged and
        turned into a None result.
        """
        try:
            return self.auth_manager.authenticate_user(db, username, password)
        except Exception as e:
            logger.error("Error authenticating user: %s", e)
            return None

    def create_access_token(self, user: User) -> dict[str, Any]:
        """
        Create access token for user.

        Raises:
            ValidationException: If the auth manager fails to create the token
                (the original error is chained as the cause)
        """
        try:
            return self.auth_manager.create_access_token(user)
        except Exception as e:
            logger.error("Error creating access token: %s", e)
            raise ValidationException("Failed to create access token") from e

    def hash_password(self, password: str) -> str:
        """
        Hash password.

        Raises:
            ValidationException: If the auth manager fails to hash the password
                (the original error is chained as the cause)
        """
        try:
            return self.auth_manager.hash_password(password)
        except Exception as e:
            logger.error("Error hashing password: %s", e)
            raise ValidationException("Failed to hash password") from e

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Verify password against hash; any verification error counts as a mismatch."""
        try:
            return self.auth_manager.verify_password(plain_password, hashed_password)
        except Exception as e:
            logger.error("Error verifying password: %s", e)
            return False

    def create_access_token_with_data(self, data: dict) -> dict:
        """
        Create JWT token with custom data payload.

        Useful for non-User entities like customers that need tokens.

        Args:
            data: Dictionary containing token payload data (must include 'sub').
                Any 'exp' or 'iat' keys in it are overwritten.

        Returns:
            Dictionary with access_token, token_type, and expires_in

        Raises:
            ValidationException: If the token cannot be created (the original
                error is chained as the cause)
        """
        # Kept function-local as in the original.
        # NOTE(review): presumably to avoid an import cycle or import-time
        # cost — confirm before hoisting to module level.
        from jose import jwt

        from app.core.config import settings

        try:
            # Read the clock once so that exp - iat is exactly the configured
            # lifetime (two separate now() calls drift apart).
            now = datetime.now(UTC)
            expire = now + timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES)

            # Build payload with provided data
            payload = {
                **data,
                "exp": expire,
                "iat": now,
            }

            token = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")

            return {
                "access_token": token,
                "token_type": "bearer",
                "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            }

        except Exception as e:
            logger.error("Error creating access token with data: %s", e)
            raise ValidationException("Failed to create access token") from e

    def get_vendor_by_code(self, db: Session, vendor_code: str) -> Vendor | None:
        """
        Get active vendor by vendor code.

        The code is upper-cased before the lookup.

        Args:
            db: Database session
            vendor_code: Vendor code to look up

        Returns:
            Vendor if found and active, None otherwise
        """
        return (
            db.query(Vendor)
            .filter(
                Vendor.vendor_code == vendor_code.upper(),
                # SQL expression, not a Python truth test.
                Vendor.is_active == True,  # noqa: E712
            )
            .first()
        )

    def get_user_vendor_role(
        self, db: Session, user: User, vendor: Vendor
    ) -> tuple[bool, str | None]:
        """
        Check if user has access to vendor and return their role.

        Args:
            db: Database session
            user: User to check
            vendor: Vendor to check access for

        Returns:
            Tuple of (has_access: bool, role_name: str | None). The role is
            "Owner" for the owner of the vendor's company, the membership's
            role name for an active team member, and None without access.
        """
        # Check if user is vendor owner (via company ownership)
        if vendor.company and vendor.company.owner_user_id == user.id:
            return True, "Owner"

        # Check if user is team member
        vendor_user = (
            db.query(VendorUser)
            .filter(
                VendorUser.user_id == user.id,
                VendorUser.vendor_id == vendor.id,
                # SQL expression, not a Python truth test.
                VendorUser.is_active == True,  # noqa: E712
            )
            .first()
        )

        if vendor_user:
            return True, vendor_user.role.name

        return False, None

    def find_user_vendor(self, user: User) -> tuple[Vendor | None, str | None]:
        """
        Find which vendor a user belongs to when no vendor context is provided.

        Checks owned companies first, then vendor memberships.

        Args:
            user: User to find vendor for

        Returns:
            Tuple of (vendor: Vendor | None, role: str | None). An owner gets
            the first vendor of the first owned company that has vendors, with
            role "Owner"; otherwise the first active membership is used.
        """
        # Check owned vendors first (via company ownership)
        for company in user.owned_companies:
            if company.vendors:
                return company.vendors[0], "Owner"

        # Check vendor memberships
        if user.vendor_memberships:
            active_membership = next(
                (vm for vm in user.vendor_memberships if vm.is_active), None
            )
            if active_membership:
                return active_membership.vendor, active_membership.role.name

        return None, None

    # Private helper methods
    def _email_exists(self, db: Session, email: str) -> bool:
        """Check if email already exists."""
        return db.query(User).filter(User.email == email).first() is not None

    def _username_exists(self, db: Session, username: str) -> bool:
        """Check if username already exists."""
        return db.query(User).filter(User.username == username).first() is not None


# Create service instance following the same pattern as other services
auth_service = AuthService()