Files
orion/app/services/auth_service.py
Samir Boulahtit 21c13ca39b style: apply black and isort formatting across entire codebase
- Standardize quote style (single to double quotes)
- Reorder and group imports alphabetically
- Fix line breaks and indentation for consistency
- Apply PEP 8 formatting standards

Also updated Makefile to exclude both venv and .venv from code quality checks.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 19:30:17 +01:00

225 lines
7.8 KiB
Python

# app/services/auth_service.py
"""
Authentication service for user registration and login.
This module provides classes and functions for:
- User registration with validation
- User authentication and JWT token generation
- Password management and security
"""
import logging
from typing import Any, Dict, Optional
from sqlalchemy.orm import Session
from app.exceptions import (InvalidCredentialsException,
UserAlreadyExistsException, UserNotActiveException,
ValidationException)
from middleware.auth import AuthManager
from models.database.user import User
from models.schema.auth import UserLogin, UserRegister
# Module-level logger named after this module; AuthService uses it for its
# info and error messages.
logger = logging.getLogger(__name__)
class AuthService:
    """Service class for authentication operations following the application's service pattern.

    Delegates password hashing, credential checks and token creation to an
    ``AuthManager`` and uses the SQLAlchemy session passed to each method for
    user lookups and persistence.
    """

    def __init__(self):
        """Create the service with its own ``AuthManager`` instance."""
        self.auth_manager = AuthManager()

    def register_user(self, db: Session, user_data: UserRegister) -> User:
        """
        Register a new user.

        Args:
            db: Database session
            user_data: User registration data

        Returns:
            Created user object (refreshed from the database after commit)

        Raises:
            UserAlreadyExistsException: If email or username already exists
            ValidationException: If any other error occurs; the session is
                rolled back first and the original error is chained as the cause
        """
        try:
            # Check if email already exists
            if self._email_exists(db, user_data.email):
                raise UserAlreadyExistsException(
                    "Email already registered", field="email"
                )

            # Check if username already exists
            if self._username_exists(db, user_data.username):
                raise UserAlreadyExistsException(
                    "Username already taken", field="username"
                )

            # Hash password and create user
            hashed_password = self.auth_manager.hash_password(user_data.password)
            new_user = User(
                email=user_data.email,
                username=user_data.username,
                hashed_password=hashed_password,
                role="user",
                is_active=True,
            )

            db.add(new_user)
            db.commit()
            db.refresh(new_user)

            logger.info("New user registered: %s", new_user.username)
            return new_user

        except UserAlreadyExistsException:
            raise  # Re-raise custom exceptions untouched (nothing was written yet)
        except Exception as e:
            db.rollback()
            # logger.exception keeps the traceback; "from e" keeps the root cause
            # reachable for callers and error reporters.
            logger.exception("Error registering user: %s", e)
            raise ValidationException("Registration failed") from e

    def login_user(self, db: Session, user_credentials: UserLogin) -> Dict[str, Any]:
        """
        Login user and return JWT token with user data.

        Args:
            db: Database session
            user_credentials: User login credentials

        Returns:
            Dictionary with keys ``"token_data"`` (result of
            ``AuthManager.create_access_token``) and ``"user"`` (the user object)

        Raises:
            InvalidCredentialsException: If authentication fails, or if any
                unexpected error occurs during login (original error chained)
            UserNotActiveException: If user account is not active
        """
        try:
            user = self.auth_manager.authenticate_user(
                db, user_credentials.email_or_username, user_credentials.password
            )
            if not user:
                raise InvalidCredentialsException("Incorrect username or password")

            # Check if user is active
            if not user.is_active:
                raise UserNotActiveException("User account is not active")

            # Create access token
            token_data = self.auth_manager.create_access_token(user)

            logger.info("User logged in: %s", user.username)
            return {"token_data": token_data, "user": user}

        except (InvalidCredentialsException, UserNotActiveException):
            raise  # Re-raise custom exceptions
        except Exception as e:
            # Any other failure is logged with its traceback and surfaced to the
            # caller as an authentication failure.
            logger.exception("Error during login: %s", e)
            raise InvalidCredentialsException() from e

    def get_user_by_email(self, db: Session, email: str) -> Optional[User]:
        """Get user by email.

        Returns the first matching user, or None when there is no match or the
        query raised (the error is logged, not propagated).
        """
        try:
            return db.query(User).filter(User.email == email).first()
        except Exception as e:
            logger.exception("Error getting user by email: %s", e)
            return None

    def get_user_by_username(self, db: Session, username: str) -> Optional[User]:
        """Get user by username.

        Returns the first matching user, or None when there is no match or the
        query raised (the error is logged, not propagated).
        """
        try:
            return db.query(User).filter(User.username == username).first()
        except Exception as e:
            logger.exception("Error getting user by username: %s", e)
            return None

    def authenticate_user(
        self, db: Session, username: str, password: str
    ) -> Optional[User]:
        """Authenticate user with username/password.

        Delegates to ``AuthManager.authenticate_user`` and returns its result;
        returns None if that call raises (the error is logged).
        """
        try:
            return self.auth_manager.authenticate_user(db, username, password)
        except Exception as e:
            logger.exception("Error authenticating user: %s", e)
            return None

    def create_access_token(self, user: User) -> Dict[str, Any]:
        """Create access token for user.

        Raises:
            ValidationException: If ``AuthManager.create_access_token`` fails
                (original error chained as the cause)
        """
        try:
            return self.auth_manager.create_access_token(user)
        except Exception as e:
            logger.exception("Error creating access token: %s", e)
            raise ValidationException("Failed to create access token") from e

    def hash_password(self, password: str) -> str:
        """Hash password.

        Raises:
            ValidationException: If ``AuthManager.hash_password`` fails
                (original error chained as the cause)
        """
        try:
            return self.auth_manager.hash_password(password)
        except Exception as e:
            logger.exception("Error hashing password: %s", e)
            raise ValidationException("Failed to hash password") from e

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Verify password against hash.

        Returns the result of ``AuthManager.verify_password``; returns False if
        that call raises (the error is logged).
        """
        try:
            return self.auth_manager.verify_password(plain_password, hashed_password)
        except Exception as e:
            logger.exception("Error verifying password: %s", e)
            return False

    def create_access_token_with_data(self, data: dict) -> dict:
        """
        Create JWT token with custom data payload.

        Useful for non-User entities like customers that need tokens.

        Args:
            data: Dictionary containing token payload data. Callers are expected
                to include 'sub'; this method does not check for it. Any 'exp'
                or 'iat' keys in ``data`` are overwritten.

        Returns:
            Dictionary with access_token, token_type, and expires_in (seconds)

        Raises:
            ValidationException: If building or encoding the token fails
                (original error chained as the cause)
        """
        # Function-local imports, kept as in the original.
        # NOTE(review): presumably local to avoid an import cycle with
        # app.core.config — confirm before hoisting to module level.
        from datetime import datetime, timedelta, timezone

        from jose import jwt

        from app.core.config import settings

        try:
            # Read the clock once so that exp - iat is exactly the configured
            # lifetime; two separate reads can straddle a second boundary.
            issued_at = datetime.now(timezone.utc)
            expire = issued_at + timedelta(
                minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
            )

            # Build payload with provided data; exp/iat are placed last so they
            # take precedence over caller-supplied values.
            payload = {
                **data,
                "exp": expire,
                "iat": issued_at,
            }

            token = jwt.encode(payload, settings.SECRET_KEY, algorithm="HS256")

            return {
                "access_token": token,
                "token_type": "bearer",
                "expires_in": settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
            }
        except Exception as e:
            logger.exception("Error creating access token with data: %s", e)
            raise ValidationException("Failed to create access token") from e

    # Private helper methods

    def _email_exists(self, db: Session, email: str) -> bool:
        """Check if email already exists (True when a user row matches)."""
        return db.query(User).filter(User.email == email).first() is not None

    def _username_exists(self, db: Session, username: str) -> bool:
        """Check if username already exists (True when a user row matches)."""
        return db.query(User).filter(User.username == username).first() is not None
# Create service instance following the same pattern as other services.
# This runs once at import time; AuthService() constructs its own AuthManager,
# so importing this module also instantiates AuthManager.
auth_service = AuthService()