Files
orion/app/api/v1/admin/code_quality.py
Samir Boulahtit d50b154823 fix: resolve architecture validation errors and warnings
- Move database operations from API to service layer:
  - Add create_pending_scan() method to code_quality_service
  - Add get_running_scans() method to code_quality_service
  - Update API to use service methods instead of direct db queries
- Replace inline SVG spinner with $icon('spinner', ...) helper
- Add comment documenting custom dropdown exception

Validation now passes with 0 errors.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-21 21:07:18 +01:00

610 lines
17 KiB
Python

"""
Code Quality API Endpoints
RESTful API for code quality validation and violation management
Supports multiple validator types: architecture, security, performance
"""
from datetime import datetime
from enum import Enum
from fastapi import APIRouter, BackgroundTasks, Depends, Query
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.exceptions import ScanNotFoundException, ViolationNotFoundException
from app.services.code_quality_service import (
VALID_VALIDATOR_TYPES,
code_quality_service,
)
from app.tasks.code_quality_tasks import execute_code_quality_scan
from models.database.architecture_scan import ArchitectureScan
from models.database.user import User
from models.schema.stats import CodeQualityDashboardStatsResponse
router = APIRouter()
# Enums and Constants
class ValidatorType(str, Enum):
"""Supported validator types"""
ARCHITECTURE = "architecture"
SECURITY = "security"
PERFORMANCE = "performance"
# Pydantic Models for API
class ScanResponse(BaseModel):
"""Response model for a scan"""
id: int
timestamp: str
validator_type: str
status: str
started_at: str | None
completed_at: str | None
progress_message: str | None
total_files: int
total_violations: int
errors: int
warnings: int
duration_seconds: float
triggered_by: str | None
git_commit_hash: str | None
error_message: str | None = None
class Config:
from_attributes = True
class ScanRequest(BaseModel):
"""Request model for triggering scans"""
validator_types: list[ValidatorType] = Field(
default=[ValidatorType.ARCHITECTURE, ValidatorType.SECURITY, ValidatorType.PERFORMANCE],
description="Validator types to run",
)
class ScanJobResponse(BaseModel):
"""Response model for a queued scan job"""
id: int
validator_type: str
status: str
message: str
class MultiScanJobResponse(BaseModel):
"""Response model for multiple queued scans (background task pattern)"""
scans: list[ScanJobResponse]
message: str
status_url: str
class MultiScanResponse(BaseModel):
"""Response model for completed scans (legacy sync pattern)"""
scans: list[ScanResponse]
total_violations: int
total_errors: int
total_warnings: int
class ViolationResponse(BaseModel):
"""Response model for a violation"""
id: int
scan_id: int
validator_type: str
rule_id: str
rule_name: str
severity: str
file_path: str
line_number: int
message: str
context: str | None
suggestion: str | None
status: str
assigned_to: int | None
resolved_at: str | None
resolved_by: int | None
resolution_note: str | None
created_at: str
class Config:
from_attributes = True
class ViolationListResponse(BaseModel):
"""Response model for paginated violations list"""
violations: list[ViolationResponse]
total: int
page: int
page_size: int
total_pages: int
class ViolationDetailResponse(ViolationResponse):
"""Response model for single violation with relationships"""
assignments: list = []
comments: list = []
class AssignViolationRequest(BaseModel):
"""Request model for assigning a violation"""
user_id: int = Field(..., description="User ID to assign to")
due_date: datetime | None = Field(None, description="Due date for resolution")
priority: str = Field(
"medium", description="Priority level (low, medium, high, critical)"
)
class ResolveViolationRequest(BaseModel):
"""Request model for resolving a violation"""
resolution_note: str = Field(..., description="Note about the resolution")
class IgnoreViolationRequest(BaseModel):
"""Request model for ignoring a violation"""
reason: str = Field(..., description="Reason for ignoring")
class AddCommentRequest(BaseModel):
"""Request model for adding a comment"""
comment: str = Field(..., min_length=1, description="Comment text")
# API Endpoints
def _scan_to_response(scan: ArchitectureScan) -> ScanResponse:
"""Convert ArchitectureScan to ScanResponse."""
return ScanResponse(
id=scan.id,
timestamp=scan.timestamp.isoformat() if scan.timestamp else None,
validator_type=scan.validator_type,
status=scan.status,
started_at=scan.started_at.isoformat() if scan.started_at else None,
completed_at=scan.completed_at.isoformat() if scan.completed_at else None,
progress_message=scan.progress_message,
total_files=scan.total_files or 0,
total_violations=scan.total_violations or 0,
errors=scan.errors or 0,
warnings=scan.warnings or 0,
duration_seconds=scan.duration_seconds or 0.0,
triggered_by=scan.triggered_by,
git_commit_hash=scan.git_commit_hash,
error_message=scan.error_message,
)
@router.post("/scan", response_model=MultiScanJobResponse, status_code=202)
async def trigger_scan(
request: ScanRequest = None,
background_tasks: BackgroundTasks = None,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Trigger code quality scan(s) as background tasks.
By default runs all validators. Specify validator_types to run specific validators.
Returns immediately with job IDs. Poll /scan/{scan_id}/status for progress.
Scans run asynchronously - users can browse other pages while scans execute.
"""
if request is None:
request = ScanRequest()
scan_jobs = []
triggered_by = f"manual:{current_user.username}"
for vtype in request.validator_types:
# Create scan record with pending status via service
scan = code_quality_service.create_pending_scan(
db, validator_type=vtype.value, triggered_by=triggered_by
)
# Queue background task
background_tasks.add_task(execute_code_quality_scan, scan.id)
scan_jobs.append(
ScanJobResponse(
id=scan.id,
validator_type=vtype.value,
status="pending",
message=f"{vtype.value.capitalize()} scan queued",
)
)
db.commit()
validator_names = ", ".join(vtype.value for vtype in request.validator_types)
return MultiScanJobResponse(
scans=scan_jobs,
message=f"Scans queued for: {validator_names}",
status_url="/admin/code-quality/scans/running",
)
@router.get("/scans/{scan_id}/status", response_model=ScanResponse)
async def get_scan_status(
scan_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Get status of a specific scan.
Use this endpoint to poll for scan completion.
"""
scan = code_quality_service.get_scan_by_id(db, scan_id)
if not scan:
raise ScanNotFoundException(scan_id)
return _scan_to_response(scan)
@router.get("/scans/running", response_model=list[ScanResponse])
async def get_running_scans(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Get all currently running scans.
Returns scans with status 'pending' or 'running'.
"""
scans = code_quality_service.get_running_scans(db)
return [_scan_to_response(scan) for scan in scans]
@router.get("/scans", response_model=list[ScanResponse])
async def list_scans(
limit: int = Query(30, ge=1, le=100, description="Number of scans to return"),
validator_type: ValidatorType | None = Query(
None, description="Filter by validator type"
),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Get scan history
Returns recent scans for trend analysis.
Optionally filter by validator type.
"""
scans = code_quality_service.get_scan_history(
db, limit=limit, validator_type=validator_type.value if validator_type else None
)
return [_scan_to_response(scan) for scan in scans]
@router.get("/violations", response_model=ViolationListResponse)
async def list_violations(
scan_id: int | None = Query(
None, description="Filter by scan ID (defaults to latest)"
),
validator_type: ValidatorType | None = Query(
None, description="Filter by validator type"
),
severity: str | None = Query(
None, description="Filter by severity (error, warning, info)"
),
status: str | None = Query(
None, description="Filter by status (open, assigned, resolved, ignored)"
),
rule_id: str | None = Query(None, description="Filter by rule ID"),
file_path: str | None = Query(
None, description="Filter by file path (partial match)"
),
page: int = Query(1, ge=1, description="Page number"),
page_size: int = Query(50, ge=1, le=200, description="Items per page"),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Get violations with filtering and pagination
Returns violations from latest scan(s) by default.
Filter by validator_type to get violations from a specific validator.
"""
offset = (page - 1) * page_size
violations, total = code_quality_service.get_violations(
db,
scan_id=scan_id,
validator_type=validator_type.value if validator_type else None,
severity=severity,
status=status,
rule_id=rule_id,
file_path=file_path,
limit=page_size,
offset=offset,
)
total_pages = (total + page_size - 1) // page_size
return ViolationListResponse(
violations=[
ViolationResponse(
id=v.id,
scan_id=v.scan_id,
validator_type=v.validator_type,
rule_id=v.rule_id,
rule_name=v.rule_name,
severity=v.severity,
file_path=v.file_path,
line_number=v.line_number,
message=v.message,
context=v.context,
suggestion=v.suggestion,
status=v.status,
assigned_to=v.assigned_to,
resolved_at=v.resolved_at.isoformat() if v.resolved_at else None,
resolved_by=v.resolved_by,
resolution_note=v.resolution_note,
created_at=v.created_at.isoformat(),
)
for v in violations
],
total=total,
page=page,
page_size=page_size,
total_pages=total_pages,
)
@router.get("/violations/{violation_id}", response_model=ViolationDetailResponse)
async def get_violation(
violation_id: int,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Get single violation with details
Includes assignments and comments.
"""
violation = code_quality_service.get_violation_by_id(db, violation_id)
if not violation:
raise ViolationNotFoundException(violation_id)
# Format assignments
assignments = [
{
"id": a.id,
"user_id": a.user_id,
"assigned_at": a.assigned_at.isoformat(),
"assigned_by": a.assigned_by,
"due_date": a.due_date.isoformat() if a.due_date else None,
"priority": a.priority,
}
for a in violation.assignments
]
# Format comments
comments = [
{
"id": c.id,
"user_id": c.user_id,
"comment": c.comment,
"created_at": c.created_at.isoformat(),
}
for c in violation.comments
]
return ViolationDetailResponse(
id=violation.id,
scan_id=violation.scan_id,
validator_type=violation.validator_type,
rule_id=violation.rule_id,
rule_name=violation.rule_name,
severity=violation.severity,
file_path=violation.file_path,
line_number=violation.line_number,
message=violation.message,
context=violation.context,
suggestion=violation.suggestion,
status=violation.status,
assigned_to=violation.assigned_to,
resolved_at=(
violation.resolved_at.isoformat() if violation.resolved_at else None
),
resolved_by=violation.resolved_by,
resolution_note=violation.resolution_note,
created_at=violation.created_at.isoformat(),
assignments=assignments,
comments=comments,
)
@router.post("/violations/{violation_id}/assign")
async def assign_violation(
violation_id: int,
request: AssignViolationRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Assign violation to a developer
Updates violation status to 'assigned'.
"""
assignment = code_quality_service.assign_violation(
db,
violation_id=violation_id,
user_id=request.user_id,
assigned_by=current_user.id,
due_date=request.due_date,
priority=request.priority,
)
db.commit()
return {
"id": assignment.id,
"violation_id": assignment.violation_id,
"user_id": assignment.user_id,
"assigned_at": assignment.assigned_at.isoformat(),
"assigned_by": assignment.assigned_by,
"due_date": (assignment.due_date.isoformat() if assignment.due_date else None),
"priority": assignment.priority,
}
@router.post("/violations/{violation_id}/resolve")
async def resolve_violation(
violation_id: int,
request: ResolveViolationRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Mark violation as resolved
Records resolution timestamp and user.
ViolationNotFoundException bubbles up if violation doesn't exist.
"""
violation = code_quality_service.resolve_violation(
db,
violation_id=violation_id,
resolved_by=current_user.id,
resolution_note=request.resolution_note,
)
db.commit()
return {
"id": violation.id,
"status": violation.status,
"resolved_at": (
violation.resolved_at.isoformat() if violation.resolved_at else None
),
"resolved_by": violation.resolved_by,
"resolution_note": violation.resolution_note,
}
@router.post("/violations/{violation_id}/ignore")
async def ignore_violation(
violation_id: int,
request: IgnoreViolationRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Mark violation as ignored (won't fix)
Records reason for ignoring.
ViolationNotFoundException bubbles up if violation doesn't exist.
"""
violation = code_quality_service.ignore_violation(
db,
violation_id=violation_id,
ignored_by=current_user.id,
reason=request.reason,
)
db.commit()
return {
"id": violation.id,
"status": violation.status,
"resolved_at": (
violation.resolved_at.isoformat() if violation.resolved_at else None
),
"resolved_by": violation.resolved_by,
"resolution_note": violation.resolution_note,
}
@router.post("/violations/{violation_id}/comments")
async def add_comment(
violation_id: int,
request: AddCommentRequest,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Add comment to violation
For team collaboration and discussion.
"""
comment = code_quality_service.add_comment(
db,
violation_id=violation_id,
user_id=current_user.id,
comment=request.comment,
)
db.commit()
return {
"id": comment.id,
"violation_id": comment.violation_id,
"user_id": comment.user_id,
"comment": comment.comment,
"created_at": comment.created_at.isoformat(),
}
@router.get("/stats", response_model=CodeQualityDashboardStatsResponse)
async def get_dashboard_stats(
validator_type: ValidatorType | None = Query(
None, description="Filter by validator type (returns combined stats if not specified)"
),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
"""
Get dashboard statistics
Returns comprehensive stats for the dashboard including:
- Total counts by severity and status
- Technical debt score
- Trend data (last 7 scans)
- Top violating files
- Violations by rule and module
- Per-validator breakdown
When validator_type is specified, returns stats for that type only.
When not specified, returns combined stats across all validators.
"""
stats = code_quality_service.get_dashboard_stats(
db, validator_type=validator_type.value if validator_type else None
)
return CodeQualityDashboardStatsResponse(**stats)
@router.get("/validator-types")
async def get_validator_types(
current_user: User = Depends(get_current_admin_api),
):
"""
Get list of available validator types
Returns the supported validator types for filtering.
"""
return {
"validator_types": VALID_VALIDATOR_TYPES,
"descriptions": {
"architecture": "Architectural patterns and code organization rules",
"security": "Security vulnerabilities and best practices",
"performance": "Performance issues and optimizations",
},
}