Files
orion/app/modules/monitoring/services/log_service.py
Samir Boulahtit d7a0ff8818 refactor: complete module-driven architecture migration
This commit completes the migration to a fully module-driven architecture:

## Models Migration
- Moved all domain models from models/database/ to their respective modules:
  - tenancy: User, Admin, Vendor, Company, Platform, VendorDomain, etc.
  - cms: MediaFile, VendorTheme
  - messaging: Email, VendorEmailSettings, VendorEmailTemplate
  - core: AdminMenuConfig
- models/database/ now only contains Base and TimestampMixin (infrastructure)

## Schemas Migration
- Moved all domain schemas from models/schema/ to their respective modules:
  - tenancy: company, vendor, admin, team, vendor_domain
  - cms: media, image, vendor_theme
  - messaging: email
- models/schema/ now only contains base.py and auth.py (infrastructure)

## Routes Migration
- Moved admin routes from app/api/v1/admin/ to modules:
  - menu_config.py -> core module
  - modules.py -> tenancy module
  - module_config.py -> tenancy module
- app/api/v1/admin/ now only aggregates auto-discovered module routes

## Menu System
- Implemented module-driven menu system with MenuDiscoveryService
- Extended FrontendType enum: PLATFORM, ADMIN, VENDOR, STOREFRONT
- Added MenuItemDefinition and MenuSectionDefinition dataclasses
- Each module now defines its own menu items in definition.py
- MenuService integrates with MenuDiscoveryService for template rendering

## Documentation
- Updated docs/architecture/models-structure.md
- Updated docs/architecture/menu-management.md
- Updated architecture validation rules for new exceptions

## Architecture Validation
- Updated MOD-019 rule to allow base.py in models/schema/
- Created core module exceptions.py and schemas/ directory
- All validation errors resolved (only warnings remain)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:02:56 +01:00

388 lines
12 KiB
Python

# app/modules/monitoring/services/log_service.py
"""
Log management service for viewing and managing application logs.
This module provides functions for:
- Querying database logs with filters
- Reading file logs
- Log statistics and analytics
- Log retention and cleanup
- Downloading log files
"""
import logging
from collections import deque
from datetime import UTC, datetime, timedelta
from pathlib import Path

from sqlalchemy import and_, func, or_
from sqlalchemy.orm import Session

from app.core.config import settings
from app.exceptions import ResourceNotFoundException
from app.modules.tenancy.exceptions import AdminOperationException
from app.modules.tenancy.models import ApplicationLog
from app.modules.tenancy.schemas.admin import (
    ApplicationLogFilters,
    ApplicationLogListResponse,
    ApplicationLogResponse,
    FileLogResponse,
    LogStatistics,
)
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class LogService:
"""Service for managing application logs."""
def get_database_logs(
self, db: Session, filters: ApplicationLogFilters
) -> ApplicationLogListResponse:
"""
Get logs from database with filtering and pagination.
Args:
db: Database session
filters: Filter criteria
Returns:
Paginated list of logs
"""
try:
query = db.query(ApplicationLog)
# Apply filters
conditions = []
if filters.level:
conditions.append(ApplicationLog.level == filters.level.upper())
if filters.logger_name:
conditions.append(
ApplicationLog.logger_name.like(f"%{filters.logger_name}%")
)
if filters.module:
conditions.append(ApplicationLog.module.like(f"%{filters.module}%"))
if filters.user_id:
conditions.append(ApplicationLog.user_id == filters.user_id)
if filters.vendor_id:
conditions.append(ApplicationLog.vendor_id == filters.vendor_id)
if filters.date_from:
conditions.append(ApplicationLog.timestamp >= filters.date_from)
if filters.date_to:
conditions.append(ApplicationLog.timestamp <= filters.date_to)
if filters.search:
search_pattern = f"%{filters.search}%"
conditions.append(
or_(
ApplicationLog.message.like(search_pattern),
ApplicationLog.exception_message.like(search_pattern),
)
)
if conditions:
query = query.filter(and_(*conditions))
# Get total count
total = query.count()
# Apply pagination and sorting
logs = (
query.order_by(ApplicationLog.timestamp.desc())
.offset(filters.skip)
.limit(filters.limit)
.all()
)
return ApplicationLogListResponse(
logs=[ApplicationLogResponse.model_validate(log) for log in logs],
total=total,
skip=filters.skip,
limit=filters.limit,
)
except Exception as e:
logger.error(f"Failed to get database logs: {e}")
raise AdminOperationException(
operation="get_database_logs", reason=f"Database query failed: {str(e)}"
)
def get_log_statistics(self, db: Session, days: int = 7) -> LogStatistics:
"""
Get statistics about logs from the last N days.
Args:
db: Database session
days: Number of days to analyze
Returns:
Log statistics
"""
try:
cutoff_date = datetime.now(UTC) - timedelta(days=days)
# Total counts
total_count = (
db.query(func.count(ApplicationLog.id))
.filter(ApplicationLog.timestamp >= cutoff_date)
.scalar()
)
warning_count = (
db.query(func.count(ApplicationLog.id))
.filter(
and_(
ApplicationLog.timestamp >= cutoff_date,
ApplicationLog.level == "WARNING",
)
)
.scalar()
)
error_count = (
db.query(func.count(ApplicationLog.id))
.filter(
and_(
ApplicationLog.timestamp >= cutoff_date,
ApplicationLog.level == "ERROR",
)
)
.scalar()
)
critical_count = (
db.query(func.count(ApplicationLog.id))
.filter(
and_(
ApplicationLog.timestamp >= cutoff_date,
ApplicationLog.level == "CRITICAL",
)
)
.scalar()
)
# Count by level
by_level_raw = (
db.query(ApplicationLog.level, func.count(ApplicationLog.id))
.filter(ApplicationLog.timestamp >= cutoff_date)
.group_by(ApplicationLog.level)
.all()
)
by_level = {level: count for level, count in by_level_raw}
# Count by module (top 10)
by_module_raw = (
db.query(ApplicationLog.module, func.count(ApplicationLog.id))
.filter(ApplicationLog.timestamp >= cutoff_date)
.filter(ApplicationLog.module.isnot(None))
.group_by(ApplicationLog.module)
.order_by(func.count(ApplicationLog.id).desc())
.limit(10)
.all()
)
by_module = {module: count for module, count in by_module_raw}
# Recent errors (last 5)
recent_errors = (
db.query(ApplicationLog)
.filter(
and_(
ApplicationLog.timestamp >= cutoff_date,
ApplicationLog.level.in_(["ERROR", "CRITICAL"]),
)
)
.order_by(ApplicationLog.timestamp.desc())
.limit(5)
.all()
)
return LogStatistics(
total_count=total_count or 0,
warning_count=warning_count or 0,
error_count=error_count or 0,
critical_count=critical_count or 0,
by_level=by_level,
by_module=by_module,
recent_errors=[
ApplicationLogResponse.model_validate(log) for log in recent_errors
],
)
except Exception as e:
logger.error(f"Failed to get log statistics: {e}")
raise AdminOperationException(
operation="get_log_statistics",
reason=f"Database query failed: {str(e)}",
)
def get_file_logs(
self, filename: str = "app.log", lines: int = 500
) -> FileLogResponse:
"""
Read logs from file.
Args:
filename: Log filename (default: app.log)
lines: Number of lines to return from end of file
Returns:
File log content
"""
try:
# Determine log file path
log_file_path = settings.log_file
if log_file_path:
log_file = Path(log_file_path)
else:
log_file = Path("logs") / "app.log"
# Allow reading backup files
if filename != "app.log":
log_file = log_file.parent / filename
if not log_file.exists():
raise ResourceNotFoundException(
resource_type="log_file", identifier=str(log_file)
)
# Get file stats
stat = log_file.stat()
# Read last N lines efficiently
with open(log_file, encoding="utf-8", errors="replace") as f:
# For large files, seek to end and read backwards
all_lines = f.readlines()
log_lines = all_lines[-lines:] if len(all_lines) > lines else all_lines
return FileLogResponse(
filename=log_file.name,
size_bytes=stat.st_size,
last_modified=datetime.fromtimestamp(stat.st_mtime, tz=UTC),
lines=[line.rstrip("\n") for line in log_lines],
total_lines=len(all_lines),
)
except ResourceNotFoundException:
raise
except Exception as e:
logger.error(f"Failed to read log file: {e}")
raise AdminOperationException(
operation="get_file_logs", reason=f"File read failed: {str(e)}"
)
def list_log_files(self) -> list[dict]:
"""
List all available log files.
Returns:
List of log file info (name, size, modified date)
"""
try:
# Determine log directory
log_file_path = settings.log_file
if log_file_path:
log_dir = Path(log_file_path).parent
else:
log_dir = Path("logs")
if not log_dir.exists():
return []
files = []
for log_file in log_dir.glob("*.log*"):
if log_file.is_file():
stat = log_file.stat()
files.append(
{
"filename": log_file.name,
"size_bytes": stat.st_size,
"size_mb": round(stat.st_size / (1024 * 1024), 2),
"last_modified": datetime.fromtimestamp(
stat.st_mtime, tz=UTC
).isoformat(),
}
)
# Sort by modified date (newest first)
files.sort(key=lambda x: x["last_modified"], reverse=True)
return files
except Exception as e:
logger.error(f"Failed to list log files: {e}")
raise AdminOperationException(
operation="list_log_files", reason=f"Directory read failed: {str(e)}"
)
def cleanup_old_logs(self, db: Session, retention_days: int) -> int:
"""
Delete logs older than retention period from database.
Args:
db: Database session
retention_days: Days to retain logs
Returns:
Number of logs deleted
"""
try:
cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
deleted_count = (
db.query(ApplicationLog)
.filter(ApplicationLog.timestamp < cutoff_date)
.delete()
)
db.commit()
logger.info(
f"Cleaned up {deleted_count} logs older than {retention_days} days"
)
return deleted_count
except Exception as e:
db.rollback()
logger.error(f"Failed to cleanup old logs: {e}")
raise AdminOperationException(
operation="cleanup_old_logs",
reason=f"Delete operation failed: {str(e)}",
)
def delete_log(self, db: Session, log_id: int) -> str:
"""Delete a specific log entry."""
try:
log_entry = (
db.query(ApplicationLog).filter(ApplicationLog.id == log_id).first()
)
if not log_entry:
raise ResourceNotFoundException(
resource_type="log", identifier=str(log_id)
)
db.delete(log_entry)
db.commit()
return f"Log entry {log_id} deleted successfully"
except ResourceNotFoundException:
raise
except Exception as e:
db.rollback()
logger.error(f"Failed to delete log {log_id}: {e}")
raise AdminOperationException(
operation="delete_log", reason=f"Delete operation failed: {str(e)}"
)
# Module-level LogService instance, built once when this module is imported
# (presumably the shared entry point other modules import — verify against callers).
log_service = LogService()