Files
orion/app/api/v1/admin/code_quality.py
Samir Boulahtit 22c4937779 feat: add proper Pydantic response_model to all stats endpoints
- Create comprehensive stats schemas in models/schema/stats.py:
  - ImportStatsResponse, UserStatsResponse, ProductStatsResponse
  - PlatformStatsResponse, AdminDashboardResponse
  - VendorDashboardStatsResponse with nested models
  - VendorAnalyticsResponse, CodeQualityDashboardStatsResponse
- Move DashboardStatsResponse from code_quality.py to schema file
- Fix get_vendor_statistics() to return pending_vendors field
- Fix get_vendor_stats() to return flat structure matching schema
- Add response_model to all stats endpoints:
  - GET /admin/dashboard -> AdminDashboardResponse
  - GET /admin/dashboard/stats/platform -> PlatformStatsResponse
  - GET /admin/marketplace-import-jobs/stats -> ImportStatsResponse
  - GET /vendor/dashboard/stats -> VendorDashboardStatsResponse
  - GET /vendor/analytics -> VendorAnalyticsResponse
- Enhance API-001 architecture rule with detailed guidance
- Add SVC-007 rule for service/schema compatibility

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-11 17:29:35 +01:00

449 lines
12 KiB
Python

"""
Code Quality API Endpoints
RESTful API for architecture validation and violation management
"""
from datetime import datetime

from fastapi import APIRouter, Depends, Query
from pydantic import BaseModel, ConfigDict, Field
from sqlalchemy.orm import Session

from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.exceptions import ViolationNotFoundException
from app.services.code_quality_service import code_quality_service
from models.database.user import User
from models.schema.stats import CodeQualityDashboardStatsResponse
# Router that every endpoint in this module registers itself on via the
# @router.get / @router.post decorators; it is created without prefix or tags.
router = APIRouter()
# Pydantic Models for API
class ScanResponse(BaseModel):
    """Response model for a scan."""

    # Pydantic v2 configuration. This replaces the nested `class Config`,
    # which is deprecated in v2 and emits a deprecation warning at class
    # creation time. The setting itself (from_attributes=True) is unchanged.
    model_config = ConfigDict(from_attributes=True)

    id: int
    # The endpoints in this module populate this from `scan.timestamp.isoformat()`.
    timestamp: str
    total_files: int
    total_violations: int
    errors: int
    warnings: int
    duration_seconds: float
    triggered_by: str
    # Required but nullable: no default is declared, so callers must pass it.
    git_commit_hash: str | None
class ViolationResponse(BaseModel):
    """Response model for a violation."""

    # Pydantic v2 configuration. This replaces the nested `class Config`,
    # which is deprecated in v2 and emits a deprecation warning at class
    # creation time. The setting itself (from_attributes=True) is unchanged.
    model_config = ConfigDict(from_attributes=True)

    id: int
    scan_id: int
    rule_id: str
    rule_name: str
    severity: str
    file_path: str
    line_number: int
    message: str
    context: str | None
    suggestion: str | None
    status: str
    assigned_to: int | None
    # The endpoints in this module pass an ISO-8601 string, or None when the
    # violation has no `resolved_at` value.
    resolved_at: str | None
    resolved_by: int | None
    resolution_note: str | None
    # The endpoints in this module pass `created_at.isoformat()`.
    created_at: str
class ViolationListResponse(BaseModel):
    """One page of violations together with its pagination metadata."""

    # Violations contained in the requested page.
    violations: list[ViolationResponse]
    # Total number of matching violations, as reported by the service.
    total: int
    # Page number that was requested.
    page: int
    # Page size that was requested.
    page_size: int
    # Number of pages needed to hold `total` items at `page_size` per page.
    total_pages: int
class ViolationDetailResponse(ViolationResponse):
    """A single violation plus its related assignments and comments."""

    # Both lists are untyped; the detail endpoint in this module fills them
    # with plain dicts built from the violation's related records.
    assignments: list = []
    comments: list = []
class AssignViolationRequest(BaseModel):
    """Request body used when assigning a violation to a user."""

    user_id: int = Field(..., description="User ID to assign to")
    due_date: datetime | None = Field(None, description="Due date for resolution")
    # Free-form string: the allowed levels are only listed in the description
    # and are not validated by this model.
    priority: str = Field(
        "medium",
        description="Priority level (low, medium, high, critical)",
    )
class ResolveViolationRequest(BaseModel):
    """Request body used when marking a violation as resolved."""

    # Required; no length constraint is applied here.
    resolution_note: str = Field(..., description="Note about the resolution")
class IgnoreViolationRequest(BaseModel):
    """Request body used when marking a violation as ignored."""

    # Required; no length constraint is applied here.
    reason: str = Field(..., description="Reason for ignoring")
class AddCommentRequest(BaseModel):
    """Request body used when posting a comment on a violation."""

    # Required and must contain at least one character.
    comment: str = Field(..., min_length=1, description="Comment text")
# API Endpoints
@router.post("/scan", response_model=ScanResponse)
async def trigger_scan(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    Start a new architecture scan on behalf of the calling admin.

    Requires authentication. The scan is recorded as manually triggered by
    the current user, and the transaction is committed before the result is
    returned.

    Domain exceptions (ScanTimeoutException, ScanParseException) are not
    handled here; they bubble up to the global handler.
    """
    # NOTE(review): run_scan is called synchronously from an async handler; if
    # it blocks for long the event loop stalls while it runs. Confirm that this
    # is acceptable for this deployment.
    origin = f"manual:{current_user.username}"
    result = code_quality_service.run_scan(db, triggered_by=origin)
    db.commit()

    payload = {
        "id": result.id,
        "timestamp": result.timestamp.isoformat(),
        "total_files": result.total_files,
        "total_violations": result.total_violations,
        "errors": result.errors,
        "warnings": result.warnings,
        "duration_seconds": result.duration_seconds,
        "triggered_by": result.triggered_by,
        "git_commit_hash": result.git_commit_hash,
    }
    return ScanResponse(**payload)
@router.get("/scans", response_model=list[ScanResponse])
async def list_scans(
    limit: int = Query(30, ge=1, le=100, description="Number of scans to return"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    Return the scan history.

    Provides recent scans for trend analysis; `limit` caps how many are
    requested from the service.
    """
    history = code_quality_service.get_scan_history(db, limit=limit)

    # Each record is converted explicitly because `timestamp` is exposed as an
    # ISO-8601 string rather than a datetime.
    return [
        ScanResponse(
            **{
                "id": entry.id,
                "timestamp": entry.timestamp.isoformat(),
                "total_files": entry.total_files,
                "total_violations": entry.total_violations,
                "errors": entry.errors,
                "warnings": entry.warnings,
                "duration_seconds": entry.duration_seconds,
                "triggered_by": entry.triggered_by,
                "git_commit_hash": entry.git_commit_hash,
            }
        )
        for entry in history
    ]
@router.get("/violations", response_model=ViolationListResponse)
async def list_violations(
    scan_id: int | None = Query(
        None, description="Filter by scan ID (defaults to latest)"
    ),
    severity: str | None = Query(
        None, description="Filter by severity (error, warning)"
    ),
    status: str | None = Query(
        None, description="Filter by status (open, assigned, resolved, ignored)"
    ),
    rule_id: str | None = Query(None, description="Filter by rule ID"),
    file_path: str | None = Query(
        None, description="Filter by file path (partial match)"
    ),
    page: int = Query(1, ge=1, description="Page number"),
    page_size: int = Query(50, ge=1, le=200, description="Items per page"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    List violations with optional filters and pagination.

    Violations from the latest scan are returned by default.
    """
    # Pages are 1-based, so page 1 starts at offset 0.
    rows, total = code_quality_service.get_violations(
        db,
        scan_id=scan_id,
        severity=severity,
        status=status,
        rule_id=rule_id,
        file_path=file_path,
        limit=page_size,
        offset=(page - 1) * page_size,
    )

    # Ceiling division: a partially filled last page still counts as a page.
    full_pages, leftover = divmod(total, page_size)
    total_pages = full_pages + 1 if leftover else full_pages

    items = []
    for row in rows:
        resolved = row.resolved_at.isoformat() if row.resolved_at else None
        items.append(
            ViolationResponse(
                id=row.id,
                scan_id=row.scan_id,
                rule_id=row.rule_id,
                rule_name=row.rule_name,
                severity=row.severity,
                file_path=row.file_path,
                line_number=row.line_number,
                message=row.message,
                context=row.context,
                suggestion=row.suggestion,
                status=row.status,
                assigned_to=row.assigned_to,
                resolved_at=resolved,
                resolved_by=row.resolved_by,
                resolution_note=row.resolution_note,
                created_at=row.created_at.isoformat(),
            )
        )

    return ViolationListResponse(
        violations=items,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=total_pages,
    )
@router.get("/violations/{violation_id}", response_model=ViolationDetailResponse)
async def get_violation(
    violation_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    Return one violation with its details.

    The response includes the violation's assignments and comments.
    Raises ViolationNotFoundException when the service finds no violation
    for the given ID.
    """
    record = code_quality_service.get_violation_by_id(db, violation_id)
    if not record:
        raise ViolationNotFoundException(violation_id)

    # Related assignments, flattened to plain dicts with ISO-8601 dates.
    assignment_items = [
        dict(
            id=item.id,
            user_id=item.user_id,
            assigned_at=item.assigned_at.isoformat(),
            assigned_by=item.assigned_by,
            due_date=item.due_date.isoformat() if item.due_date else None,
            priority=item.priority,
        )
        for item in record.assignments
    ]

    # Related comments, flattened the same way.
    comment_items = [
        dict(
            id=note.id,
            user_id=note.user_id,
            comment=note.comment,
            created_at=note.created_at.isoformat(),
        )
        for note in record.comments
    ]

    resolved = record.resolved_at.isoformat() if record.resolved_at else None
    return ViolationDetailResponse(
        id=record.id,
        scan_id=record.scan_id,
        rule_id=record.rule_id,
        rule_name=record.rule_name,
        severity=record.severity,
        file_path=record.file_path,
        line_number=record.line_number,
        message=record.message,
        context=record.context,
        suggestion=record.suggestion,
        status=record.status,
        assigned_to=record.assigned_to,
        resolved_at=resolved,
        resolved_by=record.resolved_by,
        resolution_note=record.resolution_note,
        created_at=record.created_at.isoformat(),
        assignments=assignment_items,
        comments=comment_items,
    )
@router.post("/violations/{violation_id}/assign")
async def assign_violation(
    violation_id: int,
    request: AssignViolationRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    Assign a violation to a developer.

    Updates the violation status to 'assigned'. The calling admin is
    recorded as the assigner, and the change is committed before the
    assignment is returned.
    """
    created = code_quality_service.assign_violation(
        db,
        violation_id=violation_id,
        user_id=request.user_id,
        assigned_by=current_user.id,
        due_date=request.due_date,
        priority=request.priority,
    )
    db.commit()

    # The due date is optional; it is serialised only when present.
    due = created.due_date.isoformat() if created.due_date else None
    return {
        "id": created.id,
        "violation_id": created.violation_id,
        "user_id": created.user_id,
        "assigned_at": created.assigned_at.isoformat(),
        "assigned_by": created.assigned_by,
        "due_date": due,
        "priority": created.priority,
    }
@router.post("/violations/{violation_id}/resolve")
async def resolve_violation(
    violation_id: int,
    request: ResolveViolationRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    Mark a violation as resolved.

    Records the resolution timestamp and the resolving user.
    ViolationNotFoundException bubbles up if the violation doesn't exist.
    """
    updated = code_quality_service.resolve_violation(
        db,
        violation_id=violation_id,
        resolved_by=current_user.id,
        resolution_note=request.resolution_note,
    )
    db.commit()

    resolved = updated.resolved_at.isoformat() if updated.resolved_at else None
    return {
        "id": updated.id,
        "status": updated.status,
        "resolved_at": resolved,
        "resolved_by": updated.resolved_by,
        "resolution_note": updated.resolution_note,
    }
@router.post("/violations/{violation_id}/ignore")
async def ignore_violation(
    violation_id: int,
    request: IgnoreViolationRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    Mark a violation as ignored (won't fix).

    Records the reason for ignoring it.
    ViolationNotFoundException bubbles up if the violation doesn't exist.
    """
    updated = code_quality_service.ignore_violation(
        db,
        violation_id=violation_id,
        ignored_by=current_user.id,
        reason=request.reason,
    )
    db.commit()

    # The payload has the same shape as the one returned by the resolve endpoint.
    resolved = updated.resolved_at.isoformat() if updated.resolved_at else None
    return {
        "id": updated.id,
        "status": updated.status,
        "resolved_at": resolved,
        "resolved_by": updated.resolved_by,
        "resolution_note": updated.resolution_note,
    }
@router.post("/violations/{violation_id}/comments")
async def add_comment(
    violation_id: int,
    request: AddCommentRequest,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    Add a comment to a violation.

    Intended for team collaboration and discussion. The comment is stored
    under the calling admin's user ID and committed before it is returned.
    """
    created = code_quality_service.add_comment(
        db,
        violation_id=violation_id,
        user_id=current_user.id,
        comment=request.comment,
    )
    db.commit()

    posted_at = created.created_at.isoformat()
    return {
        "id": created.id,
        "violation_id": created.violation_id,
        "user_id": created.user_id,
        "comment": created.comment,
        "created_at": posted_at,
    }
@router.get("/stats", response_model=CodeQualityDashboardStatsResponse)
async def get_dashboard_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_admin_api),
):
    """
    Return the dashboard statistics.

    Comprehensive stats for the dashboard, including:
    - Total counts by severity and status
    - Technical debt score
    - Trend data (last 7 scans)
    - Top violating files
    - Violations by rule and module
    """
    # The service result is unpacked as keyword arguments, so its keys must
    # match the fields of the response schema.
    stats_payload = code_quality_service.get_dashboard_stats(db)
    return CodeQualityDashboardStatsResponse(**stats_payload)