Files
orion/app/tasks/code_quality_tasks.py
Samir Boulahtit b9f08b853f refactor: clean up legacy models and migrate remaining schemas
Delete empty stub files from models/database/:
- audit.py, backup.py, configuration.py, monitoring.py
- notification.py, payment.py, search.py, task.py

Delete re-export files:
- models/database/subscription.py → app.modules.billing.models
- models/database/architecture_scan.py → app.modules.dev_tools.models
- models/database/test_run.py → app.modules.dev_tools.models
- models/schema/subscription.py → app.modules.billing.schemas
- models/schema/marketplace.py (empty)
- models/schema/monitoring.py (empty)

Migrate schemas to canonical module locations:
- billing.py → app/modules/billing/schemas/
- vendor_product.py → app/modules/catalog/schemas/
- homepage_sections.py → app/modules/cms/schemas/

Keep as CORE (framework-level, used everywhere):
- models/schema/: admin, auth, base, company, email, image, media, team, vendor*
- models/database/: admin*, base, company, email, feature, media, platform*, user, vendor*

Update 30+ files to use canonical import locations.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-30 18:45:46 +01:00

218 lines
7.5 KiB
Python

# app/tasks/code_quality_tasks.py
"""Background tasks for code quality scans."""
import json
import logging
import subprocess
from datetime import UTC, datetime
from app.core.database import SessionLocal
from app.services.admin_notification_service import admin_notification_service
from app.modules.dev_tools.models import ArchitectureScan, ArchitectureViolation
logger = logging.getLogger(__name__)
# Validator type constants
VALIDATOR_ARCHITECTURE = "architecture"
VALIDATOR_SECURITY = "security"
VALIDATOR_PERFORMANCE = "performance"
VALID_VALIDATOR_TYPES = [VALIDATOR_ARCHITECTURE, VALIDATOR_SECURITY, VALIDATOR_PERFORMANCE]
# Map validator types to their scripts
VALIDATOR_SCRIPTS = {
VALIDATOR_ARCHITECTURE: "scripts/validate_architecture.py",
VALIDATOR_SECURITY: "scripts/validate_security.py",
VALIDATOR_PERFORMANCE: "scripts/validate_performance.py",
}
# Human-readable names
VALIDATOR_NAMES = {
VALIDATOR_ARCHITECTURE: "Architecture",
VALIDATOR_SECURITY: "Security",
VALIDATOR_PERFORMANCE: "Performance",
}
def _get_git_commit_hash() -> str | None:
"""Get current git commit hash"""
try:
result = subprocess.run(
["git", "rev-parse", "HEAD"],
capture_output=True,
text=True,
timeout=5,
)
if result.returncode == 0:
return result.stdout.strip()[:40]
except Exception:
pass
return None
async def execute_code_quality_scan(scan_id: int):
"""
Background task to execute a code quality scan.
This task:
1. Gets the scan record from DB
2. Updates status to 'running'
3. Runs the validator script
4. Parses JSON output and creates violation records
5. Updates scan with results and status 'completed' or 'failed'
Args:
scan_id: ID of the ArchitectureScan record
"""
db = SessionLocal()
scan = None
try:
# Get the scan record
scan = db.query(ArchitectureScan).filter(ArchitectureScan.id == scan_id).first()
if not scan:
logger.error(f"Code quality scan {scan_id} not found")
return
validator_type = scan.validator_type
if validator_type not in VALID_VALIDATOR_TYPES:
raise ValueError(f"Invalid validator type: {validator_type}")
script_path = VALIDATOR_SCRIPTS[validator_type]
validator_name = VALIDATOR_NAMES[validator_type]
# Update status to running
scan.status = "running"
scan.started_at = datetime.now(UTC)
scan.progress_message = f"Running {validator_name} validator..."
scan.git_commit_hash = _get_git_commit_hash()
db.commit()
logger.info(f"Starting {validator_name} scan (scan_id={scan_id})")
# Run validator with JSON output
start_time = datetime.now(UTC)
try:
result = subprocess.run(
["python", script_path, "--json"],
capture_output=True,
text=True,
timeout=600, # 10 minute timeout
)
except subprocess.TimeoutExpired:
logger.error(f"{validator_name} scan {scan_id} timed out after 10 minutes")
scan.status = "failed"
scan.error_message = "Scan timed out after 10 minutes"
scan.completed_at = datetime.now(UTC)
db.commit()
return
duration = (datetime.now(UTC) - start_time).total_seconds()
# Update progress
scan.progress_message = "Parsing results..."
db.commit()
# Parse JSON output (get only the JSON part, skip progress messages)
try:
lines = result.stdout.strip().split("\n")
json_start = -1
for i, line in enumerate(lines):
if line.strip().startswith("{"):
json_start = i
break
if json_start == -1:
raise ValueError("No JSON output found in validator output")
json_output = "\n".join(lines[json_start:])
data = json.loads(json_output)
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"Failed to parse {validator_name} validator output: {e}")
logger.error(f"Stdout: {result.stdout[:1000]}")
logger.error(f"Stderr: {result.stderr[:1000]}")
scan.status = "failed"
scan.error_message = f"Failed to parse validator output: {e}"
scan.completed_at = datetime.now(UTC)
scan.duration_seconds = duration
db.commit()
return
# Update progress
scan.progress_message = "Storing violations..."
db.commit()
# Create violation records
violations_data = data.get("violations", [])
logger.info(f"Creating {len(violations_data)} {validator_name} violation records")
for v in violations_data:
violation = ArchitectureViolation(
scan_id=scan.id,
validator_type=validator_type,
rule_id=v.get("rule_id", "UNKNOWN"),
rule_name=v.get("rule_name", "Unknown Rule"),
severity=v.get("severity", "warning"),
file_path=v.get("file_path", ""),
line_number=v.get("line_number", 0),
message=v.get("message", ""),
context=v.get("context", ""),
suggestion=v.get("suggestion", ""),
status="open",
)
db.add(violation)
# Update scan with results
scan.total_files = data.get("files_checked", 0)
scan.total_violations = data.get("total_violations", len(violations_data))
scan.errors = data.get("errors", 0)
scan.warnings = data.get("warnings", 0)
scan.duration_seconds = duration
scan.completed_at = datetime.now(UTC)
scan.progress_message = None
# Set final status based on results
if scan.errors > 0:
scan.status = "completed_with_warnings"
else:
scan.status = "completed"
db.commit()
logger.info(
f"{validator_name} scan {scan_id} completed: "
f"files={scan.total_files}, violations={scan.total_violations}, "
f"errors={scan.errors}, warnings={scan.warnings}, "
f"duration={duration:.1f}s"
)
except Exception as e:
logger.error(f"Code quality scan {scan_id} failed: {e}", exc_info=True)
if scan is not None:
try:
scan.status = "failed"
scan.error_message = str(e)[:500] # Truncate long errors
scan.completed_at = datetime.now(UTC)
scan.progress_message = None
# Create admin notification for scan failure
admin_notification_service.create_notification(
db=db,
title="Code Quality Scan Failed",
message=f"{VALIDATOR_NAMES.get(scan.validator_type, 'Unknown')} scan failed: {str(e)[:200]}",
notification_type="error",
category="code_quality",
action_url="/admin/code-quality",
)
db.commit()
except Exception as commit_error:
logger.error(f"Failed to update scan status: {commit_error}")
db.rollback()
finally:
if hasattr(db, "close") and callable(db.close):
try:
db.close()
except Exception as close_error:
logger.error(f"Error closing database session: {close_error}")