refactor: migrate modules from re-exports to canonical implementations
Move actual code implementations into module directories: - orders: 5 services, 4 models, order/invoice schemas - inventory: 3 services, 2 models, 30+ schemas - customers: 3 services, 2 models, customer schemas - messaging: 3 services, 2 models, message/notification schemas - monitoring: background_tasks_service - marketplace: 5+ services including letzshop submodule - dev_tools: code_quality_service, test_runner_service - billing: billing_service - contracts: definition.py Legacy files in app/services/, models/database/, models/schema/ now re-export from canonical module locations for backwards compatibility. Architecture validator passes with 0 errors. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
820
app/modules/dev_tools/services/code_quality_service.py
Normal file
820
app/modules/dev_tools/services/code_quality_service.py
Normal file
@@ -0,0 +1,820 @@
|
||||
"""
|
||||
Code Quality Service
|
||||
Business logic for managing code quality scans and violations
|
||||
Supports multiple validator types: architecture, security, performance
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import subprocess
|
||||
from datetime import datetime, UTC
|
||||
|
||||
from sqlalchemy import desc, func
|
||||
from sqlalchemy.orm import Session
|
||||
|
||||
from app.exceptions import (
|
||||
ScanParseException,
|
||||
ScanTimeoutException,
|
||||
ViolationNotFoundException,
|
||||
)
|
||||
from app.modules.dev_tools.models import (
|
||||
ArchitectureScan,
|
||||
ArchitectureViolation,
|
||||
ViolationAssignment,
|
||||
ViolationComment,
|
||||
)
|
||||
|
||||
# Module-level logger, named after this module per logging convention
logger = logging.getLogger(__name__)

# Validator type constants — the three validator families this service can run
VALIDATOR_ARCHITECTURE = "architecture"
VALIDATOR_SECURITY = "security"
VALIDATOR_PERFORMANCE = "performance"

# All validator types accepted by run_scan() and iterated by run_all_scans()
VALID_VALIDATOR_TYPES = [VALIDATOR_ARCHITECTURE, VALIDATOR_SECURITY, VALIDATOR_PERFORMANCE]

# Map validator types to the scripts that implement them (run via subprocess
# with a --json flag; paths are relative to the working directory)
VALIDATOR_SCRIPTS = {
    VALIDATOR_ARCHITECTURE: "scripts/validate_architecture.py",
    VALIDATOR_SECURITY: "scripts/validate_security.py",
    VALIDATOR_PERFORMANCE: "scripts/validate_performance.py",
}

# Human-readable names used in log messages
VALIDATOR_NAMES = {
    VALIDATOR_ARCHITECTURE: "Architecture",
    VALIDATOR_SECURITY: "Security",
    VALIDATOR_PERFORMANCE: "Performance",
}
|
||||
|
||||
|
||||
class CodeQualityService:
    """Service for managing code quality scans and violations.

    Responsibilities visible in this class: running validator scripts
    (architecture / security / performance) and persisting their results as
    scan + violation records, querying scans and violations, the violation
    triage workflow (assign / resolve / ignore / comment), and aggregating
    dashboard statistics including a technical-debt score.
    """
|
||||
|
||||
def run_scan(
    self,
    db: Session,
    triggered_by: str = "manual",
    validator_type: str = VALIDATOR_ARCHITECTURE,
) -> ArchitectureScan:
    """
    Run a code quality validator and store results in database

    Args:
        db: Database session
        triggered_by: Who/what triggered the scan ('manual', 'scheduled', 'ci/cd')
        validator_type: Type of validator ('architecture', 'security', 'performance')

    Returns:
        ArchitectureScan object with results

    Raises:
        ValueError: If validator_type is invalid
        ScanTimeoutException: If validator times out
        ScanParseException: If validator output cannot be parsed
    """
    if validator_type not in VALID_VALIDATOR_TYPES:
        raise ValueError(
            f"Invalid validator type: {validator_type}. "
            f"Must be one of: {VALID_VALIDATOR_TYPES}"
        )

    script_path = VALIDATOR_SCRIPTS[validator_type]
    validator_name = VALIDATOR_NAMES[validator_type]

    logger.info("Starting %s scan (triggered by: %s)", validator_name, triggered_by)

    # Record provenance of the code being scanned (best effort; may be None)
    git_commit = self._get_git_commit_hash()

    # Run validator with JSON output. Timestamps are timezone-aware UTC to
    # stay consistent with create_pending_scan(); mixing naive and aware
    # datetimes in the same table breaks comparisons/ordering.
    start_time = datetime.now(UTC)
    try:
        result = subprocess.run(
            ["python", script_path, "--json"],
            capture_output=True,
            text=True,
            timeout=300,  # 5 minute timeout
        )
    except subprocess.TimeoutExpired as e:
        logger.error("%s scan timed out after 5 minutes", validator_name)
        raise ScanTimeoutException(timeout_seconds=300) from e

    duration = (datetime.now(UTC) - start_time).total_seconds()

    # Parse JSON output. Validators may print human-readable progress lines
    # before the JSON document, so skip ahead to the first line that opens
    # a JSON object.
    try:
        lines = result.stdout.strip().split("\n")
        json_start = next(
            (i for i, line in enumerate(lines) if line.strip().startswith("{")),
            -1,
        )
        if json_start == -1:
            raise ValueError("No JSON output found")

        json_output = "\n".join(lines[json_start:])
        data = json.loads(json_output)
    except (json.JSONDecodeError, ValueError) as e:
        logger.error("Failed to parse %s validator output: %s", validator_name, e)
        logger.error("Stdout: %s", result.stdout)
        logger.error("Stderr: %s", result.stderr)
        raise ScanParseException(reason=str(e)) from e

    # Create the scan record first so violation rows can reference scan.id
    scan = ArchitectureScan(
        timestamp=datetime.now(UTC),
        validator_type=validator_type,
        total_files=data.get("files_checked", 0),
        total_violations=data.get("total_violations", 0),
        errors=data.get("errors", 0),
        warnings=data.get("warnings", 0),
        duration_seconds=duration,
        triggered_by=triggered_by,
        git_commit_hash=git_commit,
    )
    db.add(scan)
    db.flush()  # Get scan.id without committing the transaction

    # Create one violation record per reported violation. Required keys
    # (rule_id, severity, ...) are indexed directly so a malformed payload
    # fails loudly; optional keys fall back to "".
    violations_data = data.get("violations", [])
    logger.info(
        "Creating %d %s violation records", len(violations_data), validator_name
    )

    for v in violations_data:
        violation = ArchitectureViolation(
            scan_id=scan.id,
            validator_type=validator_type,
            rule_id=v["rule_id"],
            rule_name=v["rule_name"],
            severity=v["severity"],
            file_path=v["file_path"],
            line_number=v["line_number"],
            message=v["message"],
            context=v.get("context", ""),
            suggestion=v.get("suggestion", ""),
            status="open",
        )
        db.add(violation)

    db.flush()
    db.refresh(scan)

    logger.info(
        "%s scan completed: %s violations found",
        validator_name,
        scan.total_violations,
    )
    return scan
|
||||
|
||||
def run_all_scans(
    self, db: Session, triggered_by: str = "manual"
) -> list[ArchitectureScan]:
    """
    Run every known validator and collect the resulting scans.

    A failure in one validator is logged and skipped so the remaining
    validators still get a chance to run.

    Args:
        db: Database session
        triggered_by: Who/what triggered the scan

    Returns:
        List of ArchitectureScan objects (one per validator that succeeded)
    """
    scans: list[ArchitectureScan] = []
    for vtype in VALID_VALIDATOR_TYPES:
        try:
            scans.append(self.run_scan(db, triggered_by, vtype))
        except Exception as exc:
            # Continue with other validators even if one fails
            logger.error(f"Failed to run {vtype} scan: {exc}")
    return scans
|
||||
|
||||
def get_latest_scan(
    self, db: Session, validator_type: str | None = None
) -> ArchitectureScan | None:
    """
    Return the most recent scan, optionally restricted to one validator.

    Args:
        db: Database session
        validator_type: Optional filter by validator type

    Returns:
        Most recent ArchitectureScan, or None when no scan matches
    """
    query = db.query(ArchitectureScan)
    if validator_type:
        query = query.filter(ArchitectureScan.validator_type == validator_type)
    return query.order_by(desc(ArchitectureScan.timestamp)).first()
|
||||
|
||||
def get_latest_scans_by_type(self, db: Session) -> dict[str, ArchitectureScan]:
    """
    Return the most recent scan for each validator type.

    Validator types with no recorded scans are omitted from the result.

    Returns:
        Dictionary mapping validator_type to its latest scan
    """
    latest = {
        vtype: self.get_latest_scan(db, validator_type=vtype)
        for vtype in VALID_VALIDATOR_TYPES
    }
    # Drop validator types that have never been scanned
    return {vtype: scan for vtype, scan in latest.items() if scan}
|
||||
|
||||
def get_scan_by_id(self, db: Session, scan_id: int) -> ArchitectureScan | None:
    """Look up a scan by primary key; returns None when absent."""
    query = db.query(ArchitectureScan).filter(ArchitectureScan.id == scan_id)
    return query.first()
|
||||
|
||||
def create_pending_scan(
    self, db: Session, validator_type: str, triggered_by: str
) -> ArchitectureScan:
    """
    Create a new scan record with pending status.

    Args:
        db: Database session
        validator_type: Type of validator (architecture, security, performance)
        triggered_by: Who triggered the scan (e.g., "manual:username")

    Returns:
        The created ArchitectureScan record with ID populated
    """
    pending = ArchitectureScan(
        timestamp=datetime.now(UTC),  # timezone-aware UTC timestamp
        validator_type=validator_type,
        status="pending",
        triggered_by=triggered_by,
    )
    db.add(pending)
    # Flush (not commit) so the autogenerated primary key is populated
    db.flush()
    return pending
|
||||
|
||||
def get_running_scans(self, db: Session) -> list[ArchitectureScan]:
    """
    Get all currently in-flight scans.

    Returns:
        List of scans with status 'pending' or 'running', newest first
    """
    active = db.query(ArchitectureScan).filter(
        ArchitectureScan.status.in_(["pending", "running"])
    )
    return active.order_by(ArchitectureScan.timestamp.desc()).all()
|
||||
|
||||
def get_scan_history(
    self, db: Session, limit: int = 30, validator_type: str | None = None
) -> list[ArchitectureScan]:
    """
    Return recent scans for trend graphs, newest first.

    Args:
        db: Database session
        limit: Maximum number of scans to return
        validator_type: Optional filter by validator type

    Returns:
        List of ArchitectureScan objects, newest first
    """
    query = db.query(ArchitectureScan)
    if validator_type:
        query = query.filter(ArchitectureScan.validator_type == validator_type)
    return query.order_by(desc(ArchitectureScan.timestamp)).limit(limit).all()
|
||||
|
||||
def get_violations(
    self,
    db: Session,
    scan_id: int | None = None,
    validator_type: str | None = None,
    severity: str | None = None,
    status: str | None = None,
    rule_id: str | None = None,
    file_path: str | None = None,
    limit: int = 100,
    offset: int = 0,
) -> tuple[list[ArchitectureViolation], int]:
    """
    Get violations with filtering and pagination

    Args:
        db: Database session
        scan_id: Filter by scan ID (if None, use latest scan(s))
        validator_type: Filter by validator type
        severity: Filter by severity ('error', 'warning')
        status: Filter by status ('open', 'assigned', 'resolved', etc.)
        rule_id: Filter by rule ID
        file_path: Filter by file path (partial match)
        limit: Page size
        offset: Page offset

    Returns:
        Tuple of (violations list, total count)
    """
    # Build query
    query = db.query(ArchitectureViolation)

    # If scan_id specified, filter by it
    if scan_id is not None:
        query = query.filter(ArchitectureViolation.scan_id == scan_id)
    else:
        # If no scan_id, get violations from latest scan(s)
        if validator_type:
            # Get latest scan for specific validator type
            latest_scan = self.get_latest_scan(db, validator_type)
            if not latest_scan:
                # No scan of this type ever ran: nothing to return
                return [], 0
            query = query.filter(ArchitectureViolation.scan_id == latest_scan.id)
        else:
            # Get violations from latest scans of all types
            latest_scans = self.get_latest_scans_by_type(db)
            if not latest_scans:
                return [], 0
            scan_ids = [s.id for s in latest_scans.values()]
            query = query.filter(ArchitectureViolation.scan_id.in_(scan_ids))

    # Apply validator_type filter only for explicit scan_id queries; in the
    # latest-scan branches above the chosen scan already encodes the type.
    if validator_type and scan_id is not None:
        query = query.filter(ArchitectureViolation.validator_type == validator_type)

    # Apply other filters (each is skipped when falsy)
    if severity:
        query = query.filter(ArchitectureViolation.severity == severity)

    if status:
        query = query.filter(ArchitectureViolation.status == status)

    if rule_id:
        query = query.filter(ArchitectureViolation.rule_id == rule_id)

    if file_path:
        # Substring match. NOTE(review): file_path is embedded directly in
        # the LIKE pattern, so '%'/'_' in the input act as wildcards.
        query = query.filter(ArchitectureViolation.file_path.like(f"%{file_path}%"))

    # Total count before pagination so callers can render page controls
    total = query.count()

    # Get page of results.
    # NOTE(review): severity sorts descending as a string, which places
    # 'warning' before 'error' — confirm this is the intended display order.
    violations = (
        query.order_by(
            ArchitectureViolation.severity.desc(),
            ArchitectureViolation.validator_type,
            ArchitectureViolation.file_path,
        )
        .limit(limit)
        .offset(offset)
        .all()
    )

    return violations, total
|
||||
|
||||
def get_violation_by_id(
    self, db: Session, violation_id: int
) -> ArchitectureViolation | None:
    """Fetch a single violation with details; None when not found."""
    query = db.query(ArchitectureViolation)
    return query.filter(ArchitectureViolation.id == violation_id).first()
|
||||
|
||||
def assign_violation(
    self,
    db: Session,
    violation_id: int,
    user_id: int,
    assigned_by: int,
    due_date: datetime | None = None,
    priority: str = "medium",
) -> ViolationAssignment:
    """
    Assign violation to a developer

    Args:
        db: Database session
        violation_id: Violation ID
        user_id: User to assign to
        assigned_by: User who is assigning
        due_date: Due date (optional)
        priority: Priority level ('low', 'medium', 'high', 'critical')

    Returns:
        ViolationAssignment object

    Raises:
        ViolationNotFoundException: If no violation has the given ID
    """
    violation = self.get_violation_by_id(db, violation_id)
    if not violation:
        # Fail loudly instead of silently creating an assignment row that
        # points at a missing violation — consistent with
        # resolve_violation() and ignore_violation().
        raise ViolationNotFoundException(violation_id)

    # Update violation status
    violation.status = "assigned"
    violation.assigned_to = user_id

    # Create assignment record
    assignment = ViolationAssignment(
        violation_id=violation_id,
        user_id=user_id,
        assigned_by=assigned_by,
        due_date=due_date,
        priority=priority,
    )
    db.add(assignment)
    db.flush()

    logger.info(f"Violation {violation_id} assigned to user {user_id}")
    return assignment
|
||||
|
||||
def resolve_violation(
    self, db: Session, violation_id: int, resolved_by: int, resolution_note: str
) -> ArchitectureViolation:
    """
    Mark violation as resolved

    Args:
        db: Database session
        violation_id: Violation ID
        resolved_by: User who resolved it
        resolution_note: Note about resolution

    Returns:
        Updated ArchitectureViolation object

    Raises:
        ViolationNotFoundException: If no violation has the given ID
    """
    violation = self.get_violation_by_id(db, violation_id)
    if not violation:
        raise ViolationNotFoundException(violation_id)

    violation.status = "resolved"
    # Timezone-aware UTC, consistent with create_pending_scan(); mixing
    # naive and aware datetimes in the same table breaks comparisons.
    violation.resolved_at = datetime.now(UTC)
    violation.resolved_by = resolved_by
    violation.resolution_note = resolution_note

    db.flush()
    logger.info(f"Violation {violation_id} resolved by user {resolved_by}")
    return violation
|
||||
|
||||
def ignore_violation(
    self, db: Session, violation_id: int, ignored_by: int, reason: str
) -> ArchitectureViolation:
    """
    Mark violation as ignored/won't fix

    Args:
        db: Database session
        violation_id: Violation ID
        ignored_by: User who ignored it
        reason: Reason for ignoring

    Returns:
        Updated ArchitectureViolation object

    Raises:
        ViolationNotFoundException: If no violation has the given ID
    """
    violation = self.get_violation_by_id(db, violation_id)
    if not violation:
        raise ViolationNotFoundException(violation_id)

    violation.status = "ignored"
    # Timezone-aware UTC, consistent with create_pending_scan(); the
    # resolved_* columns double as the "ignored" audit trail.
    violation.resolved_at = datetime.now(UTC)
    violation.resolved_by = ignored_by
    violation.resolution_note = f"Ignored: {reason}"

    db.flush()
    logger.info(f"Violation {violation_id} ignored by user {ignored_by}")
    return violation
|
||||
|
||||
def add_comment(
    self, db: Session, violation_id: int, user_id: int, comment: str
) -> ViolationComment:
    """
    Add comment to violation

    Args:
        db: Database session
        violation_id: Violation ID
        user_id: User posting comment
        comment: Comment text

    Returns:
        ViolationComment object
    """
    new_comment = ViolationComment(
        violation_id=violation_id,
        user_id=user_id,
        comment=comment,
    )
    db.add(new_comment)
    # Flush (not commit) so the comment gets its primary key
    db.flush()

    logger.info(f"Comment added to violation {violation_id} by user {user_id}")
    return new_comment
|
||||
|
||||
def get_dashboard_stats(
    self, db: Session, validator_type: str | None = None
) -> dict:
    """
    Get statistics for dashboard

    Args:
        db: Database session
        validator_type: Optional filter by validator type. If None, returns combined stats.

    Returns:
        Dictionary with various statistics including per-validator breakdown
    """
    latest_scans = self.get_latest_scans_by_type(db)

    # Nothing scanned yet: return the all-zero payload
    if not latest_scans:
        return self._empty_dashboard_stats()

    # Single-validator view when requested and available
    if validator_type and validator_type in latest_scans:
        scan = latest_scans[validator_type]
        return self._get_stats_for_scan(db, scan, validator_type)

    # Otherwise aggregate across all validators
    return self._get_combined_stats(db, latest_scans)
|
||||
|
||||
def _empty_dashboard_stats(self) -> dict:
|
||||
"""Return empty dashboard stats structure"""
|
||||
return {
|
||||
"total_violations": 0,
|
||||
"errors": 0,
|
||||
"warnings": 0,
|
||||
"info": 0,
|
||||
"open": 0,
|
||||
"assigned": 0,
|
||||
"resolved": 0,
|
||||
"ignored": 0,
|
||||
"technical_debt_score": 100,
|
||||
"trend": [],
|
||||
"by_severity": {},
|
||||
"by_rule": {},
|
||||
"by_module": {},
|
||||
"top_files": [],
|
||||
"last_scan": None,
|
||||
"by_validator": {},
|
||||
}
|
||||
|
||||
def _get_stats_for_scan(
    self, db: Session, scan: ArchitectureScan, validator_type: str
) -> dict:
    """Get stats for a single scan/validator type.

    Builds the full dashboard payload for one scan: status/severity/rule
    breakdowns, top violating files, per-module counts, and a 7-scan trend
    for this validator type.
    """
    # Get violation counts by status (open/assigned/resolved/ignored)
    status_counts = (
        db.query(ArchitectureViolation.status, func.count(ArchitectureViolation.id))
        .filter(ArchitectureViolation.scan_id == scan.id)
        .group_by(ArchitectureViolation.status)
        .all()
    )
    status_dict = {status: count for status, count in status_counts}

    # Get violations by severity
    severity_counts = (
        db.query(
            ArchitectureViolation.severity, func.count(ArchitectureViolation.id)
        )
        .filter(ArchitectureViolation.scan_id == scan.id)
        .group_by(ArchitectureViolation.severity)
        .all()
    )
    by_severity = {sev: count for sev, count in severity_counts}

    # Get violations by rule — keep only the 10 most frequent rules
    rule_counts = (
        db.query(
            ArchitectureViolation.rule_id, func.count(ArchitectureViolation.id)
        )
        .filter(ArchitectureViolation.scan_id == scan.id)
        .group_by(ArchitectureViolation.rule_id)
        .all()
    )
    by_rule = {
        rule: count
        for rule, count in sorted(rule_counts, key=lambda x: x[1], reverse=True)[:10]
    }

    # Get top 10 violating files, most violations first
    file_counts = (
        db.query(
            ArchitectureViolation.file_path,
            func.count(ArchitectureViolation.id).label("count"),
        )
        .filter(ArchitectureViolation.scan_id == scan.id)
        .group_by(ArchitectureViolation.file_path)
        .order_by(desc("count"))
        .limit(10)
        .all()
    )
    top_files = [{"file": file, "count": count} for file, count in file_counts]

    # Get violations by module (first two path segments of each file path)
    by_module = self._get_violations_by_module(db, scan.id)

    # Trend: last 7 scans of this validator type, reversed so the list is
    # oldest-first for charting
    trend_scans = self.get_scan_history(db, limit=7, validator_type=validator_type)
    trend = [
        {
            "timestamp": s.timestamp.isoformat(),
            "violations": s.total_violations,
            "errors": s.errors,
            "warnings": s.warnings,
        }
        for s in reversed(trend_scans)
    ]

    return {
        "total_violations": scan.total_violations,
        "errors": scan.errors,
        "warnings": scan.warnings,
        "info": by_severity.get("info", 0),
        "open": status_dict.get("open", 0),
        "assigned": status_dict.get("assigned", 0),
        "resolved": status_dict.get("resolved", 0),
        "ignored": status_dict.get("ignored", 0),
        "technical_debt_score": self._calculate_score(scan.errors, scan.warnings),
        "trend": trend,
        "by_severity": by_severity,
        "by_rule": by_rule,
        "by_module": by_module,
        "top_files": top_files,
        "last_scan": scan.timestamp.isoformat(),
        "validator_type": validator_type,
        # Single-entry breakdown so the payload shape matches
        # the combined (all-validators) stats
        "by_validator": {
            validator_type: {
                "total_violations": scan.total_violations,
                "errors": scan.errors,
                "warnings": scan.warnings,
                "last_scan": scan.timestamp.isoformat(),
            }
        },
    }
|
||||
|
||||
def _get_combined_stats(
    self, db: Session, latest_scans: dict[str, ArchitectureScan]
) -> dict:
    """Get combined stats across all validators.

    Aggregates the latest scan of every validator type into one dashboard
    payload: summed totals, status/severity/rule breakdowns, top files,
    merged per-module counts, and a per-validator breakdown.
    """
    # Aggregate headline totals across all latest scans
    total_violations = sum(s.total_violations for s in latest_scans.values())
    total_errors = sum(s.errors for s in latest_scans.values())
    total_warnings = sum(s.warnings for s in latest_scans.values())

    # Get all scan IDs — every detail query below restricts to these scans
    scan_ids = [s.id for s in latest_scans.values()]

    # Get violation counts by status
    status_counts = (
        db.query(ArchitectureViolation.status, func.count(ArchitectureViolation.id))
        .filter(ArchitectureViolation.scan_id.in_(scan_ids))
        .group_by(ArchitectureViolation.status)
        .all()
    )
    status_dict = {status: count for status, count in status_counts}

    # Get violations by severity
    severity_counts = (
        db.query(
            ArchitectureViolation.severity, func.count(ArchitectureViolation.id)
        )
        .filter(ArchitectureViolation.scan_id.in_(scan_ids))
        .group_by(ArchitectureViolation.severity)
        .all()
    )
    by_severity = {sev: count for sev, count in severity_counts}

    # Get violations by rule (across all validators), top 10 by frequency
    rule_counts = (
        db.query(
            ArchitectureViolation.rule_id, func.count(ArchitectureViolation.id)
        )
        .filter(ArchitectureViolation.scan_id.in_(scan_ids))
        .group_by(ArchitectureViolation.rule_id)
        .all()
    )
    by_rule = {
        rule: count
        for rule, count in sorted(rule_counts, key=lambda x: x[1], reverse=True)[:10]
    }

    # Get top 10 violating files, most violations first
    file_counts = (
        db.query(
            ArchitectureViolation.file_path,
            func.count(ArchitectureViolation.id).label("count"),
        )
        .filter(ArchitectureViolation.scan_id.in_(scan_ids))
        .group_by(ArchitectureViolation.file_path)
        .order_by(desc("count"))
        .limit(10)
        .all()
    )
    top_files = [{"file": file, "count": count} for file, count in file_counts]

    # Get violations by module: merge each scan's per-module top-10,
    # then keep the overall top 10
    by_module = {}
    for scan_id in scan_ids:
        module_counts = self._get_violations_by_module(db, scan_id)
        for module, count in module_counts.items():
            by_module[module] = by_module.get(module, 0) + count
    by_module = dict(
        sorted(by_module.items(), key=lambda x: x[1], reverse=True)[:10]
    )

    # Per-validator breakdown (one entry per validator's latest scan)
    by_validator = {}
    for vtype, scan in latest_scans.items():
        by_validator[vtype] = {
            "total_violations": scan.total_violations,
            "errors": scan.errors,
            "warnings": scan.warnings,
            "last_scan": scan.timestamp.isoformat(),
        }

    # "last_scan" in the combined view is the newest of all latest scans
    most_recent = max(latest_scans.values(), key=lambda s: s.timestamp)

    return {
        "total_violations": total_violations,
        "errors": total_errors,
        "warnings": total_warnings,
        "info": by_severity.get("info", 0),
        "open": status_dict.get("open", 0),
        "assigned": status_dict.get("assigned", 0),
        "resolved": status_dict.get("resolved", 0),
        "ignored": status_dict.get("ignored", 0),
        "technical_debt_score": self._calculate_score(total_errors, total_warnings),
        "trend": [],  # Combined trend would need special handling
        "by_severity": by_severity,
        "by_rule": by_rule,
        "by_module": by_module,
        "top_files": top_files,
        "last_scan": most_recent.timestamp.isoformat(),
        "by_validator": by_validator,
    }
|
||||
|
||||
def _get_violations_by_module(self, db: Session, scan_id: int) -> dict[str, int]:
    """Count a scan's violations per module, keeping the top 10.

    A "module" is the first two segments of the violating file's path
    (e.g. "app/modules"), or the whole path when it has fewer segments.
    """
    rows = (
        db.query(ArchitectureViolation.file_path)
        .filter(ArchitectureViolation.scan_id == scan_id)
        .all()
    )

    counts: dict[str, int] = {}
    for row in rows:
        segments = row.file_path.split("/")
        if len(segments) >= 2:
            module = "/".join(segments[:2])
        else:
            module = segments[0]
        counts[module] = counts.get(module, 0) + 1

    ranked = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    return dict(ranked[:10])
|
||||
|
||||
def _calculate_score(self, errors: int, warnings: int) -> int:
|
||||
"""Calculate technical debt score (0-100)"""
|
||||
score = 100 - (errors * 0.5 + warnings * 0.05)
|
||||
return max(0, min(100, int(score)))
|
||||
|
||||
def calculate_technical_debt_score(
    self, db: Session, scan_id: int | None = None, validator_type: str | None = None
) -> int:
    """
    Calculate technical debt score (0-100)

    Formula: 100 - (errors * 0.5 + warnings * 0.05)
    Capped at 0 minimum

    Args:
        db: Database session
        scan_id: Scan ID (if None, use latest)
        validator_type: Filter by validator type

    Returns:
        Score from 0-100 (100 when no matching scan exists)
    """
    if scan_id is None:
        # Fall back to the most recent scan (optionally of one type)
        latest = self.get_latest_scan(db, validator_type)
        if not latest:
            return 100
        scan_id = latest.id

    scan = self.get_scan_by_id(db, scan_id)
    if not scan:
        return 100

    return self._calculate_score(scan.errors, scan.warnings)
|
||||
|
||||
def _get_git_commit_hash(self) -> str | None:
    """Best-effort lookup of the current git HEAD commit hash.

    Returns None when git is unavailable, the command fails, or it times
    out — scans are then recorded without commit provenance.
    """
    try:
        proc = subprocess.run(
            ["git", "rev-parse", "HEAD"], capture_output=True, text=True, timeout=5
        )
    except Exception:
        # Deliberate best-effort: any failure simply means "no hash"
        return None

    if proc.returncode != 0:
        return None
    # Trim to 40 chars (length of a full SHA-1 hash)
    return proc.stdout.strip()[:40]
|
||||
|
||||
|
||||
# Singleton instance — import this shared object rather than constructing
# CodeQualityService directly
code_quality_service = CodeQualityService()
|
||||
Reference in New Issue
Block a user