Files
orion/scripts/validate/validate_performance.py
Samir Boulahtit 688896d856
Some checks failed
CI / ruff (push) Successful in 9s
CI / architecture (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / audit (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has been cancelled
fix: add .dockerignore and env_file to docker-compose
Prevents .env from being baked into Docker image (was overriding
config defaults). Adds env_file directive so containers load host
.env properly.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-14 20:01:21 +01:00

680 lines
30 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Performance Validator
=====================
Validates code against performance rules defined in .performance-rules/
This script checks for common performance issues:
- N+1 query patterns
- Missing pagination
- Inefficient database operations
- Memory management issues
- Frontend performance anti-patterns
- Missing timeouts and connection pooling
Usage:
python scripts/validate/validate_performance.py # Check all files
python scripts/validate/validate_performance.py -d app/services/ # Check specific directory
python scripts/validate/validate_performance.py -f app/api/v1/products.py # Check single file
python scripts/validate/validate_performance.py -v # Verbose output
python scripts/validate/validate_performance.py --json # JSON output
python scripts/validate/validate_performance.py --errors-only # Only show errors
Options:
-f, --file PATH Validate a single file
-d, --folder PATH Validate all files in a directory (recursive)
-v, --verbose Show detailed output including context
--errors-only Only show errors, suppress warnings and info
--json Output results as JSON
"""
import argparse
import re
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_validator import BaseValidator, Severity, ValidationResult
class PerformanceValidator(BaseValidator):
    """Performance-focused code validator"""

    # Identity / configuration hooks consumed by BaseValidator.
    VALIDATOR_NAME = "Performance Validator"
    # NOTE(review): empty string is printed in the validate_all() banner —
    # the emoji may have been lost in transit; confirm intended value.
    VALIDATOR_EMOJI = ""
    RULES_DIR_NAME = ".performance-rules"
    CONFIG_FILE_NAME = ".performance-rules.yaml"
def validate_all(self, target_path: Path | None = None) -> ValidationResult:
    """Validate all files for performance issues.

    Args:
        target_path: Root directory to scan; falls back to
            ``self.project_root`` when omitted.

    Returns:
        The accumulated ``ValidationResult`` (also stored on ``self.result``).
    """
    print(f"\n{self.VALIDATOR_EMOJI} Starting performance validation...\n")
    target = target_path or self.project_root
    # Each pass walks the tree once for one file type and appends
    # violations onto self.result.
    # Validate Python files
    self._validate_python_files(target)
    # Validate JavaScript files
    self._validate_javascript_files(target)
    # Validate HTML templates
    self._validate_template_files(target)
    return self.result
def _validate_python_files(self, target: Path):
    """Recursively scan *target* for ``*.py`` files and run the Python checks.

    Ignored files (per ``_should_ignore_file``) are skipped; every scanned
    file increments ``result.files_checked``.
    """
    print("🐍 Validating Python files...")
    for py_file in target.rglob("*.py"):
        if self._should_ignore_file(py_file):
            continue
        self.result.files_checked += 1
        # Decode as UTF-8 with replacement so a single mis-encoded file
        # cannot abort the entire run with UnicodeDecodeError (the bare
        # read_text() used the locale encoding and raised on bad bytes).
        content = py_file.read_text(encoding="utf-8", errors="replace")
        lines = content.split("\n")
        self._validate_python_performance(py_file, content, lines)
def _validate_javascript_files(self, target: Path):
    """Recursively scan *target* for ``*.js`` files and run the JS checks.

    Ignored files (per ``_should_ignore_file``) are skipped; every scanned
    file increments ``result.files_checked``.
    """
    print("🟨 Validating JavaScript files...")
    for js_file in target.rglob("*.js"):
        if self._should_ignore_file(js_file):
            continue
        self.result.files_checked += 1
        # Decode as UTF-8 with replacement so a single mis-encoded file
        # cannot abort the entire run with UnicodeDecodeError.
        content = js_file.read_text(encoding="utf-8", errors="replace")
        lines = content.split("\n")
        self._validate_javascript_performance(js_file, content, lines)
def _validate_template_files(self, target: Path):
    """Recursively scan *target* for ``*.html`` files and run template checks.

    Ignored files (per ``_should_ignore_file``) are skipped; every scanned
    file increments ``result.files_checked``.
    """
    print("📄 Validating template files...")
    for html_file in target.rglob("*.html"):
        if self._should_ignore_file(html_file):
            continue
        self.result.files_checked += 1
        # Decode as UTF-8 with replacement so a single mis-encoded file
        # cannot abort the entire run with UnicodeDecodeError.
        content = html_file.read_text(encoding="utf-8", errors="replace")
        lines = content.split("\n")
        self._validate_template_performance(html_file, content, lines)
def _validate_file_content(self, file_path: Path, content: str, lines: list[str]):
    """Route a single file to the language-specific checks by extension.

    Unknown extensions are silently ignored.
    """
    handlers = {
        ".py": self._validate_python_performance,
        ".js": self._validate_javascript_performance,
        ".html": self._validate_template_performance,
    }
    handler = handlers.get(file_path.suffix)
    if handler is not None:
        handler(file_path, content, lines)
def _validate_python_performance(self, file_path: Path, content: str, lines: list[str]):
    """Validate Python file for performance issues.

    Pure dispatcher: runs every Python PERF check; path-dependent checks
    (API pagination, upload streaming, import chunking) are gated on
    substrings of the file path, not its content.
    """
    file_path_str = str(file_path)
    # PERF-001: N+1 query detection
    self._check_n_plus_1_queries(file_path, content, lines)
    # PERF-003: Query result limiting
    self._check_query_limiting(file_path, content, lines)
    # PERF-006: Bulk operations
    self._check_bulk_operations(file_path, content, lines)
    # PERF-008: Use EXISTS for existence checks
    self._check_existence_checks(file_path, content, lines)
    # PERF-009: Batch updates
    self._check_batch_updates(file_path, content, lines)
    # PERF-026: Pagination for API endpoints
    if "/api/" in file_path_str:
        self._check_api_pagination(file_path, content, lines)
    # PERF-037: Parallel async operations
    self._check_parallel_async(file_path, content, lines)
    # PERF-040: Timeout configuration
    self._check_timeout_config(file_path, content, lines)
    # PERF-046: Generators for large datasets
    self._check_generators(file_path, content, lines)
    # PERF-047: Stream file uploads
    if "upload" in file_path_str.lower() or "file" in file_path_str.lower():
        self._check_file_streaming(file_path, content, lines)
    # PERF-048: Chunked processing
    if "import" in file_path_str.lower() or "csv" in file_path_str.lower():
        self._check_chunked_processing(file_path, content, lines)
    # PERF-049: Context managers for files
    self._check_context_managers(file_path, content, lines)
    # PERF-051: String concatenation
    self._check_string_concatenation(file_path, content, lines)
def _validate_javascript_performance(self, file_path: Path, content: str, lines: list[str]):
    """Validate JavaScript file for performance issues (pure dispatcher)."""
    # PERF-056: Debounce search inputs
    self._check_debounce(file_path, content, lines)
    # PERF-062: Polling intervals
    self._check_polling_intervals(file_path, content, lines)
    # PERF-064: Layout thrashing
    self._check_layout_thrashing(file_path, content, lines)
def _validate_template_performance(self, file_path: Path, content: str, lines: list[str]):
    """Validate HTML template file for performance issues (pure dispatcher)."""
    # PERF-058: Image lazy loading
    self._check_image_lazy_loading(file_path, content, lines)
    # PERF-067: Script defer/async
    self._check_script_loading(file_path, content, lines)
# =========================================================================
# Database Performance Checks
# =========================================================================
def _check_n_plus_1_queries(self, file_path: Path, content: str, lines: list[str]):
    """PERF-001: Check for N+1 query patterns.

    Heuristic line scanner: after a ``for ... in ....all()/.query`` line,
    any dotted access to a known relationship name inside the loop body is
    flagged. At most one violation is reported per tracked loop.
    """
    # Look for patterns like: for item in items: item.relationship.attribute
    in_for_loop = False
    for_line_num = 0
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        # Track for loops over query results
        if re.search(r"for\s+\w+\s+in\s+.*\.(all|query)", line):
            in_for_loop = True
            for_line_num = i
        elif in_for_loop and stripped and not stripped.startswith("#"):
            # Check for relationship access in loop; "(" excludes method calls.
            if re.search(r"\.\w+\.\w+", line) and "(" not in line:
                # Could be accessing a relationship
                if any(rel in line for rel in [".customer.", ".store.", ".order.", ".product.", ".user."]):
                    if self._is_noqa_suppressed(line, "PERF-001"):
                        continue
                    self._add_violation(
                        rule_id="PERF-001",
                        rule_name="N+1 query detection",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="Possible N+1 query - relationship accessed in loop",
                        context=line.strip()[:80],
                        suggestion="Use joinedload() or selectinload() for eager loading",
                    )
                    # Stop tracking after the first hit for this loop.
                    in_for_loop = False
        # Reset on dedent — NOTE(review): assumes 4-space indentation; any
        # line starting at column < 4 ends loop tracking.
        if in_for_loop and line and not line.startswith(" " * 4) and i > for_line_num + 1:
            in_for_loop = False
def _check_query_limiting(self, file_path: Path, content: str, lines: list[str]):
    """PERF-003: Check for unbounded query results.

    Flags ``.all()`` calls whose preceding five lines mention neither
    ``limit`` nor ``filter``. A ``# bounded`` marker or noqa suppresses it.
    """
    for i, line in enumerate(lines, 1):
        if re.search(r"\.all\(\)", line):
            # Check if there's a limit or filter in the 5 preceding lines
            context_start = max(0, i - 5)
            context_lines = lines[context_start:i]
            context_text = "\n".join(context_lines)
            if "limit" not in context_text.lower() and "filter" not in context_text.lower():
                if self._is_noqa_suppressed(line, "PERF-003") or "# bounded" in line:
                    continue
                self._add_violation(
                    rule_id="PERF-003",
                    rule_name="Query result limiting",
                    severity=Severity.INFO,
                    file_path=file_path,
                    line_number=i,
                    message="Query may return unbounded results",
                    context=line.strip()[:80],
                    suggestion="Add .limit() or pagination for large tables",
                )
def _check_bulk_operations(self, file_path: Path, content: str, lines: list[str]):
    """PERF-006: Check for individual operations in loops.

    Tracks the indentation of the most recent ``for`` header; any
    ``db.add(``/``.save(`` call at deeper indentation is flagged.
    """
    in_for_loop = False
    for_indent = 0
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        # Track for loops
        if re.search(r"for\s+\w+\s+in\s+", line):
            in_for_loop = True
            for_indent = len(line) - len(line.lstrip())
        elif in_for_loop:
            # Blank lines are treated as still inside the loop body.
            current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
            if current_indent <= for_indent and stripped:
                in_for_loop = False
            elif "db.add(" in line or ".save(" in line:
                if self._is_noqa_suppressed(line, "PERF-006"):
                    continue
                self._add_violation(
                    rule_id="PERF-006",
                    rule_name="Bulk operations for multiple records",
                    severity=Severity.WARNING,
                    file_path=file_path,
                    line_number=i,
                    message="Individual db.add() in loop - consider bulk operations",
                    context=line.strip()[:80],
                    suggestion="Use db.add_all() or bulk_insert_mappings()",
                )
def _check_existence_checks(self, file_path: Path, content: str, lines: list[str]):
    """PERF-008: flag count()-based existence tests that scan every row."""
    count_patterns = (
        (r"\.count\(\)\s*>\s*0", "count() > 0"),
        (r"\.count\(\)\s*>=\s*1", "count() >= 1"),
        (r"\.count\(\)\s*!=\s*0", "count() != 0"),
    )
    for line_no, text in enumerate(lines, 1):
        for regex, label in count_patterns:
            if not re.search(regex, text):
                continue
            if self._is_noqa_suppressed(text, "PERF-008"):
                continue
            self._add_violation(
                rule_id="PERF-008",
                rule_name="Use EXISTS for existence checks",
                severity=Severity.INFO,
                file_path=file_path,
                line_number=line_no,
                message=f"{label} scans all rows - use EXISTS instead",
                context=text.strip()[:80],
                suggestion="Use db.scalar(exists().where(...)) or .first() is not None",
            )
def _check_batch_updates(self, file_path: Path, content: str, lines: list[str]):
    """PERF-009: Check for updates in loops.

    Remembers the loop variable name from the most recent ``for`` header;
    an assignment to one of its attributes inside the loop body is flagged
    as a candidate for a set-based batch update.
    """
    in_for_loop = False
    for_indent = 0
    loop_var = ""
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        # Track for loops and capture the loop variable
        match = re.search(r"for\s+(\w+)\s+in\s+", line)
        if match:
            in_for_loop = True
            for_indent = len(line) - len(line.lstrip())
            loop_var = match.group(1)
        elif in_for_loop:
            # Blank lines are treated as still inside the loop body.
            current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
            if current_indent <= for_indent and stripped:
                in_for_loop = False
            elif loop_var and f"{loop_var}." in line and "=" in line and "==" not in line:
                # Attribute assignment in loop ("==" excluded to skip comparisons)
                if self._is_noqa_suppressed(line, "PERF-009"):
                    continue
                self._add_violation(
                    rule_id="PERF-009",
                    rule_name="Batch updates instead of loops",
                    severity=Severity.INFO,
                    file_path=file_path,
                    line_number=i,
                    message="Individual updates in loop - consider batch update",
                    context=line.strip()[:80],
                    suggestion="Use .update({...}) with filters for batch updates",
                )
# =========================================================================
# API Performance Checks
# =========================================================================
def _check_api_pagination(self, file_path: Path, content: str, lines: list[str]):
    """PERF-026: Check for missing pagination in list endpoints.

    After a ``@router.get/post`` decorator, flags ``.all()`` calls seen
    before any pagination-like identifier (skip/offset/page/limit) has
    appeared in the endpoint body.
    """
    # Look for GET endpoints that return lists
    in_endpoint = False
    endpoint_line = 0
    has_pagination = False
    for i, line in enumerate(lines, 1):
        # Track router decorators
        if re.search(r"@router\.(get|post)", line):
            in_endpoint = True
            endpoint_line = i
            has_pagination = False
        elif in_endpoint:
            # Check for pagination parameters
            if re.search(r"(skip|offset|page|limit)", line):
                has_pagination = True
            # A second `def` after the decorated one ends the endpoint.
            if re.search(r"^def\s+\w+", line.lstrip()) and i > endpoint_line + 1:
                in_endpoint = False
            # Check for .all() without pagination
            if ".all()" in line and not has_pagination:
                if self._is_noqa_suppressed(line, "PERF-026"):
                    continue
                self._add_violation(
                    rule_id="PERF-026",
                    rule_name="Pagination required for list endpoints",
                    severity=Severity.WARNING,
                    file_path=file_path,
                    line_number=i,
                    message="List endpoint may lack pagination",
                    context=line.strip()[:80],
                    suggestion="Add skip/limit parameters for pagination",
                )
# =========================================================================
# Async Performance Checks
# =========================================================================
def _check_parallel_async(self, file_path: Path, content: str, lines: list[str]):
    """PERF-037: Check for sequential awaits that could be parallel.

    Counts consecutive ``await`` statements; three or more that sit close
    together are reported once, at the first await of the run.
    """
    await_count = 0
    await_lines = []
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if stripped.startswith("await "):
            await_count += 1
            await_lines.append(i)
            # Check for 3+ sequential awaits
            if await_count >= 3:
                # NOTE(review): comment said "within 5 lines" but the code
                # requires consecutive gaps of <= 2 lines — confirm intent.
                if all(await_lines[j+1] - await_lines[j] <= 2 for j in range(len(await_lines)-1)):
                    # noqa is checked on the *current* (last) await line only.
                    if self._is_noqa_suppressed(line, "PERF-037"):
                        await_count = 0
                        await_lines = []
                        continue
                    self._add_violation(
                        rule_id="PERF-037",
                        rule_name="Parallel independent operations",
                        severity=Severity.INFO,
                        file_path=file_path,
                        line_number=await_lines[0],
                        message=f"{await_count} sequential awaits - consider asyncio.gather()",
                        context="Multiple await statements",
                        suggestion="Use asyncio.gather() for independent async operations",
                    )
                    await_count = 0
                    await_lines = []
        elif stripped and not stripped.startswith("#"):
            # Reset on non-await, non-empty, non-comment line
            if await_count > 0:
                await_count = 0
                await_lines = []
def _check_timeout_config(self, file_path: Path, content: str, lines: list[str]):
    """PERF-040: Check for missing timeouts on HTTP clients.

    Only single-line ``requests.*``/``httpx.*`` verb calls are matched; a
    timeout passed on a continuation line would still be flagged.
    NOTE(review): aiohttp is in the pre-filter but has no pattern below —
    aiohttp calls are never actually checked; confirm whether intended.
    """
    # Cheap pre-filter: skip files that use no known HTTP client.
    if "requests" not in content and "httpx" not in content and "aiohttp" not in content:
        return
    patterns = [
        r"requests\.(get|post|put|delete|patch)\s*\([^)]+\)",
        r"httpx\.(get|post|put|delete|patch)\s*\([^)]+\)",
    ]
    for i, line in enumerate(lines, 1):
        for pattern in patterns:
            if re.search(pattern, line) and "timeout" not in line:
                if self._is_noqa_suppressed(line, "PERF-040"):
                    continue
                self._add_violation(
                    rule_id="PERF-040",
                    rule_name="Timeout configuration",
                    severity=Severity.WARNING,
                    file_path=file_path,
                    line_number=i,
                    message="HTTP request without timeout",
                    context=line.strip()[:80],
                    suggestion="Add timeout parameter to prevent hanging requests",
                )
# =========================================================================
# Memory Performance Checks
# =========================================================================
def _check_generators(self, file_path: Path, content: str, lines: list[str]):
    """PERF-046: Check for loading large datasets into memory.

    Flags an ``.all()`` call when one of the next three lines appears to
    start iterating the result.
    """
    for i, line in enumerate(lines, 1):
        # Check for .all() followed by iteration
        if ".all()" in line:
            # Look ahead up to 3 lines (i is 1-based, so lines[i] is next)
            if i < len(lines):
                next_lines = "\n".join(lines[i:min(i+3, len(lines))])
                if "for " in next_lines and "in" in next_lines:
                    if self._is_noqa_suppressed(line, "PERF-046"):
                        continue
                    self._add_violation(
                        rule_id="PERF-046",
                        rule_name="Generators for large datasets",
                        severity=Severity.INFO,
                        file_path=file_path,
                        line_number=i,
                        message=".all() loads everything into memory before iteration",
                        context=line.strip()[:80],
                        suggestion="Use .yield_per(100) for large result sets",
                    )
def _check_file_streaming(self, file_path: Path, content: str, lines: list[str]):
    """PERF-047: flag awaited .read() calls that slurp a whole file into memory."""
    full_read = re.compile(r"await\s+\w+\.read\(\)")
    for line_no, text in enumerate(lines, 1):
        # Lines mentioning "chunk" are assumed to be streaming already.
        if not full_read.search(text) or "chunk" in text:
            continue
        if self._is_noqa_suppressed(text, "PERF-047"):
            continue
        self._add_violation(
            rule_id="PERF-047",
            rule_name="Stream large file uploads",
            severity=Severity.INFO,
            file_path=file_path,
            line_number=line_no,
            message="Full file read into memory",
            context=text.strip()[:80],
            suggestion="Stream large files: while chunk := await file.read(8192)",
        )
def _check_chunked_processing(self, file_path: Path, content: str, lines: list[str]):
    """PERF-048: Check for chunked processing in imports.

    Whole-file heuristic: if the file never mentions chunk/batch but loops
    over csv/import-related content, one violation is reported at line 1.
    """
    if "chunk" not in content.lower() and "batch" not in content.lower():
        # Check if file processes multiple records.
        # NOTE(review): '"import" in content' matches the import statements
        # present in nearly every Python file, so this gate is very
        # permissive — confirm whether it should be narrower.
        if "for " in content and ("csv" in content.lower() or "import" in content.lower()):
            # noqa for this rule must appear on the file's first line.
            first_line = lines[0] if lines else ""
            if self._is_noqa_suppressed(first_line, "PERF-048"):
                return
            self._add_violation(
                rule_id="PERF-048",
                rule_name="Chunked processing for imports",
                severity=Severity.INFO,
                file_path=file_path,
                line_number=1,
                message="Import processing may benefit from chunking",
                context="File processes multiple records",
                suggestion="Process in chunks with periodic commits",
            )
def _check_context_managers(self, file_path: Path, content: str, lines: list[str]):
    """PERF-049: flag file handles assigned from open() outside a with-block."""
    bare_open = re.compile(r"^\s*\w+\s*=\s*open\s*\(")
    for line_no, text in enumerate(lines, 1):
        if not bare_open.search(text):
            continue
        if self._is_noqa_suppressed(text, "PERF-049"):
            continue
        self._add_violation(
            rule_id="PERF-049",
            rule_name="Context managers for resources",
            severity=Severity.WARNING,
            file_path=file_path,
            line_number=line_no,
            message="File opened without context manager",
            context=text.strip()[:80],
            suggestion="Use 'with open(...) as f:' to ensure cleanup",
        )
def _check_string_concatenation(self, file_path: Path, content: str, lines: list[str]):
    """PERF-051: Check for inefficient string concatenation in loops.

    NOTE(review): the pattern's ``str\\s*\\(`` alternative flags *any*
    str() call inside a loop, not just concatenation — likely a source of
    false positives; confirm intent.
    """
    in_for_loop = False
    for_indent = 0
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if re.search(r"for\s+\w+\s+in\s+", line):
            in_for_loop = True
            for_indent = len(line) - len(line.lstrip())
        elif in_for_loop:
            # Blank lines are treated as still inside the loop body.
            current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
            if current_indent <= for_indent and stripped:
                in_for_loop = False
            elif re.search(r'\w+\s*\+=\s*["\']|str\s*\(', line):
                if self._is_noqa_suppressed(line, "PERF-051"):
                    continue
                self._add_violation(
                    rule_id="PERF-051",
                    rule_name="String concatenation efficiency",
                    severity=Severity.INFO,
                    file_path=file_path,
                    line_number=i,
                    message="String concatenation in loop",
                    context=line.strip()[:80],
                    suggestion="Use ''.join() or StringIO for many concatenations",
                )
# =========================================================================
# Frontend Performance Checks
# =========================================================================
def _check_debounce(self, file_path: Path, content: str, lines: list[str]):
    """PERF-056: flag search inputs that trigger fetches without a debounce."""
    # The original gate is file-wide: any mention of "debounce" anywhere
    # suppresses every finding, so we can bail out up front.
    if "debounce" in content.lower():
        return
    trigger = re.compile(r'@(input|keyup)=".*search.*fetch', re.IGNORECASE)
    for line_no, text in enumerate(lines, 1):
        if not trigger.search(text):
            continue
        if self._is_noqa_suppressed(text, "PERF-056"):
            continue
        self._add_violation(
            rule_id="PERF-056",
            rule_name="Debounce search inputs",
            severity=Severity.WARNING,
            file_path=file_path,
            line_number=line_no,
            message="Search input triggers API call without debounce",
            context=text.strip()[:80],
            suggestion="Add 300-500ms debounce to prevent excessive API calls",
        )
def _check_polling_intervals(self, file_path: Path, content: str, lines: list[str]):
    """PERF-062: flag setInterval() calls firing more often than every 10s."""
    interval_call = re.compile(r"setInterval\s*\([^,]+,\s*(\d+)\s*\)")
    for line_no, text in enumerate(lines, 1):
        found = interval_call.search(text)
        if found is None:
            continue
        millis = int(found.group(1))
        if millis >= 10000:
            continue  # 10 seconds or slower is acceptable
        # "# real-time" marks a deliberately fast poll; noqa also suppresses.
        if "# real-time" in text or self._is_noqa_suppressed(text, "PERF-062"):
            continue
        self._add_violation(
            rule_id="PERF-062",
            rule_name="Reasonable polling intervals",
            severity=Severity.WARNING,
            file_path=file_path,
            line_number=line_no,
            message=f"Polling interval {millis}ms is very frequent",
            context=text.strip()[:80],
            suggestion="Use >= 10 second intervals for non-critical updates",
        )
def _check_layout_thrashing(self, file_path: Path, content: str, lines: list[str]):
    """PERF-064: flag a layout-property read immediately followed by a style write."""
    layout_read = re.compile(r"(offsetHeight|offsetWidth|clientHeight|clientWidth)")
    for line_no, text in enumerate(lines, 1):
        if not layout_read.search(text):
            continue
        # line_no is 1-based, so lines[line_no] is the following line.
        if line_no >= len(lines):
            continue
        if "style" not in lines[line_no]:
            continue
        if self._is_noqa_suppressed(text, "PERF-064"):
            continue
        self._add_violation(
            rule_id="PERF-064",
            rule_name="Avoid layout thrashing",
            severity=Severity.INFO,
            file_path=file_path,
            line_number=line_no,
            message="DOM read followed by write can cause layout thrashing",
            context=text.strip()[:80],
            suggestion="Batch DOM reads, then batch DOM writes",
        )
def _check_image_lazy_loading(self, file_path: Path, content: str, lines: list[str]):
    """PERF-058: flag <img> tags that are neither lazy-loaded nor exempt."""
    img_tag = re.compile(r"<img\s+[^>]*src=")
    for line_no, text in enumerate(lines, 1):
        if not img_tag.search(text):
            continue
        # Native lazy attribute or Alpine x-intersect counts as handled.
        if 'loading="lazy"' in text or "x-intersect" in text:
            continue
        lowered = text.lower()
        # Logos/icons are exempted (presumably above-the-fold assets).
        if "logo" in lowered or "icon" in lowered:
            continue
        if self._is_noqa_suppressed(text, "PERF-058"):
            continue
        self._add_violation(
            rule_id="PERF-058",
            rule_name="Image optimization",
            severity=Severity.INFO,
            file_path=file_path,
            line_number=line_no,
            message="Image without lazy loading",
            context=text.strip()[:80],
            suggestion='Add loading="lazy" for off-screen images',
        )
def _check_script_loading(self, file_path: Path, content: str, lines: list[str]):
    """PERF-067: flag external <script> tags loaded without defer/async."""
    script_tag = re.compile(r"<script\s+[^>]*src=")
    for line_no, text in enumerate(lines, 1):
        if not script_tag.search(text):
            continue
        if "defer" in text or "async" in text:
            continue
        lowered = text.lower()
        # Alpine/HTMX are deliberately exempt (presumably required early
        # for directive binding — confirm).
        if "alpine" in lowered or "htmx" in lowered:
            continue
        if self._is_noqa_suppressed(text, "PERF-067"):
            continue
        self._add_violation(
            rule_id="PERF-067",
            rule_name="Defer non-critical JavaScript",
            severity=Severity.INFO,
            file_path=file_path,
            line_number=line_no,
            message="Script tag without defer/async",
            context=text.strip()[:80],
            suggestion="Add defer for non-critical scripts",
        )
def main():
    """CLI entry point: parse arguments, run the validator, exit with its status."""
    parser = argparse.ArgumentParser(
        description="Performance code validator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("-f", "--file", type=Path, help="Validate a single file")
    parser.add_argument("-d", "--folder", type=Path, help="Validate a directory")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--errors-only", action="store_true", help="Only show errors")
    parser.add_argument("--json", action="store_true", help="JSON output")
    opts = parser.parse_args()

    validator = PerformanceValidator(verbose=opts.verbose)
    # Precedence: explicit file beats folder beats whole-project scan.
    if opts.file:
        validator.validate_file(opts.file)
    elif opts.folder:
        validator.validate_all(opts.folder)
    else:
        validator.validate_all()
    validator.output_results(json_output=opts.json, errors_only=opts.errors_only)
    sys.exit(validator.get_exit_code())


if __name__ == "__main__":
    main()