""" Code Quality Service Business logic for managing architecture scans and violations """ import json import logging import subprocess from datetime import datetime from sqlalchemy import desc, func from sqlalchemy.orm import Session from app.exceptions import ( ScanParseException, ScanTimeoutException, ViolationNotFoundException, ) from models.database.architecture_scan import ( ArchitectureScan, ArchitectureViolation, ViolationAssignment, ViolationComment, ) logger = logging.getLogger(__name__) class CodeQualityService: """Service for managing code quality scans and violations""" def run_scan(self, db: Session, triggered_by: str = "manual") -> ArchitectureScan: """ Run architecture validator and store results in database Args: db: Database session triggered_by: Who/what triggered the scan ('manual', 'scheduled', 'ci/cd') Returns: ArchitectureScan object with results Raises: Exception: If validator script fails """ logger.info(f"Starting architecture scan (triggered by: {triggered_by})") # Get git commit hash git_commit = self._get_git_commit_hash() # Run validator with JSON output start_time = datetime.now() try: result = subprocess.run( ["python", "scripts/validate_architecture.py", "--json"], capture_output=True, text=True, timeout=300, # 5 minute timeout ) except subprocess.TimeoutExpired: logger.error("Architecture scan timed out after 5 minutes") raise ScanTimeoutException(timeout_seconds=300) duration = (datetime.now() - start_time).total_seconds() # Parse JSON output (get only the JSON part, skip progress messages) try: # Find the JSON part in stdout lines = result.stdout.strip().split("\n") json_start = -1 for i, line in enumerate(lines): if line.strip().startswith("{"): json_start = i break if json_start == -1: raise ValueError("No JSON output found") json_output = "\n".join(lines[json_start:]) data = json.loads(json_output) except (json.JSONDecodeError, ValueError) as e: logger.error(f"Failed to parse validator output: {e}") logger.error(f"Stdout: {result.stdout}") logger.error(f"Stderr: {result.stderr}") raise ScanParseException(reason=str(e)) # Create scan record scan = ArchitectureScan( timestamp=datetime.now(), total_files=data.get("files_checked", 0), total_violations=data.get("total_violations", 0), errors=data.get("errors", 0), warnings=data.get("warnings", 0), duration_seconds=duration, triggered_by=triggered_by, git_commit_hash=git_commit, ) db.add(scan) db.flush() # Get scan.id # Create violation records violations_data = data.get("violations", []) logger.info(f"Creating {len(violations_data)} violation records") for v in violations_data: violation = ArchitectureViolation( scan_id=scan.id, rule_id=v["rule_id"], rule_name=v["rule_name"], severity=v["severity"], file_path=v["file_path"], line_number=v["line_number"], message=v["message"], context=v.get("context", ""), suggestion=v.get("suggestion", ""), status="open", ) db.add(violation) db.commit() db.refresh(scan) logger.info(f"Scan completed: {scan.total_violations} violations found") return scan def get_latest_scan(self, db: Session) -> ArchitectureScan | None: """Get the most recent scan""" return ( db.query(ArchitectureScan) .order_by(desc(ArchitectureScan.timestamp)) .first() ) def get_scan_by_id(self, db: Session, scan_id: int) -> ArchitectureScan | None: """Get scan by ID""" return db.query(ArchitectureScan).filter(ArchitectureScan.id == scan_id).first() def get_scan_history(self, db: Session, limit: int = 30) -> list[ArchitectureScan]: """ Get scan history for trend graphs Args: db: Database session limit: Maximum number of scans to return Returns: List of ArchitectureScan objects, newest first """ return ( db.query(ArchitectureScan) .order_by(desc(ArchitectureScan.timestamp)) .limit(limit) .all() ) def get_violations( self, db: Session, scan_id: int = None, severity: str = None, status: str = None, rule_id: str = None, file_path: str = None, limit: int = 100, offset: int = 0, ) -> tuple[list[ArchitectureViolation], int]: """ Get violations with filtering and pagination Args: db: Database session scan_id: Filter by scan ID (if None, use latest scan) severity: Filter by severity ('error', 'warning') status: Filter by status ('open', 'assigned', 'resolved', etc.) rule_id: Filter by rule ID file_path: Filter by file path (partial match) limit: Page size offset: Page offset Returns: Tuple of (violations list, total count) """ # If no scan_id specified, use latest scan if scan_id is None: latest_scan = self.get_latest_scan(db) if not latest_scan: return [], 0 scan_id = latest_scan.id # Build query query = db.query(ArchitectureViolation).filter( ArchitectureViolation.scan_id == scan_id ) # Apply filters if severity: query = query.filter(ArchitectureViolation.severity == severity) if status: query = query.filter(ArchitectureViolation.status == status) if rule_id: query = query.filter(ArchitectureViolation.rule_id == rule_id) if file_path: query = query.filter(ArchitectureViolation.file_path.like(f"%{file_path}%")) # Get total count total = query.count() # Get page of results violations = ( query.order_by( ArchitectureViolation.severity.desc(), ArchitectureViolation.file_path ) .limit(limit) .offset(offset) .all() ) return violations, total def get_violation_by_id( self, db: Session, violation_id: int ) -> ArchitectureViolation | None: """Get single violation with details""" return ( db.query(ArchitectureViolation) .filter(ArchitectureViolation.id == violation_id) .first() ) def assign_violation( self, db: Session, violation_id: int, user_id: int, assigned_by: int, due_date: datetime = None, priority: str = "medium", ) -> ViolationAssignment: """ Assign violation to a developer Args: db: Database session violation_id: Violation ID user_id: User to assign to assigned_by: User who is assigning due_date: Due date (optional) priority: Priority level ('low', 'medium', 'high', 'critical') Returns: ViolationAssignment object """ # Update violation status violation = self.get_violation_by_id(db, violation_id) if violation: violation.status = "assigned" violation.assigned_to = user_id # Create assignment record assignment = ViolationAssignment( violation_id=violation_id, user_id=user_id, assigned_by=assigned_by, due_date=due_date, priority=priority, ) db.add(assignment) db.commit() logger.info(f"Violation {violation_id} assigned to user {user_id}") return assignment def resolve_violation( self, db: Session, violation_id: int, resolved_by: int, resolution_note: str ) -> ArchitectureViolation: """ Mark violation as resolved Args: db: Database session violation_id: Violation ID resolved_by: User who resolved it resolution_note: Note about resolution Returns: Updated ArchitectureViolation object """ violation = self.get_violation_by_id(db, violation_id) if not violation: raise ViolationNotFoundException(violation_id) violation.status = "resolved" violation.resolved_at = datetime.now() violation.resolved_by = resolved_by violation.resolution_note = resolution_note db.commit() logger.info(f"Violation {violation_id} resolved by user {resolved_by}") return violation def ignore_violation( self, db: Session, violation_id: int, ignored_by: int, reason: str ) -> ArchitectureViolation: """ Mark violation as ignored/won't fix Args: db: Database session violation_id: Violation ID ignored_by: User who ignored it reason: Reason for ignoring Returns: Updated ArchitectureViolation object """ violation = self.get_violation_by_id(db, violation_id) if not violation: raise ViolationNotFoundException(violation_id) violation.status = "ignored" violation.resolved_at = datetime.now() violation.resolved_by = ignored_by violation.resolution_note = f"Ignored: {reason}" db.commit() logger.info(f"Violation {violation_id} ignored by user {ignored_by}") return violation def add_comment( self, db: Session, violation_id: int, user_id: int, comment: str ) -> ViolationComment: """ Add comment to violation Args: db: Database session violation_id: Violation ID user_id: User posting comment comment: Comment text Returns: ViolationComment object """ comment_obj = ViolationComment( violation_id=violation_id, user_id=user_id, comment=comment ) db.add(comment_obj) db.commit() logger.info(f"Comment added to violation {violation_id} by user {user_id}") return comment_obj def get_dashboard_stats(self, db: Session) -> dict: """ Get statistics for dashboard Returns: Dictionary with various statistics """ latest_scan = self.get_latest_scan(db) if not latest_scan: return { "total_violations": 0, "errors": 0, "warnings": 0, "open": 0, "assigned": 0, "resolved": 0, "ignored": 0, "technical_debt_score": 100, "trend": [], "by_severity": {}, "by_rule": {}, "by_module": {}, "top_files": [], "last_scan": None, } # Get violation counts by status status_counts = ( db.query(ArchitectureViolation.status, func.count(ArchitectureViolation.id)) .filter(ArchitectureViolation.scan_id == latest_scan.id) .group_by(ArchitectureViolation.status) .all() ) status_dict = {status: count for status, count in status_counts} # Get violations by severity severity_counts = ( db.query( ArchitectureViolation.severity, func.count(ArchitectureViolation.id) ) .filter(ArchitectureViolation.scan_id == latest_scan.id) .group_by(ArchitectureViolation.severity) .all() ) by_severity = {sev: count for sev, count in severity_counts} # Get violations by rule rule_counts = ( db.query( ArchitectureViolation.rule_id, func.count(ArchitectureViolation.id) ) .filter(ArchitectureViolation.scan_id == latest_scan.id) .group_by(ArchitectureViolation.rule_id) .all() ) by_rule = { rule: count for rule, count in sorted(rule_counts, key=lambda x: x[1], reverse=True)[ :10 ] } # Get top violating files file_counts = ( db.query( ArchitectureViolation.file_path, func.count(ArchitectureViolation.id).label("count"), ) .filter(ArchitectureViolation.scan_id == latest_scan.id) .group_by(ArchitectureViolation.file_path) .order_by(desc("count")) .limit(10) .all() ) top_files = [{"file": file, "count": count} for file, count in file_counts] # Get violations by module (extract module from file path) by_module = {} violations = ( db.query(ArchitectureViolation.file_path) .filter(ArchitectureViolation.scan_id == latest_scan.id) .all() ) for v in violations: path_parts = v.file_path.split("/") if len(path_parts) >= 2: module = "/".join(path_parts[:2]) # e.g., 'app/api' else: module = path_parts[0] by_module[module] = by_module.get(module, 0) + 1 # Sort by count and take top 10 by_module = dict( sorted(by_module.items(), key=lambda x: x[1], reverse=True)[:10] ) # Calculate technical debt score tech_debt_score = self.calculate_technical_debt_score(db, latest_scan.id) # Get trend (last 7 scans) trend_scans = self.get_scan_history(db, limit=7) trend = [ { "timestamp": scan.timestamp.isoformat(), "violations": scan.total_violations, "errors": scan.errors, "warnings": scan.warnings, } for scan in reversed(trend_scans) # Oldest first for chart ] return { "total_violations": latest_scan.total_violations, "errors": latest_scan.errors, "warnings": latest_scan.warnings, "open": status_dict.get("open", 0), "assigned": status_dict.get("assigned", 0), "resolved": status_dict.get("resolved", 0), "ignored": status_dict.get("ignored", 0), "technical_debt_score": tech_debt_score, "trend": trend, "by_severity": by_severity, "by_rule": by_rule, "by_module": by_module, "top_files": top_files, "last_scan": latest_scan.timestamp.isoformat() if latest_scan else None, } def calculate_technical_debt_score(self, db: Session, scan_id: int = None) -> int: """ Calculate technical debt score (0-100) Formula: 100 - (errors * 0.5 + warnings * 0.05) Capped at 0 minimum Args: db: Database session scan_id: Scan ID (if None, use latest) Returns: Score from 0-100 """ if scan_id is None: latest_scan = self.get_latest_scan(db) if not latest_scan: return 100 scan_id = latest_scan.id scan = self.get_scan_by_id(db, scan_id) if not scan: return 100 score = 100 - (scan.errors * 0.5 + scan.warnings * 0.05) return max(0, min(100, int(score))) # Clamp to 0-100 def _get_git_commit_hash(self) -> str | None: """Get current git commit hash""" try: result = subprocess.run( ["git", "rev-parse", "HEAD"], capture_output=True, text=True, timeout=5 ) if result.returncode == 0: return result.stdout.strip()[:40] except Exception: pass return None # Singleton instance code_quality_service = CodeQualityService()