Files
orion/app/services/code_quality_service.py
Samir Boulahtit c7c2c83007 fix: add missing last_scan field in code quality dashboard empty state
Problem:
- GET /api/v1/admin/code-quality/stats returned 500 error
- Pydantic validation error: "last_scan Field required"
- When no scan data exists, service returned dict without last_scan field
- DashboardStatsResponse model required last_scan field

Solution:
1. Added last_scan: None to empty state return dictionary
2. Made last_scan field explicitly optional with default value (= None)
3. Ensures field is always present in response

Changes:
- app/services/code_quality_service.py: Added "last_scan": None
- app/api/v1/admin/code_quality.py: Changed to last_scan: str | None = None

This fixes the 500 error when accessing /admin/code-quality page
with no architecture scan data in the database.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 20:15:07 +01:00

526 lines
16 KiB
Python

"""
Code Quality Service
Business logic for managing architecture scans and violations
"""
import json
import logging
import subprocess
from datetime import datetime
from sqlalchemy import desc, func
from sqlalchemy.orm import Session
from app.models.architecture_scan import (
ArchitectureScan,
ArchitectureViolation,
ViolationAssignment,
ViolationComment,
)
logger = logging.getLogger(__name__)
class CodeQualityService:
"""Service for managing code quality scans and violations"""
def run_scan(self, db: Session, triggered_by: str = "manual") -> ArchitectureScan:
"""
Run architecture validator and store results in database
Args:
db: Database session
triggered_by: Who/what triggered the scan ('manual', 'scheduled', 'ci/cd')
Returns:
ArchitectureScan object with results
Raises:
Exception: If validator script fails
"""
logger.info(f"Starting architecture scan (triggered by: {triggered_by})")
# Get git commit hash
git_commit = self._get_git_commit_hash()
# Run validator with JSON output
start_time = datetime.now()
try:
result = subprocess.run(
["python", "scripts/validate_architecture.py", "--json"],
capture_output=True,
text=True,
timeout=300, # 5 minute timeout
)
except subprocess.TimeoutExpired:
logger.error("Architecture scan timed out after 5 minutes")
raise Exception("Scan timed out")
duration = (datetime.now() - start_time).total_seconds()
# Parse JSON output (get only the JSON part, skip progress messages)
try:
# Find the JSON part in stdout
lines = result.stdout.strip().split("\n")
json_start = -1
for i, line in enumerate(lines):
if line.strip().startswith("{"):
json_start = i
break
if json_start == -1:
raise ValueError("No JSON output found")
json_output = "\n".join(lines[json_start:])
data = json.loads(json_output)
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"Failed to parse validator output: {e}")
logger.error(f"Stdout: {result.stdout}")
logger.error(f"Stderr: {result.stderr}")
raise Exception(f"Failed to parse scan results: {e}")
# Create scan record
scan = ArchitectureScan(
timestamp=datetime.now(),
total_files=data.get("files_checked", 0),
total_violations=data.get("total_violations", 0),
errors=data.get("errors", 0),
warnings=data.get("warnings", 0),
duration_seconds=duration,
triggered_by=triggered_by,
git_commit_hash=git_commit,
)
db.add(scan)
db.flush() # Get scan.id
# Create violation records
violations_data = data.get("violations", [])
logger.info(f"Creating {len(violations_data)} violation records")
for v in violations_data:
violation = ArchitectureViolation(
scan_id=scan.id,
rule_id=v["rule_id"],
rule_name=v["rule_name"],
severity=v["severity"],
file_path=v["file_path"],
line_number=v["line_number"],
message=v["message"],
context=v.get("context", ""),
suggestion=v.get("suggestion", ""),
status="open",
)
db.add(violation)
db.commit()
db.refresh(scan)
logger.info(f"Scan completed: {scan.total_violations} violations found")
return scan
def get_latest_scan(self, db: Session) -> ArchitectureScan | None:
"""Get the most recent scan"""
return (
db.query(ArchitectureScan)
.order_by(desc(ArchitectureScan.timestamp))
.first()
)
def get_scan_by_id(self, db: Session, scan_id: int) -> ArchitectureScan | None:
"""Get scan by ID"""
return db.query(ArchitectureScan).filter(ArchitectureScan.id == scan_id).first()
def get_scan_history(self, db: Session, limit: int = 30) -> list[ArchitectureScan]:
"""
Get scan history for trend graphs
Args:
db: Database session
limit: Maximum number of scans to return
Returns:
List of ArchitectureScan objects, newest first
"""
return (
db.query(ArchitectureScan)
.order_by(desc(ArchitectureScan.timestamp))
.limit(limit)
.all()
)
def get_violations(
self,
db: Session,
scan_id: int = None,
severity: str = None,
status: str = None,
rule_id: str = None,
file_path: str = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[ArchitectureViolation], int]:
"""
Get violations with filtering and pagination
Args:
db: Database session
scan_id: Filter by scan ID (if None, use latest scan)
severity: Filter by severity ('error', 'warning')
status: Filter by status ('open', 'assigned', 'resolved', etc.)
rule_id: Filter by rule ID
file_path: Filter by file path (partial match)
limit: Page size
offset: Page offset
Returns:
Tuple of (violations list, total count)
"""
# If no scan_id specified, use latest scan
if scan_id is None:
latest_scan = self.get_latest_scan(db)
if not latest_scan:
return [], 0
scan_id = latest_scan.id
# Build query
query = db.query(ArchitectureViolation).filter(
ArchitectureViolation.scan_id == scan_id
)
# Apply filters
if severity:
query = query.filter(ArchitectureViolation.severity == severity)
if status:
query = query.filter(ArchitectureViolation.status == status)
if rule_id:
query = query.filter(ArchitectureViolation.rule_id == rule_id)
if file_path:
query = query.filter(ArchitectureViolation.file_path.like(f"%{file_path}%"))
# Get total count
total = query.count()
# Get page of results
violations = (
query.order_by(
ArchitectureViolation.severity.desc(), ArchitectureViolation.file_path
)
.limit(limit)
.offset(offset)
.all()
)
return violations, total
def get_violation_by_id(
self, db: Session, violation_id: int
) -> ArchitectureViolation | None:
"""Get single violation with details"""
return (
db.query(ArchitectureViolation)
.filter(ArchitectureViolation.id == violation_id)
.first()
)
def assign_violation(
self,
db: Session,
violation_id: int,
user_id: int,
assigned_by: int,
due_date: datetime = None,
priority: str = "medium",
) -> ViolationAssignment:
"""
Assign violation to a developer
Args:
db: Database session
violation_id: Violation ID
user_id: User to assign to
assigned_by: User who is assigning
due_date: Due date (optional)
priority: Priority level ('low', 'medium', 'high', 'critical')
Returns:
ViolationAssignment object
"""
# Update violation status
violation = self.get_violation_by_id(db, violation_id)
if violation:
violation.status = "assigned"
violation.assigned_to = user_id
# Create assignment record
assignment = ViolationAssignment(
violation_id=violation_id,
user_id=user_id,
assigned_by=assigned_by,
due_date=due_date,
priority=priority,
)
db.add(assignment)
db.commit()
logger.info(f"Violation {violation_id} assigned to user {user_id}")
return assignment
def resolve_violation(
self, db: Session, violation_id: int, resolved_by: int, resolution_note: str
) -> ArchitectureViolation:
"""
Mark violation as resolved
Args:
db: Database session
violation_id: Violation ID
resolved_by: User who resolved it
resolution_note: Note about resolution
Returns:
Updated ArchitectureViolation object
"""
violation = self.get_violation_by_id(db, violation_id)
if not violation:
raise ValueError(f"Violation {violation_id} not found")
violation.status = "resolved"
violation.resolved_at = datetime.now()
violation.resolved_by = resolved_by
violation.resolution_note = resolution_note
db.commit()
logger.info(f"Violation {violation_id} resolved by user {resolved_by}")
return violation
def ignore_violation(
self, db: Session, violation_id: int, ignored_by: int, reason: str
) -> ArchitectureViolation:
"""
Mark violation as ignored/won't fix
Args:
db: Database session
violation_id: Violation ID
ignored_by: User who ignored it
reason: Reason for ignoring
Returns:
Updated ArchitectureViolation object
"""
violation = self.get_violation_by_id(db, violation_id)
if not violation:
raise ValueError(f"Violation {violation_id} not found")
violation.status = "ignored"
violation.resolved_at = datetime.now()
violation.resolved_by = ignored_by
violation.resolution_note = f"Ignored: {reason}"
db.commit()
logger.info(f"Violation {violation_id} ignored by user {ignored_by}")
return violation
def add_comment(
self, db: Session, violation_id: int, user_id: int, comment: str
) -> ViolationComment:
"""
Add comment to violation
Args:
db: Database session
violation_id: Violation ID
user_id: User posting comment
comment: Comment text
Returns:
ViolationComment object
"""
comment_obj = ViolationComment(
violation_id=violation_id, user_id=user_id, comment=comment
)
db.add(comment_obj)
db.commit()
logger.info(f"Comment added to violation {violation_id} by user {user_id}")
return comment_obj
def get_dashboard_stats(self, db: Session) -> dict:
"""
Get statistics for dashboard
Returns:
Dictionary with various statistics
"""
latest_scan = self.get_latest_scan(db)
if not latest_scan:
return {
"total_violations": 0,
"errors": 0,
"warnings": 0,
"open": 0,
"assigned": 0,
"resolved": 0,
"ignored": 0,
"technical_debt_score": 100,
"trend": [],
"by_severity": {},
"by_rule": {},
"by_module": {},
"top_files": [],
"last_scan": None,
}
# Get violation counts by status
status_counts = (
db.query(ArchitectureViolation.status, func.count(ArchitectureViolation.id))
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.group_by(ArchitectureViolation.status)
.all()
)
status_dict = {status: count for status, count in status_counts}
# Get violations by severity
severity_counts = (
db.query(
ArchitectureViolation.severity, func.count(ArchitectureViolation.id)
)
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.group_by(ArchitectureViolation.severity)
.all()
)
by_severity = {sev: count for sev, count in severity_counts}
# Get violations by rule
rule_counts = (
db.query(
ArchitectureViolation.rule_id, func.count(ArchitectureViolation.id)
)
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.group_by(ArchitectureViolation.rule_id)
.all()
)
by_rule = {
rule: count
for rule, count in sorted(rule_counts, key=lambda x: x[1], reverse=True)[
:10
]
}
# Get top violating files
file_counts = (
db.query(
ArchitectureViolation.file_path,
func.count(ArchitectureViolation.id).label("count"),
)
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.group_by(ArchitectureViolation.file_path)
.order_by(desc("count"))
.limit(10)
.all()
)
top_files = [{"file": file, "count": count} for file, count in file_counts]
# Get violations by module (extract module from file path)
by_module = {}
violations = (
db.query(ArchitectureViolation.file_path)
.filter(ArchitectureViolation.scan_id == latest_scan.id)
.all()
)
for v in violations:
path_parts = v.file_path.split("/")
if len(path_parts) >= 2:
module = "/".join(path_parts[:2]) # e.g., 'app/api'
else:
module = path_parts[0]
by_module[module] = by_module.get(module, 0) + 1
# Sort by count and take top 10
by_module = dict(
sorted(by_module.items(), key=lambda x: x[1], reverse=True)[:10]
)
# Calculate technical debt score
tech_debt_score = self.calculate_technical_debt_score(db, latest_scan.id)
# Get trend (last 7 scans)
trend_scans = self.get_scan_history(db, limit=7)
trend = [
{
"timestamp": scan.timestamp.isoformat(),
"violations": scan.total_violations,
"errors": scan.errors,
"warnings": scan.warnings,
}
for scan in reversed(trend_scans) # Oldest first for chart
]
return {
"total_violations": latest_scan.total_violations,
"errors": latest_scan.errors,
"warnings": latest_scan.warnings,
"open": status_dict.get("open", 0),
"assigned": status_dict.get("assigned", 0),
"resolved": status_dict.get("resolved", 0),
"ignored": status_dict.get("ignored", 0),
"technical_debt_score": tech_debt_score,
"trend": trend,
"by_severity": by_severity,
"by_rule": by_rule,
"by_module": by_module,
"top_files": top_files,
"last_scan": latest_scan.timestamp.isoformat() if latest_scan else None,
}
def calculate_technical_debt_score(self, db: Session, scan_id: int = None) -> int:
"""
Calculate technical debt score (0-100)
Formula: 100 - (errors * 0.5 + warnings * 0.05)
Capped at 0 minimum
Args:
db: Database session
scan_id: Scan ID (if None, use latest)
Returns:
Score from 0-100
"""
if scan_id is None:
latest_scan = self.get_latest_scan(db)
if not latest_scan:
return 100
scan_id = latest_scan.id
scan = self.get_scan_by_id(db, scan_id)
if not scan:
return 100
score = 100 - (scan.errors * 0.5 + scan.warnings * 0.05)
return max(0, min(100, int(score))) # Clamp to 0-100
def _get_git_commit_hash(self) -> str | None:
"""Get current git commit hash"""
try:
result = subprocess.run(
["git", "rev-parse", "HEAD"], capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
return result.stdout.strip()[:40]
except Exception:
pass
return None
# Singleton instance
code_quality_service = CodeQualityService()