# app/modules/monitoring/services/log_service.py """ Log management service for viewing and managing application logs. This module provides functions for: - Querying database logs with filters - Reading file logs - Log statistics and analytics - Log retention and cleanup - Downloading log files """ import logging from datetime import UTC, datetime, timedelta from pathlib import Path from sqlalchemy import and_, func, or_ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.orm import Session from app.core.config import settings from app.exceptions import ResourceNotFoundException from app.modules.tenancy.exceptions import AdminOperationException from app.modules.tenancy.schemas.admin import ( ApplicationLogFilters, ApplicationLogListResponse, ApplicationLogResponse, FileLogResponse, LogStatistics, ) logger = logging.getLogger(__name__) def _get_application_log_model(): """Deferred import for ApplicationLog model (lives in tenancy, consumed by monitoring).""" from app.modules.tenancy.models import ApplicationLog return ApplicationLog class LogService: """Service for managing application logs.""" def get_database_logs( self, db: Session, filters: ApplicationLogFilters ) -> ApplicationLogListResponse: """ Get logs from database with filtering and pagination. Args: db: Database session filters: Filter criteria Returns: Paginated list of logs """ ApplicationLog = _get_application_log_model() try: query = db.query(ApplicationLog) # Apply filters conditions = [] if filters.level: conditions.append(ApplicationLog.level == filters.level.upper()) if filters.logger_name: conditions.append( ApplicationLog.logger_name.like(f"%{filters.logger_name}%") ) if filters.module: conditions.append(ApplicationLog.module.like(f"%{filters.module}%")) if filters.user_id: conditions.append(ApplicationLog.user_id == filters.user_id) if filters.store_id: conditions.append(ApplicationLog.store_id == filters.store_id) if filters.date_from: conditions.append(ApplicationLog.timestamp >= filters.date_from) if filters.date_to: conditions.append(ApplicationLog.timestamp <= filters.date_to) if filters.search: search_pattern = f"%{filters.search}%" conditions.append( or_( ApplicationLog.message.like(search_pattern), ApplicationLog.exception_message.like(search_pattern), ) ) if conditions: query = query.filter(and_(*conditions)) # Get total count total = query.count() # Apply pagination and sorting logs = ( query.order_by(ApplicationLog.timestamp.desc()) .offset(filters.skip) .limit(filters.limit) .all() ) return ApplicationLogListResponse( logs=[ApplicationLogResponse.model_validate(log) for log in logs], total=total, skip=filters.skip, limit=filters.limit, ) except SQLAlchemyError as e: logger.error(f"Failed to get database logs: {e}") raise AdminOperationException( operation="get_database_logs", reason=f"Database query failed: {str(e)}" ) def get_log_statistics(self, db: Session, days: int = 7) -> LogStatistics: """ Get statistics about logs from the last N days. Args: db: Database session days: Number of days to analyze Returns: Log statistics """ ApplicationLog = _get_application_log_model() try: cutoff_date = datetime.now(UTC) - timedelta(days=days) # Total counts total_count = ( db.query(func.count(ApplicationLog.id)) .filter(ApplicationLog.timestamp >= cutoff_date) .scalar() ) warning_count = ( db.query(func.count(ApplicationLog.id)) .filter( and_( ApplicationLog.timestamp >= cutoff_date, ApplicationLog.level == "WARNING", ) ) .scalar() ) error_count = ( db.query(func.count(ApplicationLog.id)) .filter( and_( ApplicationLog.timestamp >= cutoff_date, ApplicationLog.level == "ERROR", ) ) .scalar() ) critical_count = ( db.query(func.count(ApplicationLog.id)) .filter( and_( ApplicationLog.timestamp >= cutoff_date, ApplicationLog.level == "CRITICAL", ) ) .scalar() ) # Count by level by_level_raw = ( db.query(ApplicationLog.level, func.count(ApplicationLog.id)) .filter(ApplicationLog.timestamp >= cutoff_date) .group_by(ApplicationLog.level) .all() ) by_level = dict(by_level_raw) # Count by module (top 10) by_module_raw = ( db.query(ApplicationLog.module, func.count(ApplicationLog.id)) .filter(ApplicationLog.timestamp >= cutoff_date) .filter(ApplicationLog.module.isnot(None)) .group_by(ApplicationLog.module) .order_by(func.count(ApplicationLog.id).desc()) .limit(10) .all() ) by_module = dict(by_module_raw) # Recent errors (last 5) recent_errors = ( db.query(ApplicationLog) .filter( and_( ApplicationLog.timestamp >= cutoff_date, ApplicationLog.level.in_(["ERROR", "CRITICAL"]), ) ) .order_by(ApplicationLog.timestamp.desc()) .limit(5) .all() ) return LogStatistics( total_count=total_count or 0, warning_count=warning_count or 0, error_count=error_count or 0, critical_count=critical_count or 0, by_level=by_level, by_module=by_module, recent_errors=[ ApplicationLogResponse.model_validate(log) for log in recent_errors ], ) except SQLAlchemyError as e: logger.error(f"Failed to get log statistics: {e}") raise AdminOperationException( operation="get_log_statistics", reason=f"Database query failed: {str(e)}", ) def get_file_logs( self, filename: str = "app.log", lines: int = 500 ) -> FileLogResponse: """ Read logs from file. Args: filename: Log filename (default: app.log) lines: Number of lines to return from end of file Returns: File log content """ try: # Determine log file path log_file_path = settings.log_file if log_file_path: log_file = Path(log_file_path) else: log_file = Path("logs") / "app.log" # Allow reading backup files if filename != "app.log": log_file = log_file.parent / filename if not log_file.exists(): raise ResourceNotFoundException( resource_type="log_file", identifier=str(log_file) ) # Get file stats stat = log_file.stat() # Read last N lines efficiently with open(log_file, encoding="utf-8", errors="replace") as f: # For large files, seek to end and read backwards all_lines = f.readlines() log_lines = all_lines[-lines:] if len(all_lines) > lines else all_lines return FileLogResponse( filename=log_file.name, size_bytes=stat.st_size, last_modified=datetime.fromtimestamp(stat.st_mtime, tz=UTC), lines=[line.rstrip("\n") for line in log_lines], total_lines=len(all_lines), ) except ResourceNotFoundException: raise except OSError as e: logger.error(f"Failed to read log file: {e}") raise AdminOperationException( operation="get_file_logs", reason=f"File read failed: {str(e)}" ) def list_log_files(self) -> list[dict]: """ List all available log files. Returns: List of log file info (name, size, modified date) """ try: # Determine log directory log_file_path = settings.log_file log_dir = Path(log_file_path).parent if log_file_path else Path("logs") if not log_dir.exists(): return [] files = [] for log_file in log_dir.glob("*.log*"): if log_file.is_file(): stat = log_file.stat() files.append( { "filename": log_file.name, "size_bytes": stat.st_size, "size_mb": round(stat.st_size / (1024 * 1024), 2), "last_modified": datetime.fromtimestamp( stat.st_mtime, tz=UTC ).isoformat(), } ) # Sort by modified date (newest first) files.sort(key=lambda x: x["last_modified"], reverse=True) return files except OSError as e: logger.error(f"Failed to list log files: {e}") raise AdminOperationException( operation="list_log_files", reason=f"Directory read failed: {str(e)}" ) def cleanup_old_logs(self, db: Session, retention_days: int) -> int: """ Delete logs older than retention period from database. Args: db: Database session retention_days: Days to retain logs Returns: Number of logs deleted """ ApplicationLog = _get_application_log_model() try: cutoff_date = datetime.now(UTC) - timedelta(days=retention_days) deleted_count = ( db.query(ApplicationLog) .filter(ApplicationLog.timestamp < cutoff_date) .delete() ) db.commit() logger.info( f"Cleaned up {deleted_count} logs older than {retention_days} days" ) return deleted_count except SQLAlchemyError as e: db.rollback() logger.error(f"Failed to cleanup old logs: {e}") raise AdminOperationException( operation="cleanup_old_logs", reason=f"Delete operation failed: {str(e)}", ) def delete_log(self, db: Session, log_id: int) -> str: """Delete a specific log entry.""" ApplicationLog = _get_application_log_model() try: log_entry = ( db.query(ApplicationLog).filter(ApplicationLog.id == log_id).first() ) if not log_entry: raise ResourceNotFoundException( resource_type="log", identifier=str(log_id) ) db.delete(log_entry) db.commit() return f"Log entry {log_id} deleted successfully" except ResourceNotFoundException: raise except SQLAlchemyError as e: db.rollback() logger.error(f"Failed to delete log {log_id}: {e}") raise AdminOperationException( operation="delete_log", reason=f"Delete operation failed: {str(e)}" ) # Create service instance log_service = LogService()