Files
orion/app/services/code_quality_service.py
Samir Boulahtit 3520bcb069 refactor: move transaction management from services to API endpoints
- Services now use db.flush() instead of db.commit() for database operations
- API endpoints handle transaction commit after service calls
- Remove db.rollback() from services (let exception handlers manage this)
- Ensures consistent transaction boundaries at API layer

This pattern gives API endpoints full control over when to commit,
allowing for better error handling and potential multi-operation transactions.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-06 18:34:41 +01:00

531 lines
16 KiB
Python

"""
Code Quality Service
Business logic for managing architecture scans and violations
"""
import json
import logging
import subprocess
from datetime import datetime
from sqlalchemy import desc, func
from sqlalchemy.orm import Session
from app.exceptions import (
ScanParseException,
ScanTimeoutException,
ViolationNotFoundException,
)
from models.database.architecture_scan import (
ArchitectureScan,
ArchitectureViolation,
ViolationAssignment,
ViolationComment,
)
logger = logging.getLogger(__name__)
class CodeQualityService:
"""Service for managing code quality scans and violations"""
def run_scan(self, db: Session, triggered_by: str = "manual") -> ArchitectureScan:
"""
Run architecture validator and store results in database
Args:
db: Database session
triggered_by: Who/what triggered the scan ('manual', 'scheduled', 'ci/cd')
Returns:
ArchitectureScan object with results
Raises:
Exception: If validator script fails
"""
logger.info(f"Starting architecture scan (triggered by: {triggered_by})")
# Get git commit hash
git_commit = self._get_git_commit_hash()
# Run validator with JSON output
start_time = datetime.now()
try:
result = subprocess.run(
["python", "scripts/validate_architecture.py", "--json"],
capture_output=True,
text=True,
timeout=300, # 5 minute timeout
)
except subprocess.TimeoutExpired:
logger.error("Architecture scan timed out after 5 minutes")
raise ScanTimeoutException(timeout_seconds=300)
duration = (datetime.now() - start_time).total_seconds()
# Parse JSON output (get only the JSON part, skip progress messages)
try:
# Find the JSON part in stdout
lines = result.stdout.strip().split("\n")
json_start = -1
for i, line in enumerate(lines):
if line.strip().startswith("{"):
json_start = i
break
if json_start == -1:
raise ValueError("No JSON output found")
json_output = "\n".join(lines[json_start:])
data = json.loads(json_output)
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"Failed to parse validator output: {e}")
logger.error(f"Stdout: {result.stdout}")
logger.error(f"Stderr: {result.stderr}")
raise ScanParseException(reason=str(e))
# Create scan record
scan = ArchitectureScan(
timestamp=datetime.now(),
total_files=data.get("files_checked", 0),
total_violations=data.get("total_violations", 0),
errors=data.get("errors", 0),
warnings=data.get("warnings", 0),
duration_seconds=duration,
triggered_by=triggered_by,
git_commit_hash=git_commit,
)
db.add(scan)
db.flush() # Get scan.id
# Create violation records
violations_data = data.get("violations", [])
logger.info(f"Creating {len(violations_data)} violation records")
for v in violations_data:
violation = ArchitectureViolation(
scan_id=scan.id,
rule_id=v["rule_id"],
rule_name=v["rule_name"],
severity=v["severity"],
file_path=v["file_path"],
line_number=v["line_number"],
message=v["message"],
context=v.get("context", ""),
suggestion=v.get("suggestion", ""),
status="open",
)
db.add(violation)
db.flush()
db.refresh(scan)
logger.info(f"Scan completed: {scan.total_violations} violations found")
return scan
def get_latest_scan(self, db: Session) -> ArchitectureScan | None:
"""Get the most recent scan"""
return (
db.query(ArchitectureScan)
.order_by(desc(ArchitectureScan.timestamp))
.first()
)
def get_scan_by_id(self, db: Session, scan_id: int) -> ArchitectureScan | None:
"""Get scan by ID"""
return db.query(ArchitectureScan).filter(ArchitectureScan.id == scan_id).first()
def get_scan_history(self, db: Session, limit: int = 30) -> list[ArchitectureScan]:
"""
Get scan history for trend graphs
Args:
db: Database session
limit: Maximum number of scans to return
Returns:
List of ArchitectureScan objects, newest first
"""
return (
db.query(ArchitectureScan)
.order_by(desc(ArchitectureScan.timestamp))
.limit(limit)
.all()
)
def get_violations(
self,
db: Session,
scan_id: int = None,
severity: str = None,
status: str = None,
rule_id: str = None,
file_path: str = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[ArchitectureViolation], int]:
"""
Get violations with filtering and pagination
Args:
db: Database session
scan_id: Filter by scan ID (if None, use latest scan)
severity: Filter by severity ('error', 'warning')
status: Filter by status ('open', 'assigned', 'resolved', etc.)
rule_id: Filter by rule ID
file_path: Filter by file path (partial match)
limit: Page size
offset: Page offset
Returns:
Tuple of (violations list, total count)
"""
# If no scan_id specified, use latest scan
if scan_id is None:
latest_scan = self.get_latest_scan(db)
if not latest_scan:
return [], 0
scan_id = latest_scan.id
# Build query
query = db.query(ArchitectureViolation).filter(
ArchitectureViolation.scan_id == scan_id
)
# Apply filters
if severity:
query = query.filter(ArchitectureViolation.severity == severity)
if status:
query = query.filter(ArchitectureViolation.status == status)
if rule_id:
query = query.filter(ArchitectureViolation.rule_id == rule_id)
if file_path:
query = query.filter(ArchitectureViolation.file_path.like(f"%{file_path}%"))
# Get total count
total = query.count()
# Get page of results
violations = (
query.order_by(
ArchitectureViolation.severity.desc(), ArchitectureViolation.file_path
)
.limit(limit)
.offset(offset)
.all()
)
return violations, total
def get_violation_by_id(
self, db: Session, violation_id: int
) -> ArchitectureViolation | None:
"""Get single violation with details"""
return (
db.query(ArchitectureViolation)
.filter(ArchitectureViolation.id == violation_id)
.first()
)
def assign_violation(
self,
db: Session,
violation_id: int,
user_id: int,
assigned_by: int,
due_date: datetime = None,
priority: str = "medium",
) -> ViolationAssignment:
"""
Assign violation to a developer
Args:
db: Database session
violation_id: Violation ID
user_id: User to assign to
assigned_by: User who is assigning
due_date: Due date (optional)
priority: Priority level ('low', 'medium', 'high', 'critical')
Returns:
ViolationAssignment object
"""
# Update violation status
violation = self.get_violation_by_id(db, violation_id)
if violation:
violation.status = "assigned"
violation.assigned_to = user_id
# Create assignment record
assignment = ViolationAssignment(
violation_id=violation_id,
user_id=user_id,
assigned_by=assigned_by,
due_date=due_date,
priority=priority,
)
db.add(assignment)
db.flush()
logger.info(f"Violation {violation_id} assigned to user {user_id}")
return assignment
def resolve_violation(
self, db: Session, violation_id: int, resolved_by: int, resolution_note: str
) -> ArchitectureViolation:
"""
Mark violation as resolved
Args:
db: Database session
violation_id: Violation ID
resolved_by: User who resolved it
resolution_note: Note about resolution
Returns:
Updated ArchitectureViolation object
"""
violation = self.get_violation_by_id(db, violation_id)
if not violation:
raise ViolationNotFoundException(violation_id)
violation.status = "resolved"
violation.resolved_at = datetime.now()
violation.resolved_by = resolved_by
violation.resolution_note = resolution_note
db.flush()
logger.info(f"Violation {violation_id} resolved by user {resolved_by}")
return violation
def ignore_violation(
self, db: Session, violation_id: int, ignored_by: int, reason: str
) -> ArchitectureViolation:
"""
Mark violation as ignored/won't fix
Args:
db: Database session
violation_id: Violation ID
ignored_by: User who ignored it
reason: Reason for ignoring
Returns:
Updated ArchitectureViolation object
"""
violation = self.get_violation_by_id(db, violation_id)
if not violation:
raise ViolationNotFoundException(violation_id)
violation.status = "ignored"
violation.resolved_at = datetime.now()
violation.resolved_by = ignored_by
violation.resolution_note = f"Ignored: {reason}"
db.flush()
logger.info(f"Violation {violation_id} ignored by user {ignored_by}")
return violation
def add_comment(
self, db: Session, violation_id: int, user_id: int, comment: str
) -> ViolationComment:
"""
Add comment to violation
Args:
db: Database session
violation_id: Violation ID
user_id: User posting comment
comment: Comment text
Returns:
ViolationComment object
"""
comment_obj = ViolationComment(
violation_id=violation_id, user_id=user_id, comment=comment
)
db.add(comment_obj)
db.flush()
logger.info(f"Comment added to violation {violation_id} by user {user_id}")
return comment_obj
def get_dashboard_stats(self, db: Session) -> dict:
"""
Get statistics for dashboard
Returns:
Dictionary with various statistics
"""
latest_scan = self.get_latest_scan(db)
if not latest_scan:
return {
"total_violations": 0,
"errors": 0,
"warnings": 0,
"open": 0,
"assigned": 0,
"resolved": 0,
"ignored": 0,
"technical_debt_score": 100,
"trend": [],
"by_severity": {},
"by_rule": {},
"by_module": {},
"top_files": [],
"last_scan": None,
}
# Get violation counts by status
status_counts = (
db.query(ArchitectureViolation.status, func.count(ArchitectureViolation.id))
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.group_by(ArchitectureViolation.status)
.all()
)
status_dict = {status: count for status, count in status_counts}
# Get violations by severity
severity_counts = (
db.query(
ArchitectureViolation.severity, func.count(ArchitectureViolation.id)
)
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.group_by(ArchitectureViolation.severity)
.all()
)
by_severity = {sev: count for sev, count in severity_counts}
# Get violations by rule
rule_counts = (
db.query(
ArchitectureViolation.rule_id, func.count(ArchitectureViolation.id)
)
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.group_by(ArchitectureViolation.rule_id)
.all()
)
by_rule = {
rule: count
for rule, count in sorted(rule_counts, key=lambda x: x[1], reverse=True)[
:10
]
}
# Get top violating files
file_counts = (
db.query(
ArchitectureViolation.file_path,
func.count(ArchitectureViolation.id).label("count"),
)
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.group_by(ArchitectureViolation.file_path)
.order_by(desc("count"))
.limit(10)
.all()
)
top_files = [{"file": file, "count": count} for file, count in file_counts]
# Get violations by module (extract module from file path)
by_module = {}
violations = (
db.query(ArchitectureViolation.file_path)
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.all()
)
for v in violations:
path_parts = v.file_path.split("/")
if len(path_parts) >= 2:
module = "/".join(path_parts[:2]) # e.g., 'app/api'
else:
module = path_parts[0]
by_module[module] = by_module.get(module, 0) + 1
# Sort by count and take top 10
by_module = dict(
sorted(by_module.items(), key=lambda x: x[1], reverse=True)[:10]
)
# Calculate technical debt score
tech_debt_score = self.calculate_technical_debt_score(db, latest_scan.id)
# Get trend (last 7 scans)
trend_scans = self.get_scan_history(db, limit=7)
trend = [
{
"timestamp": scan.timestamp.isoformat(),
"violations": scan.total_violations,
"errors": scan.errors,
"warnings": scan.warnings,
}
for scan in reversed(trend_scans) # Oldest first for chart
]
return {
"total_violations": latest_scan.total_violations,
"errors": latest_scan.errors,
"warnings": latest_scan.warnings,
"open": status_dict.get("open", 0),
"assigned": status_dict.get("assigned", 0),
"resolved": status_dict.get("resolved", 0),
"ignored": status_dict.get("ignored", 0),
"technical_debt_score": tech_debt_score,
"trend": trend,
"by_severity": by_severity,
"by_rule": by_rule,
"by_module": by_module,
"top_files": top_files,
"last_scan": latest_scan.timestamp.isoformat() if latest_scan else None,
}
def calculate_technical_debt_score(self, db: Session, scan_id: int = None) -> int:
"""
Calculate technical debt score (0-100)
Formula: 100 - (errors * 0.5 + warnings * 0.05)
Capped at 0 minimum
Args:
db: Database session
scan_id: Scan ID (if None, use latest)
Returns:
Score from 0-100
"""
if scan_id is None:
latest_scan = self.get_latest_scan(db)
if not latest_scan:
return 100
scan_id = latest_scan.id
scan = self.get_scan_by_id(db, scan_id)
if not scan:
return 100
score = 100 - (scan.errors * 0.5 + scan.warnings * 0.05)
return max(0, min(100, int(score))) # Clamp to 0-100
def _get_git_commit_hash(self) -> str | None:
"""Get current git commit hash"""
try:
result = subprocess.run(
["git", "rev-parse", "HEAD"], capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
return result.stdout.strip()[:40]
except Exception:
pass
return None
# Singleton instance
code_quality_service = CodeQualityService()