## Vendor-in-Token Architecture (Complete Migration) - Migrate all vendor API endpoints from require_vendor_context() to token_vendor_id - Update permission dependencies to extract vendor from JWT token - Add vendor exceptions: VendorAccessDeniedException, VendorOwnerOnlyException, InsufficientVendorPermissionsException - Shop endpoints retain require_vendor_context() for URL-based detection - Add AUTH-004 architecture rule enforcing vendor context patterns - Fix marketplace router missing /marketplace prefix ## Exception Pattern Fixes (API-003/API-004) - Services raise domain exceptions, endpoints let them bubble up - Add code_quality and content_page exception modules - Move business logic from endpoints to services (admin, auth, content_page) - Fix exception handling in admin, shop, and vendor endpoints ## Tailwind CSS Consolidation - Consolidate CSS to per-area files (admin, vendor, shop, platform) - Remove shared/cdn-fallback.html and shared/css/tailwind.min.css - Update all templates to use area-specific Tailwind output files - Remove Node.js config (package.json, postcss.config.js, tailwind.config.js) ## Documentation & Cleanup - Update vendor-in-token-architecture.md with completed migration status - Update architecture-rules.md with new rules - Move migration docs to docs/development/migration/ - Remove duplicate/obsolete documentation files - Merge pytest.ini settings into pyproject.toml 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
1211 lines
44 KiB
Python
Executable File
1211 lines
44 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
Architecture Validator
|
|
======================
|
|
Validates code against architectural rules defined in .architecture-rules.yaml
|
|
|
|
This script checks that the codebase follows key architectural decisions:
|
|
- Separation of concerns (routes vs services)
|
|
- Proper exception handling (domain exceptions vs HTTPException)
|
|
- Correct use of Pydantic vs SQLAlchemy models
|
|
- Service layer patterns
|
|
- API endpoint patterns
|
|
|
|
Usage:
|
|
python scripts/validate_architecture.py # Check all files in current directory
|
|
python scripts/validate_architecture.py -d app/api/ # Check specific directory
|
|
python scripts/validate_architecture.py -f app/api/v1/vendors.py # Check single file
|
|
python scripts/validate_architecture.py -o company # Check all company-related files
|
|
python scripts/validate_architecture.py -o vendor --verbose # Check vendor files with details
|
|
python scripts/validate_architecture.py --json # JSON output
|
|
|
|
Options:
|
|
-f, --file PATH Validate a single file (.py, .js, or .html)
|
|
-d, --folder PATH Validate all files in a directory (recursive)
|
|
-o, --object NAME Validate all files related to an entity (e.g., company, vendor, order)
|
|
-c, --config PATH Path to architecture rules config
|
|
-v, --verbose Show detailed output including context
|
|
--errors-only Only show errors, suppress warnings
|
|
--json Output results as JSON
|
|
"""
|
|
|
|
import argparse
|
|
import ast
|
|
import re
|
|
import sys
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import yaml
|
|
|
|
|
|
class Severity(Enum):
    """Validation severity levels.

    ERROR makes print_report() return a failing exit code; WARNING is
    reported but does not fail the run; INFO appears unused by the checks
    visible in this file — presumably reserved for future rules.
    """

    ERROR = "error"
    WARNING = "warning"
    INFO = "info"
|
|
|
|
|
|
@dataclass
class Violation:
    """A single architectural rule violation.

    Instances are created by ArchitectureValidator._add_violation() and
    accumulated on the shared ValidationResult.
    """

    rule_id: str  # rule identifier from the YAML config, e.g. "API-001"
    rule_name: str  # human-readable rule title
    severity: Severity  # ERROR / WARNING / INFO
    file_path: Path  # file in which the violation was found
    line_number: int  # 1-based line number of the offending line
    message: str  # what is wrong
    context: str = ""  # offending source line (stripped, possibly truncated)
    suggestion: str = ""  # how to fix it
|
|
|
|
|
|
@dataclass
class FileResult:
    """Per-file validation outcome: error and warning tallies."""

    file_path: Path
    errors: int = 0
    warnings: int = 0

    @property
    def passed(self) -> bool:
        """True when the file produced no error-level violations."""
        return not self.errors

    @property
    def status(self) -> str:
        """Short label: FAILED, PASSED* (warnings only), or PASSED."""
        if self.errors:
            return "FAILED"
        return "PASSED*" if self.warnings else "PASSED"

    @property
    def status_icon(self) -> str:
        """Emoji counterpart of ``status``."""
        if self.errors:
            return "❌"
        return "⚠️" if self.warnings else "✅"
|
|
|
|
|
|
@dataclass
class ValidationResult:
    """Aggregate outcome of a validation run.

    Violations and per-file results accumulate here across all checks.
    """

    violations: list[Violation] = field(default_factory=list)
    files_checked: int = 0
    rules_applied: int = 0
    file_results: list[FileResult] = field(default_factory=list)

    def has_errors(self) -> bool:
        """Return True if at least one ERROR-level violation was recorded."""
        for violation in self.violations:
            if violation.severity == Severity.ERROR:
                return True
        return False

    def has_warnings(self) -> bool:
        """Return True if at least one WARNING-level violation was recorded."""
        for violation in self.violations:
            if violation.severity == Severity.WARNING:
                return True
        return False
|
|
|
|
|
|
class ArchitectureValidator:
|
|
"""Main validator class"""
|
|
|
|
def __init__(self, config_path: Path, verbose: bool = False):
|
|
"""Initialize validator with configuration"""
|
|
self.config_path = config_path
|
|
self.verbose = verbose
|
|
self.config = self._load_config()
|
|
self.result = ValidationResult()
|
|
self.project_root = Path.cwd()
|
|
|
|
def _load_config(self) -> dict[str, Any]:
|
|
"""Load validation rules from YAML config"""
|
|
if not self.config_path.exists():
|
|
print(f"❌ Configuration file not found: {self.config_path}")
|
|
sys.exit(1)
|
|
|
|
with open(self.config_path) as f:
|
|
config = yaml.safe_load(f)
|
|
|
|
print(f"📋 Loaded architecture rules: {config.get('project', 'unknown')}")
|
|
return config
|
|
|
|
def validate_all(self, target_path: Path = None) -> ValidationResult:
|
|
"""Validate all files in a directory"""
|
|
print("\n🔍 Starting architecture validation...\n")
|
|
|
|
target = target_path or self.project_root
|
|
|
|
# Validate API endpoints
|
|
self._validate_api_endpoints(target)
|
|
|
|
# Validate service layer
|
|
self._validate_service_layer(target)
|
|
|
|
# Validate models
|
|
self._validate_models(target)
|
|
|
|
# Validate exception handling
|
|
self._validate_exceptions(target)
|
|
|
|
# Validate JavaScript
|
|
self._validate_javascript(target)
|
|
|
|
# Validate templates
|
|
self._validate_templates(target)
|
|
|
|
return self.result
|
|
|
|
    def validate_file(
        self, file_path: Path, quiet: bool = False
    ) -> ValidationResult:
        """Validate a single file and record a per-file summary.

        Dispatches on extension (.py / .js / .html); other extensions are
        reported as unsupported but still counted and given a FileResult.

        Args:
            file_path: File to validate.
            quiet: Suppress informational prints (used by validate_object).

        Returns:
            The shared ValidationResult; violations accumulate across calls.
        """
        if not file_path.exists():
            if not quiet:
                print(f"❌ File not found: {file_path}")
            return self.result

        if not file_path.is_file():
            if not quiet:
                print(f"❌ Not a file: {file_path}")
            return self.result

        if not quiet:
            print(f"\n🔍 Validating single file: {file_path}\n")

        # Resolve file path to absolute
        file_path = file_path.resolve()
        file_path_str = str(file_path)

        if self._should_ignore_file(file_path):
            if not quiet:
                print("⏭️ File is in ignore list, skipping")
            return self.result

        self.result.files_checked += 1

        # Snapshot the violation count so the slice below isolates just the
        # violations added by this file's checks.
        violations_before = len(self.result.violations)

        content = file_path.read_text()
        lines = content.split("\n")

        # Determine file type and run appropriate validators
        if file_path.suffix == ".py":
            self._validate_python_file(file_path, content, lines, file_path_str)
        elif file_path.suffix == ".js":
            self._validate_js_file(file_path, content, lines)
        elif file_path.suffix == ".html":
            self._validate_html_file(file_path, content, lines)
        else:
            if not quiet:
                print(f"⚠️ Unsupported file type: {file_path.suffix}")

        # Calculate violations for this file
        file_violations = self.result.violations[violations_before:]
        errors = sum(1 for v in file_violations if v.severity == Severity.ERROR)
        warnings = sum(1 for v in file_violations if v.severity == Severity.WARNING)

        # Track file result (drives the summary table in print_report)
        self.result.file_results.append(
            FileResult(file_path=file_path, errors=errors, warnings=warnings)
        )

        return self.result
|
|
|
|
    def validate_object(self, object_name: str) -> ValidationResult:
        """Validate all files related to an entity (e.g., company, vendor, order).

        Expands *object_name* into singular/plural variants, globs the
        project's conventional locations for matching files, then runs
        validate_file() on each hit (quietly, one header line per file).

        Args:
            object_name: Entity name in either singular or plural form.

        Returns:
            The shared ValidationResult.
        """
        print(f"\n🔍 Searching for '{object_name}'-related files...\n")

        # Generate name variants (singular/plural forms)
        name = object_name.lower()
        variants = {name}

        # Handle common plural patterns (heuristic only; irregular plurals
        # are not covered)
        if name.endswith("ies"):
            # companies -> company
            variants.add(name[:-3] + "y")
        elif name.endswith("s"):
            # vendors -> vendor
            variants.add(name[:-1])
        else:
            # company -> companies, vendor -> vendors
            if name.endswith("y"):
                variants.add(name[:-1] + "ies")
            # NOTE: for y-ending names this also adds a naive "+s" form
            # (e.g. "companys"); harmless, since its glob simply matches
            # nothing.
            variants.add(name + "s")

        # Search patterns for different file types
        patterns = []
        for variant in variants:
            patterns.extend([
                f"app/api/**/*{variant}*.py",
                f"app/services/*{variant}*.py",
                f"app/exceptions/*{variant}*.py",
                f"models/database/*{variant}*.py",
                f"models/schema/*{variant}*.py",
                f"static/admin/js/*{variant}*.js",
                f"app/templates/admin/*{variant}*.html",
            ])

        # Find all matching files (set dedupes overlap between variants)
        found_files: set[Path] = set()
        for pattern in patterns:
            matches = list(self.project_root.glob(pattern))
            for match in matches:
                if match.is_file() and not self._should_ignore_file(match):
                    found_files.add(match)

        if not found_files:
            print(f"❌ No files found matching '{object_name}'")
            return self.result

        # Sort files by type for better readability
        sorted_files = sorted(found_files, key=lambda p: (p.suffix, str(p)))

        print(f"📁 Found {len(sorted_files)} files:\n")
        for f in sorted_files:
            rel_path = f.relative_to(self.project_root)
            print(f" • {rel_path}")

        print("\n" + "-" * 60 + "\n")

        # Validate each file
        for file_path in sorted_files:
            rel_path = file_path.relative_to(self.project_root)
            print(f"📄 {rel_path}")
            self.validate_file(file_path, quiet=True)

        return self.result
|
|
|
|
    def _validate_python_file(
        self, file_path: Path, content: str, lines: list[str], file_path_str: str
    ):
        """Validate a single Python file based on its location.

        Location (api / services / models) selects which rule family runs;
        the bare-except check (EXC-002) then runs for every Python file,
        mirroring _validate_exceptions() in directory mode.
        """
        # API endpoints
        if "/app/api/" in file_path_str or "\\app\\api\\" in file_path_str:
            print("📡 Validating as API endpoint...")
            self._check_pydantic_usage(file_path, content, lines)
            self._check_no_business_logic_in_endpoints(file_path, content, lines)
            self._check_endpoint_exception_handling(file_path, content, lines)
            self._check_endpoint_authentication(file_path, content, lines)

        # Service layer
        elif "/app/services/" in file_path_str or "\\app\\services\\" in file_path_str:
            print("🔧 Validating as service layer...")
            self._check_no_http_exception_in_services(file_path, content, lines)
            self._check_service_exceptions(file_path, content, lines)
            self._check_db_session_parameter(file_path, content, lines)

        # Models
        elif "/app/models/" in file_path_str or "\\app\\models\\" in file_path_str:
            print("📦 Validating as model...")
            # MDL-002: a class inheriting both SQLAlchemy Base and Pydantic
            # BaseModel mixes persistence and schema concerns.
            for i, line in enumerate(lines, 1):
                if re.search(r"class.*\(Base.*,.*BaseModel.*\)", line):
                    self._add_violation(
                        rule_id="MDL-002",
                        rule_name="Separate SQLAlchemy and Pydantic models",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message="Model mixes SQLAlchemy Base and Pydantic BaseModel",
                        context=line.strip(),
                        suggestion="Keep SQLAlchemy models and Pydantic models separate",
                    )

        # Generic Python file - check exception handling
        # NOTE(review): this section appears to run unconditionally for every
        # Python file (not only the "generic" case) — confirm against the
        # original indentation.
        print("⚠️ Validating exception handling...")
        for i, line in enumerate(lines, 1):
            if re.match(r"\s*except\s*:", line):
                self._add_violation(
                    rule_id="EXC-002",
                    rule_name="No bare except clauses",
                    severity=Severity.ERROR,
                    file_path=file_path,
                    line_number=i,
                    message="Bare except clause catches all exceptions including system exits",
                    context=line.strip(),
                    suggestion="Specify exception type: except ValueError: or except Exception:",
                )
|
|
|
|
def _validate_js_file(self, file_path: Path, content: str, lines: list[str]):
|
|
"""Validate a single JavaScript file"""
|
|
print("🟨 Validating JavaScript...")
|
|
|
|
# JS-001: Check for window.apiClient
|
|
for i, line in enumerate(lines, 1):
|
|
if "window.apiClient" in line:
|
|
before_occurrence = line[: line.find("window.apiClient")]
|
|
if "//" not in before_occurrence:
|
|
self._add_violation(
|
|
rule_id="JS-001",
|
|
rule_name="Use apiClient directly",
|
|
severity=Severity.WARNING,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Use apiClient directly instead of window.apiClient",
|
|
context=line.strip(),
|
|
suggestion="Replace window.apiClient with apiClient",
|
|
)
|
|
|
|
# JS-002: Check for console usage
|
|
for i, line in enumerate(lines, 1):
|
|
if re.search(r"console\.(log|warn|error)", line):
|
|
if "//" in line or "✅" in line or "eslint-disable" in line:
|
|
continue
|
|
self._add_violation(
|
|
rule_id="JS-002",
|
|
rule_name="Use centralized logger",
|
|
severity=Severity.WARNING,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Use centralized logger instead of console",
|
|
context=line.strip()[:80],
|
|
suggestion="Use window.LogConfig.createLogger('moduleName')",
|
|
)
|
|
|
|
def _validate_html_file(self, file_path: Path, content: str, lines: list[str]):
|
|
"""Validate a single HTML template file"""
|
|
print("📄 Validating template...")
|
|
|
|
# Skip base template and partials
|
|
if "base.html" in file_path.name or "partials" in str(file_path):
|
|
print("⏭️ Skipping base/partial template")
|
|
return
|
|
|
|
# Only check admin templates
|
|
file_path_str = str(file_path)
|
|
if "/admin/" not in file_path_str and "\\admin\\" not in file_path_str:
|
|
print("⏭️ Not an admin template, skipping extends check")
|
|
return
|
|
|
|
# Check for standalone marker in template (first 5 lines)
|
|
# Supports: {# standalone #}, {# noqa: TPL-001 #}, <!-- standalone -->
|
|
first_lines = "\n".join(lines[:5]).lower()
|
|
if "standalone" in first_lines or "noqa: tpl-001" in first_lines:
|
|
print("⏭️ Template marked as standalone, skipping extends check")
|
|
return
|
|
|
|
# Check exclusion patterns for TPL-001
|
|
# These are templates that intentionally don't extend admin/base.html
|
|
tpl_001_exclusions = [
|
|
"login.html", # Standalone login page
|
|
"errors/", # Error pages extend errors/base.html
|
|
"test-", # Test templates
|
|
]
|
|
for exclusion in tpl_001_exclusions:
|
|
if exclusion in file_path_str:
|
|
print(f"⏭️ Template matches exclusion pattern '{exclusion}', skipping")
|
|
return
|
|
|
|
# TPL-001: Check for extends
|
|
has_extends = any(
|
|
"{% extends" in line and "admin/base.html" in line for line in lines
|
|
)
|
|
|
|
if not has_extends:
|
|
self._add_violation(
|
|
rule_id="TPL-001",
|
|
rule_name="Templates must extend base",
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=1,
|
|
message="Admin template does not extend admin/base.html",
|
|
context=file_path.name,
|
|
suggestion="Add {% extends 'admin/base.html' %} at the top, or add {# standalone #} if intentional",
|
|
)
|
|
|
|
def _validate_api_endpoints(self, target_path: Path):
|
|
"""Validate API endpoint rules (API-001, API-002, API-003, API-004)"""
|
|
print("📡 Validating API endpoints...")
|
|
|
|
api_files = list(target_path.glob("app/api/v1/**/*.py"))
|
|
self.result.files_checked += len(api_files)
|
|
|
|
for file_path in api_files:
|
|
if self._should_ignore_file(file_path):
|
|
continue
|
|
|
|
content = file_path.read_text()
|
|
lines = content.split("\n")
|
|
|
|
# API-001: Check for Pydantic model usage
|
|
self._check_pydantic_usage(file_path, content, lines)
|
|
|
|
# API-002: Check for business logic in endpoints
|
|
self._check_no_business_logic_in_endpoints(file_path, content, lines)
|
|
|
|
# API-003: Check exception handling
|
|
self._check_endpoint_exception_handling(file_path, content, lines)
|
|
|
|
# API-004: Check authentication
|
|
self._check_endpoint_authentication(file_path, content, lines)
|
|
|
|
def _check_pydantic_usage(self, file_path: Path, content: str, lines: list[str]):
|
|
"""API-001: Ensure endpoints use Pydantic models"""
|
|
rule = self._get_rule("API-001")
|
|
if not rule:
|
|
return
|
|
|
|
# Check for response_model in route decorators
|
|
route_pattern = r"@router\.(get|post|put|delete|patch)"
|
|
dict_return_pattern = r"return\s+\{.*\}"
|
|
|
|
for i, line in enumerate(lines, 1):
|
|
# Check for dict returns in endpoints
|
|
if re.search(route_pattern, line):
|
|
# Look ahead for function body
|
|
func_start = i
|
|
indent = len(line) - len(line.lstrip())
|
|
|
|
# Find function body
|
|
for j in range(func_start, min(func_start + 20, len(lines))):
|
|
if j >= len(lines):
|
|
break
|
|
|
|
func_line = lines[j]
|
|
if re.search(dict_return_pattern, func_line):
|
|
self._add_violation(
|
|
rule_id="API-001",
|
|
rule_name=rule["name"],
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=j + 1,
|
|
message="Endpoint returns raw dict instead of Pydantic model",
|
|
context=func_line.strip(),
|
|
suggestion="Define a Pydantic response model and use response_model parameter",
|
|
)
|
|
|
|
def _check_no_business_logic_in_endpoints(
|
|
self, file_path: Path, content: str, lines: list[str]
|
|
):
|
|
"""API-002: Ensure no business logic in endpoints"""
|
|
rule = self._get_rule("API-002")
|
|
if not rule:
|
|
return
|
|
|
|
# NOTE: db.commit() is intentionally NOT included here
|
|
# Transaction control (commit) is allowed at endpoint level
|
|
# Only business logic operations are flagged
|
|
anti_patterns = [
|
|
(r"db\.add\(", "Creating entities should be in service layer"),
|
|
(r"db\.delete\(", "Deleting entities should be in service layer"),
|
|
(r"db\.query\(", "Database queries should be in service layer"),
|
|
(r"db\.execute\(", "Database operations should be in service layer"),
|
|
]
|
|
|
|
for i, line in enumerate(lines, 1):
|
|
# Skip service method calls (allowed)
|
|
if "_service." in line or "service." in line:
|
|
continue
|
|
|
|
for pattern, message in anti_patterns:
|
|
if re.search(pattern, line):
|
|
self._add_violation(
|
|
rule_id="API-002",
|
|
rule_name=rule["name"],
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message=message,
|
|
context=line.strip(),
|
|
suggestion="Move database operations to service layer",
|
|
)
|
|
|
|
def _check_endpoint_exception_handling(
|
|
self, file_path: Path, content: str, lines: list[str]
|
|
):
|
|
"""API-003: Check that endpoints do NOT raise HTTPException directly.
|
|
|
|
The architecture uses a global exception handler that catches domain
|
|
exceptions (WizamartException subclasses) and converts them to HTTP
|
|
responses. Endpoints should let exceptions bubble up, not catch and
|
|
convert them manually.
|
|
"""
|
|
rule = self._get_rule("API-003")
|
|
if not rule:
|
|
return
|
|
|
|
# Skip exception handler file - it's allowed to use HTTPException
|
|
if "exceptions/handler.py" in str(file_path):
|
|
return
|
|
|
|
for i, line in enumerate(lines, 1):
|
|
# Check for raise HTTPException
|
|
if "raise HTTPException" in line:
|
|
# Skip if it's a comment
|
|
stripped = line.strip()
|
|
if stripped.startswith("#"):
|
|
continue
|
|
|
|
self._add_violation(
|
|
rule_id="API-003",
|
|
rule_name=rule["name"],
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Endpoint raises HTTPException directly",
|
|
context=line.strip()[:80],
|
|
suggestion="Use domain exceptions (e.g., VendorNotFoundException) and let global handler convert",
|
|
)
|
|
|
|
def _check_endpoint_authentication(
|
|
self, file_path: Path, content: str, lines: list[str]
|
|
):
|
|
"""API-004: Check authentication on endpoints
|
|
|
|
Automatically skips:
|
|
- Auth endpoint files (*/auth.py) - login/logout are intentionally public
|
|
- Endpoints marked with '# public' comment
|
|
"""
|
|
rule = self._get_rule("API-004")
|
|
if not rule:
|
|
return
|
|
|
|
# Skip auth endpoint files entirely - they are intentionally public
|
|
file_path_str = str(file_path)
|
|
if file_path_str.endswith("/auth.py") or file_path_str.endswith("\\auth.py"):
|
|
return
|
|
|
|
# This is a warning-level check
|
|
# Look for endpoints without Depends(get_current_*)
|
|
for i, line in enumerate(lines, 1):
|
|
if "@router." in line and (
|
|
"post" in line or "put" in line or "delete" in line
|
|
):
|
|
# Check next 15 lines for auth or public marker
|
|
# (increased from 5 to handle multi-line decorators and long function signatures)
|
|
has_auth = False
|
|
is_public = False
|
|
context_lines = lines[i - 1 : i + 15] # Include line before decorator
|
|
|
|
for ctx_line in context_lines:
|
|
if "Depends(get_current_" in ctx_line:
|
|
has_auth = True
|
|
break
|
|
# Check for public endpoint markers
|
|
if "# public" in ctx_line.lower() or "# noqa: api-004" in ctx_line.lower():
|
|
is_public = True
|
|
break
|
|
|
|
if not has_auth and not is_public and "include_in_schema=False" not in " ".join(
|
|
lines[i : i + 15]
|
|
):
|
|
self._add_violation(
|
|
rule_id="API-004",
|
|
rule_name=rule["name"],
|
|
severity=Severity.WARNING,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Endpoint may be missing authentication",
|
|
context=line.strip(),
|
|
suggestion="Add Depends(get_current_user) or mark as '# public' if intentionally unauthenticated",
|
|
)
|
|
|
|
def _validate_service_layer(self, target_path: Path):
|
|
"""Validate service layer rules (SVC-001, SVC-002, SVC-003, SVC-004, SVC-006)"""
|
|
print("🔧 Validating service layer...")
|
|
|
|
service_files = list(target_path.glob("app/services/**/*.py"))
|
|
self.result.files_checked += len(service_files)
|
|
|
|
for file_path in service_files:
|
|
if self._should_ignore_file(file_path):
|
|
continue
|
|
|
|
content = file_path.read_text()
|
|
lines = content.split("\n")
|
|
|
|
# SVC-001: No HTTPException in services
|
|
self._check_no_http_exception_in_services(file_path, content, lines)
|
|
|
|
# SVC-002: Proper exception handling
|
|
self._check_service_exceptions(file_path, content, lines)
|
|
|
|
# SVC-003: DB session as parameter
|
|
self._check_db_session_parameter(file_path, content, lines)
|
|
|
|
# SVC-006: No db.commit() in services
|
|
self._check_no_commit_in_services(file_path, content, lines)
|
|
|
|
def _check_no_http_exception_in_services(
|
|
self, file_path: Path, content: str, lines: list[str]
|
|
):
|
|
"""SVC-001: Services must not raise HTTPException"""
|
|
rule = self._get_rule("SVC-001")
|
|
if not rule:
|
|
return
|
|
|
|
for i, line in enumerate(lines, 1):
|
|
if "raise HTTPException" in line:
|
|
self._add_violation(
|
|
rule_id="SVC-001",
|
|
rule_name=rule["name"],
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Service raises HTTPException - use domain exceptions instead",
|
|
context=line.strip(),
|
|
suggestion="Create custom exception class (e.g., VendorNotFoundError) and raise that",
|
|
)
|
|
|
|
if (
|
|
"from fastapi import HTTPException" in line
|
|
or "from fastapi.exceptions import HTTPException" in line
|
|
):
|
|
self._add_violation(
|
|
rule_id="SVC-001",
|
|
rule_name=rule["name"],
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Service imports HTTPException - services should not know about HTTP",
|
|
context=line.strip(),
|
|
suggestion="Remove HTTPException import and use domain exceptions",
|
|
)
|
|
|
|
def _check_service_exceptions(
|
|
self, file_path: Path, content: str, lines: list[str]
|
|
):
|
|
"""SVC-002: Check for proper exception handling"""
|
|
rule = self._get_rule("SVC-002")
|
|
if not rule:
|
|
return
|
|
|
|
for i, line in enumerate(lines, 1):
|
|
# Check for generic Exception raises
|
|
if re.match(r"\s*raise Exception\(", line):
|
|
self._add_violation(
|
|
rule_id="SVC-002",
|
|
rule_name=rule["name"],
|
|
severity=Severity.WARNING,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Service raises generic Exception - use specific domain exception",
|
|
context=line.strip(),
|
|
suggestion="Create custom exception class for this error case",
|
|
)
|
|
|
|
def _check_db_session_parameter(
|
|
self, file_path: Path, content: str, lines: list[str]
|
|
):
|
|
"""SVC-003: Service methods should accept db session as parameter"""
|
|
rule = self._get_rule("SVC-003")
|
|
if not rule:
|
|
return
|
|
|
|
# Check for SessionLocal() creation in service files
|
|
for i, line in enumerate(lines, 1):
|
|
if "SessionLocal()" in line and "class" not in line:
|
|
self._add_violation(
|
|
rule_id="SVC-003",
|
|
rule_name=rule["name"],
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Service creates database session internally",
|
|
context=line.strip(),
|
|
suggestion="Accept db: Session as method parameter instead",
|
|
)
|
|
|
|
def _check_no_commit_in_services(
|
|
self, file_path: Path, content: str, lines: list[str]
|
|
):
|
|
"""SVC-006: Services should NOT call db.commit()
|
|
|
|
Transaction control belongs at the API endpoint level.
|
|
Exception: log_service.py may need immediate commits for audit logs.
|
|
"""
|
|
rule = self._get_rule("SVC-006")
|
|
if not rule:
|
|
return
|
|
|
|
# Exception: log_service.py is allowed to commit (audit logs)
|
|
if "log_service.py" in str(file_path):
|
|
return
|
|
|
|
for i, line in enumerate(lines, 1):
|
|
if "db.commit()" in line:
|
|
# Skip if it's a comment
|
|
stripped = line.strip()
|
|
if stripped.startswith("#"):
|
|
continue
|
|
|
|
self._add_violation(
|
|
rule_id="SVC-006",
|
|
rule_name=rule["name"],
|
|
severity=Severity.WARNING,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Service calls db.commit() - transaction control should be at endpoint level",
|
|
context=stripped,
|
|
suggestion="Remove db.commit() from service; let endpoint handle transaction",
|
|
)
|
|
|
|
def _validate_models(self, target_path: Path):
|
|
"""Validate model rules"""
|
|
print("📦 Validating models...")
|
|
|
|
model_files = list(target_path.glob("app/models/**/*.py"))
|
|
self.result.files_checked += len(model_files)
|
|
|
|
# Basic validation - can be extended
|
|
for file_path in model_files:
|
|
if self._should_ignore_file(file_path):
|
|
continue
|
|
|
|
content = file_path.read_text()
|
|
lines = content.split("\n")
|
|
|
|
# Check for mixing SQLAlchemy and Pydantic
|
|
for i, line in enumerate(lines, 1):
|
|
if re.search(r"class.*\(Base.*,.*BaseModel.*\)", line):
|
|
self._add_violation(
|
|
rule_id="MDL-002",
|
|
rule_name="Separate SQLAlchemy and Pydantic models",
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Model mixes SQLAlchemy Base and Pydantic BaseModel",
|
|
context=line.strip(),
|
|
suggestion="Keep SQLAlchemy models and Pydantic models separate",
|
|
)
|
|
|
|
def _validate_exceptions(self, target_path: Path):
|
|
"""Validate exception handling patterns"""
|
|
print("⚠️ Validating exception handling...")
|
|
|
|
py_files = list(target_path.glob("**/*.py"))
|
|
|
|
for file_path in py_files:
|
|
if self._should_ignore_file(file_path):
|
|
continue
|
|
|
|
content = file_path.read_text()
|
|
lines = content.split("\n")
|
|
|
|
# EXC-002: Check for bare except
|
|
for i, line in enumerate(lines, 1):
|
|
if re.match(r"\s*except\s*:", line):
|
|
self._add_violation(
|
|
rule_id="EXC-002",
|
|
rule_name="No bare except clauses",
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Bare except clause catches all exceptions including system exits",
|
|
context=line.strip(),
|
|
suggestion="Specify exception type: except ValueError: or except Exception:",
|
|
)
|
|
|
|
def _validate_javascript(self, target_path: Path):
|
|
"""Validate JavaScript patterns"""
|
|
print("🟨 Validating JavaScript...")
|
|
|
|
js_files = list(target_path.glob("static/admin/js/**/*.js"))
|
|
self.result.files_checked += len(js_files)
|
|
|
|
for file_path in js_files:
|
|
content = file_path.read_text()
|
|
lines = content.split("\n")
|
|
|
|
# JS-001: Check for window.apiClient
|
|
for i, line in enumerate(lines, 1):
|
|
if "window.apiClient" in line:
|
|
# Check if it's not in a comment
|
|
before_occurrence = line[: line.find("window.apiClient")]
|
|
if "//" not in before_occurrence:
|
|
self._add_violation(
|
|
rule_id="JS-001",
|
|
rule_name="Use apiClient directly",
|
|
severity=Severity.WARNING,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Use apiClient directly instead of window.apiClient",
|
|
context=line.strip(),
|
|
suggestion="Replace window.apiClient with apiClient",
|
|
)
|
|
|
|
# JS-002: Check for console usage
|
|
for i, line in enumerate(lines, 1):
|
|
if re.search(r"console\.(log|warn|error)", line):
|
|
# Skip if it's a comment or bootstrap message
|
|
if "//" in line or "✅" in line or "eslint-disable" in line:
|
|
continue
|
|
|
|
self._add_violation(
|
|
rule_id="JS-002",
|
|
rule_name="Use centralized logger",
|
|
severity=Severity.WARNING,
|
|
file_path=file_path,
|
|
line_number=i,
|
|
message="Use centralized logger instead of console",
|
|
context=line.strip()[:80],
|
|
suggestion="Use window.LogConfig.createLogger('moduleName')",
|
|
)
|
|
|
|
def _validate_templates(self, target_path: Path):
|
|
"""Validate template patterns"""
|
|
print("📄 Validating templates...")
|
|
|
|
template_files = list(target_path.glob("app/templates/admin/**/*.html"))
|
|
self.result.files_checked += len(template_files)
|
|
|
|
# TPL-001 exclusion patterns
|
|
tpl_001_exclusions = [
|
|
"login.html", # Standalone login page
|
|
"errors/", # Error pages extend errors/base.html
|
|
"test-", # Test templates
|
|
]
|
|
|
|
for file_path in template_files:
|
|
# Skip base template and partials
|
|
if "base.html" in file_path.name or "partials" in str(file_path):
|
|
continue
|
|
|
|
file_path_str = str(file_path)
|
|
|
|
# Check exclusion patterns
|
|
skip = False
|
|
for exclusion in tpl_001_exclusions:
|
|
if exclusion in file_path_str:
|
|
skip = True
|
|
break
|
|
if skip:
|
|
continue
|
|
|
|
content = file_path.read_text()
|
|
lines = content.split("\n")
|
|
|
|
# Check for standalone marker in template (first 5 lines)
|
|
first_lines = "\n".join(lines[:5]).lower()
|
|
if "standalone" in first_lines or "noqa: tpl-001" in first_lines:
|
|
continue
|
|
|
|
# TPL-001: Check for extends
|
|
has_extends = any(
|
|
"{% extends" in line and "admin/base.html" in line for line in lines
|
|
)
|
|
|
|
if not has_extends:
|
|
self._add_violation(
|
|
rule_id="TPL-001",
|
|
rule_name="Templates must extend base",
|
|
severity=Severity.ERROR,
|
|
file_path=file_path,
|
|
line_number=1,
|
|
message="Admin template does not extend admin/base.html",
|
|
context=file_path.name,
|
|
suggestion="Add {% extends 'admin/base.html' %} at the top, or add {# standalone #} if intentional",
|
|
)
|
|
|
|
def _get_rule(self, rule_id: str) -> dict[str, Any]:
|
|
"""Get rule configuration by ID"""
|
|
# Look in different rule categories
|
|
for category in [
|
|
"api_endpoint_rules",
|
|
"service_layer_rules",
|
|
"model_rules",
|
|
"exception_rules",
|
|
"javascript_rules",
|
|
"template_rules",
|
|
]:
|
|
rules = self.config.get(category, [])
|
|
for rule in rules:
|
|
if rule.get("id") == rule_id:
|
|
return rule
|
|
return None
|
|
|
|
def _should_ignore_file(self, file_path: Path) -> bool:
|
|
"""Check if file should be ignored"""
|
|
ignore_patterns = self.config.get("ignore", {}).get("files", [])
|
|
|
|
# Convert to string for easier matching
|
|
file_path_str = str(file_path)
|
|
|
|
for pattern in ignore_patterns:
|
|
# Check if any part of the path matches the pattern
|
|
if file_path.match(pattern):
|
|
return True
|
|
# Also check if pattern appears in the path (for .venv, venv, etc.)
|
|
if "/.venv/" in file_path_str or file_path_str.startswith(".venv/"):
|
|
return True
|
|
if "/venv/" in file_path_str or file_path_str.startswith("venv/"):
|
|
return True
|
|
|
|
return False
|
|
|
|
def _add_violation(
|
|
self,
|
|
rule_id: str,
|
|
rule_name: str,
|
|
severity: Severity,
|
|
file_path: Path,
|
|
line_number: int,
|
|
message: str,
|
|
context: str = "",
|
|
suggestion: str = "",
|
|
):
|
|
"""Add a violation to results"""
|
|
violation = Violation(
|
|
rule_id=rule_id,
|
|
rule_name=rule_name,
|
|
severity=severity,
|
|
file_path=file_path,
|
|
line_number=line_number,
|
|
message=message,
|
|
context=context,
|
|
suggestion=suggestion,
|
|
)
|
|
self.result.violations.append(violation)
|
|
|
|
def print_report(self):
|
|
"""Print validation report"""
|
|
print("\n" + "=" * 80)
|
|
print("📊 ARCHITECTURE VALIDATION REPORT")
|
|
print("=" * 80 + "\n")
|
|
|
|
print(f"Files checked: {self.result.files_checked}")
|
|
print(f"Total violations: {len(self.result.violations)}\n")
|
|
|
|
# Print file summary table if we have file results
|
|
if self.result.file_results:
|
|
self._print_summary_table()
|
|
|
|
# Group by severity
|
|
errors = [v for v in self.result.violations if v.severity == Severity.ERROR]
|
|
warnings = [v for v in self.result.violations if v.severity == Severity.WARNING]
|
|
|
|
if errors:
|
|
print(f"\n❌ ERRORS ({len(errors)}):")
|
|
print("-" * 80)
|
|
for violation in errors:
|
|
self._print_violation(violation)
|
|
|
|
if warnings:
|
|
print(f"\n⚠️ WARNINGS ({len(warnings)}):")
|
|
print("-" * 80)
|
|
for violation in warnings:
|
|
self._print_violation(violation)
|
|
|
|
# Summary
|
|
print("\n" + "=" * 80)
|
|
if self.result.has_errors():
|
|
print("❌ VALIDATION FAILED - Fix errors before committing")
|
|
print("=" * 80)
|
|
return 1
|
|
if self.result.has_warnings():
|
|
print("⚠️ VALIDATION PASSED WITH WARNINGS")
|
|
print("=" * 80)
|
|
return 0
|
|
print("✅ VALIDATION PASSED - No violations found")
|
|
print("=" * 80)
|
|
return 0
|
|
|
|
    def _print_summary_table(self):
        """Print a per-file summary table of validation results.

        Shows each checked file with its pass/fail status and error/warning
        counts, followed by a totals line. Paths under the project root are
        shown relative to it; long paths are truncated from the left.

        NOTE(review): `max()` below raises ValueError if file_results is
        empty — presumably callers guard on a non-empty list first; confirm.
        """
        print("📋 FILE SUMMARY:")
        print("-" * 80)

        # Calculate column widths
        # Widest relative path determines the File column; the membership
        # test on `parents` means only paths strictly under the project
        # root are relativized.
        max_path_len = max(
            len(
                str(
                    fr.file_path.relative_to(self.project_root)
                    if self.project_root in fr.file_path.parents
                    else fr.file_path
                )
            )
            for fr in self.result.file_results
        )
        max_path_len = min(max_path_len, 55)  # Cap at 55 chars

        # Print header
        print(f" {'File':<{max_path_len}} {'Status':<8} {'Errors':<7} {'Warnings':<8}")
        print(f" {'-' * max_path_len} {'-' * 8} {'-' * 7} {'-' * 8}")

        # Print each file
        for fr in self.result.file_results:
            rel_path = (
                fr.file_path.relative_to(self.project_root)
                if self.project_root in fr.file_path.parents
                else fr.file_path
            )
            path_str = str(rel_path)
            # Truncate from the left so the filename itself stays visible.
            if len(path_str) > max_path_len:
                path_str = "..." + path_str[-(max_path_len - 3) :]

            print(
                f" {path_str:<{max_path_len}} "
                f"{fr.status_icon} {fr.status:<5} "
                f"{fr.errors:<7} "
                f"{fr.warnings:<8}"
            )

        print("-" * 80)

        # Print totals
        total_errors = sum(fr.errors for fr in self.result.file_results)
        total_warnings = sum(fr.warnings for fr in self.result.file_results)
        passed = sum(1 for fr in self.result.file_results if fr.passed)
        failed = len(self.result.file_results) - passed

        print(f"\n Total: {len(self.result.file_results)} files | "
              f"✅ {passed} passed | ❌ {failed} failed | "
              f"{total_errors} errors | {total_warnings} warnings\n")
def print_json(self) -> int:
|
|
"""Print validation results as JSON"""
|
|
import json
|
|
|
|
violations_json = []
|
|
for v in self.result.violations:
|
|
rel_path = (
|
|
str(v.file_path.relative_to(self.project_root))
|
|
if self.project_root in v.file_path.parents
|
|
else str(v.file_path)
|
|
)
|
|
violations_json.append(
|
|
{
|
|
"rule_id": v.rule_id,
|
|
"rule_name": v.rule_name,
|
|
"severity": v.severity.value,
|
|
"file_path": rel_path,
|
|
"line_number": v.line_number,
|
|
"message": v.message,
|
|
"context": v.context or "",
|
|
"suggestion": v.suggestion or "",
|
|
}
|
|
)
|
|
|
|
output = {
|
|
"files_checked": self.result.files_checked,
|
|
"total_violations": len(self.result.violations),
|
|
"errors": len(
|
|
[v for v in self.result.violations if v.severity == Severity.ERROR]
|
|
),
|
|
"warnings": len(
|
|
[v for v in self.result.violations if v.severity == Severity.WARNING]
|
|
),
|
|
"violations": violations_json,
|
|
}
|
|
|
|
print(json.dumps(output, indent=2))
|
|
|
|
return 1 if self.result.has_errors() else 0
|
|
|
|
def _print_violation(self, v: Violation):
|
|
"""Print a single violation"""
|
|
rel_path = (
|
|
v.file_path.relative_to(self.project_root)
|
|
if self.project_root in v.file_path.parents
|
|
else v.file_path
|
|
)
|
|
|
|
print(f"\n [{v.rule_id}] {v.rule_name}")
|
|
print(f" File: {rel_path}:{v.line_number}")
|
|
print(f" Issue: {v.message}")
|
|
|
|
if v.context and self.verbose:
|
|
print(f" Context: {v.context}")
|
|
|
|
if v.suggestion:
|
|
print(f" 💡 Suggestion: {v.suggestion}")
|
|
|
|
|
|
def main():
    """CLI entry point.

    Parses arguments, runs the requested validation mode (single file,
    directory, entity object, or the whole current directory), prints the
    results, and exits with the reporter's status code (1 when errors
    were found, 0 otherwise).
    """
    parser = argparse.ArgumentParser(
        description="Validate architecture patterns in codebase",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )

    # Target options (mutually exclusive)
    target_group = parser.add_mutually_exclusive_group()

    target_group.add_argument(
        "-f",
        "--file",
        type=Path,
        metavar="PATH",
        help="Validate a single file (.py, .js, or .html)",
    )

    target_group.add_argument(
        "-d",
        "--folder",
        type=Path,
        metavar="PATH",
        help="Validate all files in a directory (recursive)",
    )

    target_group.add_argument(
        "-o",
        "--object",
        type=str,
        metavar="NAME",
        help="Validate all files related to an entity (e.g., company, vendor, order)",
    )

    parser.add_argument(
        "-c",
        "--config",
        type=Path,
        default=Path.cwd() / ".architecture-rules.yaml",
        help="Path to architecture rules config (default: .architecture-rules.yaml)",
    )

    parser.add_argument(
        "-v",
        "--verbose",
        action="store_true",
        help="Show detailed output including context",
    )

    # NOTE(review): --errors-only is parsed but never applied below —
    # TODO wire it into the reporting path or drop the flag.
    parser.add_argument(
        "--errors-only", action="store_true", help="Only show errors, suppress warnings"
    )

    parser.add_argument(
        "--json",
        action="store_true",
        help="Output results as JSON (for programmatic use)",
    )

    args = parser.parse_args()

    # Create validator
    validator = ArchitectureValidator(args.config, verbose=args.verbose)

    # Run the requested validation mode. Results accumulate on the
    # validator itself, so the return values were never used here —
    # the previous dead `result = ...` bindings are removed.
    if args.file:
        # Mirror the --folder sanity check for consistency.
        if not args.file.is_file():
            print(f"❌ Not a file: {args.file}")
            sys.exit(1)
        validator.validate_file(args.file)
    elif args.folder:
        if not args.folder.is_dir():
            print(f"❌ Not a directory: {args.folder}")
            sys.exit(1)
        validator.validate_all(args.folder)
    elif args.object:
        validator.validate_object(args.object)
    else:
        # Default: validate current directory
        validator.validate_all(Path.cwd())

    # The reporter decides the exit code: 1 if any ERROR-severity violation.
    if args.json:
        exit_code = validator.print_json()
    else:
        exit_code = validator.print_report()

    sys.exit(exit_code)


if __name__ == "__main__":
    main()