Files
orion/app/services/code_quality_service.py
Samir Boulahtit 9db0da25ec feat: implement code quality dashboard with architecture violation tracking
Implement comprehensive code quality dashboard (Phase 2-4) to track and manage
architecture violations found by the validation script.

Backend Implementation:
- Add JSON output support to validate_architecture.py script
- Create CodeQualityService with scan management, violation tracking, and statistics
- Implement REST API endpoints for code quality management:
  * POST /admin/code-quality/scan - trigger new architecture scan
  * GET /admin/code-quality/scans - list scan history
  * GET /admin/code-quality/violations - list violations with filtering/pagination
  * GET /admin/code-quality/violations/{id} - get violation details
  * POST /admin/code-quality/violations/{id}/assign - assign to developer
  * POST /admin/code-quality/violations/{id}/resolve - mark as resolved
  * POST /admin/code-quality/violations/{id}/ignore - mark as ignored
  * POST /admin/code-quality/violations/{id}/comments - add comments
  * GET /admin/code-quality/stats - dashboard statistics
- Fix architecture_scan model imports to use app.core.database

Frontend Implementation:
- Create code quality dashboard page (code-quality-dashboard.html)
  * Summary cards for total violations, errors, warnings, health score
  * Status breakdown (open, assigned, resolved, ignored)
  * Trend visualization for last 7 scans
  * Top violating files list
  * Violations by rule and module
  * Quick action links
- Create violations list page (code-quality-violations.html)
  * Filterable table by severity, status, rule ID, file path
  * Pagination support
  * Violation detail view links
- Add Alpine.js components (code-quality-dashboard.js, code-quality-violations.js)
  * Dashboard state management and scan triggering
  * Violations list with filtering and pagination
  * API integration with authentication
- Add "Code Quality" navigation link in admin sidebar (Developer Tools section)

Routes:
- GET /admin/code-quality - dashboard page
- GET /admin/code-quality/violations - violations list
- GET /admin/code-quality/violations/{id} - violation details

Features:
- Real-time scan execution from UI
- Technical debt score calculation (0-100 scale)
- Violation workflow: open → assigned → resolved/ignored
- Trend tracking across multiple scans
- File and module-level insights
- Assignment system with priorities and due dates
- Collaborative comments on violations

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 09:40:14 +01:00

514 lines
16 KiB
Python

"""
Code Quality Service
Business logic for managing architecture scans and violations
"""
import subprocess
import json
import logging
from datetime import datetime
from typing import List, Tuple, Optional, Dict
from pathlib import Path
from sqlalchemy.orm import Session
from sqlalchemy import func, desc
from app.models.architecture_scan import (
ArchitectureScan,
ArchitectureViolation,
ArchitectureRule,
ViolationAssignment,
ViolationComment
)
logger = logging.getLogger(__name__)
class CodeQualityService:
"""Service for managing code quality scans and violations"""
def run_scan(self, db: Session, triggered_by: str = 'manual') -> ArchitectureScan:
"""
Run architecture validator and store results in database
Args:
db: Database session
triggered_by: Who/what triggered the scan ('manual', 'scheduled', 'ci/cd')
Returns:
ArchitectureScan object with results
Raises:
Exception: If validator script fails
"""
logger.info(f"Starting architecture scan (triggered by: {triggered_by})")
# Get git commit hash
git_commit = self._get_git_commit_hash()
# Run validator with JSON output
start_time = datetime.now()
try:
result = subprocess.run(
['python', 'scripts/validate_architecture.py', '--json'],
capture_output=True,
text=True,
timeout=300 # 5 minute timeout
)
except subprocess.TimeoutExpired:
logger.error("Architecture scan timed out after 5 minutes")
raise Exception("Scan timed out")
duration = (datetime.now() - start_time).total_seconds()
# Parse JSON output (get only the JSON part, skip progress messages)
try:
# Find the JSON part in stdout
lines = result.stdout.strip().split('\n')
json_start = -1
for i, line in enumerate(lines):
if line.strip().startswith('{'):
json_start = i
break
if json_start == -1:
raise ValueError("No JSON output found")
json_output = '\n'.join(lines[json_start:])
data = json.loads(json_output)
except (json.JSONDecodeError, ValueError) as e:
logger.error(f"Failed to parse validator output: {e}")
logger.error(f"Stdout: {result.stdout}")
logger.error(f"Stderr: {result.stderr}")
raise Exception(f"Failed to parse scan results: {e}")
# Create scan record
scan = ArchitectureScan(
timestamp=datetime.now(),
total_files=data.get('files_checked', 0),
total_violations=data.get('total_violations', 0),
errors=data.get('errors', 0),
warnings=data.get('warnings', 0),
duration_seconds=duration,
triggered_by=triggered_by,
git_commit_hash=git_commit
)
db.add(scan)
db.flush() # Get scan.id
# Create violation records
violations_data = data.get('violations', [])
logger.info(f"Creating {len(violations_data)} violation records")
for v in violations_data:
violation = ArchitectureViolation(
scan_id=scan.id,
rule_id=v['rule_id'],
rule_name=v['rule_name'],
severity=v['severity'],
file_path=v['file_path'],
line_number=v['line_number'],
message=v['message'],
context=v.get('context', ''),
suggestion=v.get('suggestion', ''),
status='open'
)
db.add(violation)
db.commit()
db.refresh(scan)
logger.info(f"Scan completed: {scan.total_violations} violations found")
return scan
def get_latest_scan(self, db: Session) -> Optional[ArchitectureScan]:
"""Get the most recent scan"""
return db.query(ArchitectureScan).order_by(desc(ArchitectureScan.timestamp)).first()
def get_scan_by_id(self, db: Session, scan_id: int) -> Optional[ArchitectureScan]:
"""Get scan by ID"""
return db.query(ArchitectureScan).filter(ArchitectureScan.id == scan_id).first()
def get_scan_history(self, db: Session, limit: int = 30) -> List[ArchitectureScan]:
"""
Get scan history for trend graphs
Args:
db: Database session
limit: Maximum number of scans to return
Returns:
List of ArchitectureScan objects, newest first
"""
return db.query(ArchitectureScan)\
.order_by(desc(ArchitectureScan.timestamp))\
.limit(limit)\
.all()
def get_violations(
self,
db: Session,
scan_id: int = None,
severity: str = None,
status: str = None,
rule_id: str = None,
file_path: str = None,
limit: int = 100,
offset: int = 0
) -> Tuple[List[ArchitectureViolation], int]:
"""
Get violations with filtering and pagination
Args:
db: Database session
scan_id: Filter by scan ID (if None, use latest scan)
severity: Filter by severity ('error', 'warning')
status: Filter by status ('open', 'assigned', 'resolved', etc.)
rule_id: Filter by rule ID
file_path: Filter by file path (partial match)
limit: Page size
offset: Page offset
Returns:
Tuple of (violations list, total count)
"""
# If no scan_id specified, use latest scan
if scan_id is None:
latest_scan = self.get_latest_scan(db)
if not latest_scan:
return [], 0
scan_id = latest_scan.id
# Build query
query = db.query(ArchitectureViolation).filter(
ArchitectureViolation.scan_id == scan_id
)
# Apply filters
if severity:
query = query.filter(ArchitectureViolation.severity == severity)
if status:
query = query.filter(ArchitectureViolation.status == status)
if rule_id:
query = query.filter(ArchitectureViolation.rule_id == rule_id)
if file_path:
query = query.filter(ArchitectureViolation.file_path.like(f'%{file_path}%'))
# Get total count
total = query.count()
# Get page of results
violations = query.order_by(
ArchitectureViolation.severity.desc(),
ArchitectureViolation.file_path
).limit(limit).offset(offset).all()
return violations, total
def get_violation_by_id(self, db: Session, violation_id: int) -> Optional[ArchitectureViolation]:
"""Get single violation with details"""
return db.query(ArchitectureViolation).filter(
ArchitectureViolation.id == violation_id
).first()
def assign_violation(
self,
db: Session,
violation_id: int,
user_id: int,
assigned_by: int,
due_date: datetime = None,
priority: str = 'medium'
) -> ViolationAssignment:
"""
Assign violation to a developer
Args:
db: Database session
violation_id: Violation ID
user_id: User to assign to
assigned_by: User who is assigning
due_date: Due date (optional)
priority: Priority level ('low', 'medium', 'high', 'critical')
Returns:
ViolationAssignment object
"""
# Update violation status
violation = self.get_violation_by_id(db, violation_id)
if violation:
violation.status = 'assigned'
violation.assigned_to = user_id
# Create assignment record
assignment = ViolationAssignment(
violation_id=violation_id,
user_id=user_id,
assigned_by=assigned_by,
due_date=due_date,
priority=priority
)
db.add(assignment)
db.commit()
logger.info(f"Violation {violation_id} assigned to user {user_id}")
return assignment
def resolve_violation(
self,
db: Session,
violation_id: int,
resolved_by: int,
resolution_note: str
) -> ArchitectureViolation:
"""
Mark violation as resolved
Args:
db: Database session
violation_id: Violation ID
resolved_by: User who resolved it
resolution_note: Note about resolution
Returns:
Updated ArchitectureViolation object
"""
violation = self.get_violation_by_id(db, violation_id)
if not violation:
raise ValueError(f"Violation {violation_id} not found")
violation.status = 'resolved'
violation.resolved_at = datetime.now()
violation.resolved_by = resolved_by
violation.resolution_note = resolution_note
db.commit()
logger.info(f"Violation {violation_id} resolved by user {resolved_by}")
return violation
def ignore_violation(
self,
db: Session,
violation_id: int,
ignored_by: int,
reason: str
) -> ArchitectureViolation:
"""
Mark violation as ignored/won't fix
Args:
db: Database session
violation_id: Violation ID
ignored_by: User who ignored it
reason: Reason for ignoring
Returns:
Updated ArchitectureViolation object
"""
violation = self.get_violation_by_id(db, violation_id)
if not violation:
raise ValueError(f"Violation {violation_id} not found")
violation.status = 'ignored'
violation.resolved_at = datetime.now()
violation.resolved_by = ignored_by
violation.resolution_note = f"Ignored: {reason}"
db.commit()
logger.info(f"Violation {violation_id} ignored by user {ignored_by}")
return violation
def add_comment(
self,
db: Session,
violation_id: int,
user_id: int,
comment: str
) -> ViolationComment:
"""
Add comment to violation
Args:
db: Database session
violation_id: Violation ID
user_id: User posting comment
comment: Comment text
Returns:
ViolationComment object
"""
comment_obj = ViolationComment(
violation_id=violation_id,
user_id=user_id,
comment=comment
)
db.add(comment_obj)
db.commit()
logger.info(f"Comment added to violation {violation_id} by user {user_id}")
return comment_obj
def get_dashboard_stats(self, db: Session) -> Dict:
"""
Get statistics for dashboard
Returns:
Dictionary with various statistics
"""
latest_scan = self.get_latest_scan(db)
if not latest_scan:
return {
'total_violations': 0,
'errors': 0,
'warnings': 0,
'open': 0,
'assigned': 0,
'resolved': 0,
'ignored': 0,
'technical_debt_score': 100,
'trend': [],
'by_severity': {},
'by_rule': {},
'by_module': {},
'top_files': []
}
# Get violation counts by status
status_counts = db.query(
ArchitectureViolation.status,
func.count(ArchitectureViolation.id)
).filter(
ArchitectureViolation.scan_id == latest_scan.id
).group_by(ArchitectureViolation.status).all()
status_dict = {status: count for status, count in status_counts}
# Get violations by severity
severity_counts = db.query(
ArchitectureViolation.severity,
func.count(ArchitectureViolation.id)
).filter(
ArchitectureViolation.scan_id == latest_scan.id
).group_by(ArchitectureViolation.severity).all()
by_severity = {sev: count for sev, count in severity_counts}
# Get violations by rule
rule_counts = db.query(
ArchitectureViolation.rule_id,
func.count(ArchitectureViolation.id)
).filter(
ArchitectureViolation.scan_id == latest_scan.id
).group_by(ArchitectureViolation.rule_id).all()
by_rule = {rule: count for rule, count in sorted(rule_counts, key=lambda x: x[1], reverse=True)[:10]}
# Get top violating files
file_counts = db.query(
ArchitectureViolation.file_path,
func.count(ArchitectureViolation.id).label('count')
).filter(
ArchitectureViolation.scan_id == latest_scan.id
).group_by(ArchitectureViolation.file_path)\
.order_by(desc('count'))\
.limit(10).all()
top_files = [{'file': file, 'count': count} for file, count in file_counts]
# Get violations by module (extract module from file path)
by_module = {}
violations = db.query(ArchitectureViolation.file_path).filter(
ArchitectureViolation.scan_id == latest_scan.id
).all()
for v in violations:
path_parts = v.file_path.split('/')
if len(path_parts) >= 2:
module = '/'.join(path_parts[:2]) # e.g., 'app/api'
else:
module = path_parts[0]
by_module[module] = by_module.get(module, 0) + 1
# Sort by count and take top 10
by_module = dict(sorted(by_module.items(), key=lambda x: x[1], reverse=True)[:10])
# Calculate technical debt score
tech_debt_score = self.calculate_technical_debt_score(db, latest_scan.id)
# Get trend (last 7 scans)
trend_scans = self.get_scan_history(db, limit=7)
trend = [
{
'timestamp': scan.timestamp.isoformat(),
'violations': scan.total_violations,
'errors': scan.errors,
'warnings': scan.warnings
}
for scan in reversed(trend_scans) # Oldest first for chart
]
return {
'total_violations': latest_scan.total_violations,
'errors': latest_scan.errors,
'warnings': latest_scan.warnings,
'open': status_dict.get('open', 0),
'assigned': status_dict.get('assigned', 0),
'resolved': status_dict.get('resolved', 0),
'ignored': status_dict.get('ignored', 0),
'technical_debt_score': tech_debt_score,
'trend': trend,
'by_severity': by_severity,
'by_rule': by_rule,
'by_module': by_module,
'top_files': top_files,
'last_scan': latest_scan.timestamp.isoformat() if latest_scan else None
}
def calculate_technical_debt_score(self, db: Session, scan_id: int = None) -> int:
"""
Calculate technical debt score (0-100)
Formula: 100 - (errors * 0.5 + warnings * 0.05)
Capped at 0 minimum
Args:
db: Database session
scan_id: Scan ID (if None, use latest)
Returns:
Score from 0-100
"""
if scan_id is None:
latest_scan = self.get_latest_scan(db)
if not latest_scan:
return 100
scan_id = latest_scan.id
scan = self.get_scan_by_id(db, scan_id)
if not scan:
return 100
score = 100 - (scan.errors * 0.5 + scan.warnings * 0.05)
return max(0, min(100, int(score))) # Clamp to 0-100
def _get_git_commit_hash(self) -> Optional[str]:
"""Get current git commit hash"""
try:
result = subprocess.run(
['git', 'rev-parse', 'HEAD'],
capture_output=True,
text=True,
timeout=5
)
if result.returncode == 0:
return result.stdout.strip()[:40]
except Exception:
pass
return None
# Singleton instance
code_quality_service = CodeQualityService()