#!/usr/bin/env python3
"""
Performance Validator
=====================

Validates code against performance rules defined in .performance-rules/

This script checks for common performance issues:
- N+1 query patterns
- Missing pagination
- Inefficient database operations
- Memory management issues
- Frontend performance anti-patterns
- Missing timeouts and connection pooling

Usage:
    python scripts/validate/validate_performance.py                   # Check all files
    python scripts/validate/validate_performance.py -d app/services/  # Check specific directory
    python scripts/validate/validate_performance.py -f app/api/v1/products.py  # Check single file
    python scripts/validate/validate_performance.py -v                # Verbose output
    python scripts/validate/validate_performance.py --json            # JSON output
    python scripts/validate/validate_performance.py --errors-only     # Only show errors

Options:
    -f, --file PATH    Validate a single file
    -d, --folder PATH  Validate all files in a directory (recursive)
    -v, --verbose      Show detailed output including context
    --errors-only      Only show errors, suppress warnings and info
    --json             Output results as JSON
"""

import argparse
import re
import sys
from pathlib import Path

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))

from base_validator import BaseValidator, Severity, ValidationResult


class PerformanceValidator(BaseValidator):
    """Performance-focused code validator.

    Runs heuristic (regex/line-based) checks for PERF-xxx rules over Python,
    JavaScript, and HTML files. Violations are accumulated on ``self.result``
    via the ``_add_violation`` helper inherited from ``BaseValidator``.
    """

    VALIDATOR_NAME = "Performance Validator"
    VALIDATOR_EMOJI = "⚡"
    RULES_DIR_NAME = ".performance-rules"
    CONFIG_FILE_NAME = ".performance-rules.yaml"

    def validate_all(self, target_path: Path = None) -> ValidationResult:
        """Validate all files under ``target_path`` (or the project root).

        Args:
            target_path: Directory to scan recursively; defaults to
                ``self.project_root`` when None.

        Returns:
            The accumulated ``ValidationResult``.
        """
        print(f"\n{self.VALIDATOR_EMOJI} Starting performance validation...\n")
        target = target_path or self.project_root
        # Validate Python files
        self._validate_python_files(target)
        # Validate JavaScript files
        self._validate_javascript_files(target)
        # Validate HTML templates
        self._validate_template_files(target)
        return self.result

    def _validate_python_files(self, target: Path):
        """Validate all Python files for performance issues."""
        print("🐍 Validating Python files...")
        for py_file in target.rglob("*.py"):
            if self._should_ignore_file(py_file):
                continue
            self.result.files_checked += 1
            # errors="ignore": a single non-UTF-8 file must not abort the run
            content = py_file.read_text(encoding="utf-8", errors="ignore")
            lines = content.split("\n")
            self._validate_python_performance(py_file, content, lines)

    def _validate_javascript_files(self, target: Path):
        """Validate all JavaScript files for performance issues."""
        print("🟨 Validating JavaScript files...")
        for js_file in target.rglob("*.js"):
            if self._should_ignore_file(js_file):
                continue
            self.result.files_checked += 1
            # errors="ignore": a single non-UTF-8 file must not abort the run
            content = js_file.read_text(encoding="utf-8", errors="ignore")
            lines = content.split("\n")
            self._validate_javascript_performance(js_file, content, lines)

    def _validate_template_files(self, target: Path):
        """Validate all HTML template files for performance issues."""
        print("📄 Validating template files...")
        for html_file in target.rglob("*.html"):
            if self._should_ignore_file(html_file):
                continue
            self.result.files_checked += 1
            # errors="ignore": a single non-UTF-8 file must not abort the run
            content = html_file.read_text(encoding="utf-8", errors="ignore")
            lines = content.split("\n")
            self._validate_template_performance(html_file, content, lines)

    def _validate_file_content(self, file_path: Path, content: str, lines: list[str]):
        """Dispatch validation based on file type (used for single-file mode)."""
        if file_path.suffix == ".py":
            self._validate_python_performance(file_path, content, lines)
        elif file_path.suffix == ".js":
            self._validate_javascript_performance(file_path, content, lines)
        elif file_path.suffix == ".html":
            self._validate_template_performance(file_path, content, lines)

    def _validate_python_performance(self, file_path: Path, content: str, lines: list[str]):
        """Run all Python-specific performance checks on one file."""
        file_path_str = str(file_path)

        # PERF-001: N+1 query detection
        self._check_n_plus_1_queries(file_path, content, lines)
        # PERF-003: Query result limiting
        self._check_query_limiting(file_path, content, lines)
        # PERF-006: Bulk operations
        self._check_bulk_operations(file_path, content, lines)
        # PERF-008: Use EXISTS for existence checks
        self._check_existence_checks(file_path, content, lines)
        # PERF-009: Batch updates
        self._check_batch_updates(file_path, content, lines)
        # PERF-026: Pagination for API endpoints
        if "/api/" in file_path_str:
            self._check_api_pagination(file_path, content, lines)
        # PERF-037: Parallel async operations
        self._check_parallel_async(file_path, content, lines)
        # PERF-040: Timeout configuration
        self._check_timeout_config(file_path, content, lines)
        # PERF-046: Generators for large datasets
        self._check_generators(file_path, content, lines)
        # PERF-047: Stream file uploads
        if "upload" in file_path_str.lower() or "file" in file_path_str.lower():
            self._check_file_streaming(file_path, content, lines)
        # PERF-048: Chunked processing
        if "import" in file_path_str.lower() or "csv" in file_path_str.lower():
            self._check_chunked_processing(file_path, content, lines)
        # PERF-049: Context managers for files
        self._check_context_managers(file_path, content, lines)
        # PERF-051: String concatenation
        self._check_string_concatenation(file_path, content, lines)

    def _validate_javascript_performance(self, file_path: Path, content: str, lines: list[str]):
        """Run all JavaScript-specific performance checks on one file."""
        # PERF-056: Debounce search inputs
        self._check_debounce(file_path, content, lines)
        # PERF-062: Polling intervals
        self._check_polling_intervals(file_path, content, lines)
        # PERF-064: Layout thrashing
        self._check_layout_thrashing(file_path, content, lines)

    def _validate_template_performance(self, file_path: Path, content: str, lines: list[str]):
        """Run all HTML-template-specific performance checks on one file."""
        # PERF-058: Image lazy loading
        self._check_image_lazy_loading(file_path, content, lines)
        # PERF-067: Script defer/async
        self._check_script_loading(file_path, content, lines)

    # =========================================================================
    # Database Performance Checks
    # =========================================================================

    def _check_n_plus_1_queries(self, file_path: Path, content: str, lines: list[str]):
        """PERF-001: Check for N+1 query patterns.

        Looks for patterns like: ``for item in items: item.relationship.attribute``.
        """
        in_for_loop = False
        for_line_num = 0
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            # Track for loops over query results
            if re.search(r"for\s+\w+\s+in\s+.*\.(all|query)", line):
                in_for_loop = True
                for_line_num = i
            elif in_for_loop and stripped and not stripped.startswith("#"):
                # Check for relationship access in loop; "(" excluded to skip calls
                if re.search(r"\.\w+\.\w+", line) and "(" not in line:
                    # Could be accessing a relationship
                    if any(rel in line for rel in [".customer.", ".store.", ".order.", ".product.", ".user."]):
                        if self._is_noqa_suppressed(line, "PERF-001"):
                            continue
                        self._add_violation(
                            rule_id="PERF-001",
                            rule_name="N+1 query detection",
                            severity=Severity.WARNING,
                            file_path=file_path,
                            line_number=i,
                            message="Possible N+1 query - relationship accessed in loop",
                            context=line.strip()[:80],
                            suggestion="Use joinedload() or selectinload() for eager loading",
                        )
                        # One report per loop is enough
                        in_for_loop = False
            # Reset on dedent (heuristic: line no longer indented by 4 spaces)
            if in_for_loop and line and not line.startswith(" " * 4) and i > for_line_num + 1:
                in_for_loop = False

    def _check_query_limiting(self, file_path: Path, content: str, lines: list[str]):
        """PERF-003: Check for unbounded query results."""
        for i, line in enumerate(lines, 1):
            if re.search(r"\.all\(\)", line):
                # Check if there's a limit or filter in the preceding 5 lines
                context_start = max(0, i - 5)
                context_lines = lines[context_start:i]
                context_text = "\n".join(context_lines)
                if "limit" not in context_text.lower() and "filter" not in context_text.lower():
                    # "# bounded" is an inline opt-out for known-small tables
                    if self._is_noqa_suppressed(line, "PERF-003") or "# bounded" in line:
                        continue
                    self._add_violation(
                        rule_id="PERF-003",
                        rule_name="Query result limiting",
                        severity=Severity.INFO,
                        file_path=file_path,
                        line_number=i,
                        message="Query may return unbounded results",
                        context=line.strip()[:80],
                        suggestion="Add .limit() or pagination for large tables",
                    )

    def _check_bulk_operations(self, file_path: Path, content: str, lines: list[str]):
        """PERF-006: Check for individual DB operations inside loops."""
        in_for_loop = False
        for_indent = 0
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            # Track for loops
            if re.search(r"for\s+\w+\s+in\s+", line):
                in_for_loop = True
                for_indent = len(line) - len(line.lstrip())
            elif in_for_loop:
                # Blank lines keep the loop "open" (treated as still indented)
                current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
                if current_indent <= for_indent and stripped:
                    in_for_loop = False
                elif "db.add(" in line or ".save(" in line:
                    if self._is_noqa_suppressed(line, "PERF-006"):
                        continue
                    self._add_violation(
                        rule_id="PERF-006",
                        rule_name="Bulk operations for multiple records",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="Individual db.add() in loop - consider bulk operations",
                        context=line.strip()[:80],
                        suggestion="Use db.add_all() or bulk_insert_mappings()",
                    )

    def _check_existence_checks(self, file_path: Path, content: str, lines: list[str]):
        """PERF-008: Check for inefficient existence checks (count-based)."""
        patterns = [
            (r"\.count\(\)\s*>\s*0", "count() > 0"),
            (r"\.count\(\)\s*>=\s*1", "count() >= 1"),
            (r"\.count\(\)\s*!=\s*0", "count() != 0"),
        ]
        for i, line in enumerate(lines, 1):
            for pattern, issue in patterns:
                if re.search(pattern, line):
                    if self._is_noqa_suppressed(line, "PERF-008"):
                        continue
                    self._add_violation(
                        rule_id="PERF-008",
                        rule_name="Use EXISTS for existence checks",
                        severity=Severity.INFO,
                        file_path=file_path,
                        line_number=i,
                        message=f"{issue} scans all rows - use EXISTS instead",
                        context=line.strip()[:80],
                        suggestion="Use db.scalar(exists().where(...)) or .first() is not None",
                    )

    def _check_batch_updates(self, file_path: Path, content: str, lines: list[str]):
        """PERF-009: Check for per-row attribute updates inside loops."""
        in_for_loop = False
        for_indent = 0
        loop_var = ""
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            # Track for loops and capture the loop variable name
            match = re.search(r"for\s+(\w+)\s+in\s+", line)
            if match:
                in_for_loop = True
                for_indent = len(line) - len(line.lstrip())
                loop_var = match.group(1)
            elif in_for_loop:
                current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
                if current_indent <= for_indent and stripped:
                    in_for_loop = False
                elif loop_var and f"{loop_var}." in line and "=" in line and "==" not in line:
                    # Attribute assignment on the loop variable inside the loop
                    if self._is_noqa_suppressed(line, "PERF-009"):
                        continue
                    self._add_violation(
                        rule_id="PERF-009",
                        rule_name="Batch updates instead of loops",
                        severity=Severity.INFO,
                        file_path=file_path,
                        line_number=i,
                        message="Individual updates in loop - consider batch update",
                        context=line.strip()[:80],
                        suggestion="Use .update({...}) with filters for batch updates",
                    )

    # =========================================================================
    # API Performance Checks
    # =========================================================================

    def _check_api_pagination(self, file_path: Path, content: str, lines: list[str]):
        """PERF-026: Check for missing pagination in list endpoints."""
        # Look for router endpoints that return lists via .all()
        in_endpoint = False
        endpoint_line = 0
        has_pagination = False
        for i, line in enumerate(lines, 1):
            # Track router decorators
            if re.search(r"@router\.(get|post)", line):
                in_endpoint = True
                endpoint_line = i
                has_pagination = False
            elif in_endpoint:
                # Check for pagination parameters
                if re.search(r"(skip|offset|page|limit)", line):
                    has_pagination = True
                # Check for function end (a new top-level def after the endpoint)
                if re.search(r"^def\s+\w+", line.lstrip()) and i > endpoint_line + 1:
                    in_endpoint = False
                # Check for .all() without pagination
                if ".all()" in line and not has_pagination:
                    if self._is_noqa_suppressed(line, "PERF-026"):
                        continue
                    self._add_violation(
                        rule_id="PERF-026",
                        rule_name="Pagination required for list endpoints",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="List endpoint may lack pagination",
                        context=line.strip()[:80],
                        suggestion="Add skip/limit parameters for pagination",
                    )

    # =========================================================================
    # Async Performance Checks
    # =========================================================================

    def _check_parallel_async(self, file_path: Path, content: str, lines: list[str]):
        """PERF-037: Check for sequential awaits that could be parallel."""
        await_count = 0
        await_lines = []
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            if stripped.startswith("await "):
                await_count += 1
                await_lines.append(i)
                # Check for 3+ sequential awaits
                if await_count >= 3:
                    # Verify they're close together (within 2 lines of each other)
                    if all(await_lines[j + 1] - await_lines[j] <= 2 for j in range(len(await_lines) - 1)):
                        if self._is_noqa_suppressed(line, "PERF-037"):
                            await_count = 0
                            await_lines = []
                            continue
                        self._add_violation(
                            rule_id="PERF-037",
                            rule_name="Parallel independent operations",
                            severity=Severity.INFO,
                            file_path=file_path,
                            line_number=await_lines[0],
                            message=f"{await_count} sequential awaits - consider asyncio.gather()",
                            context="Multiple await statements",
                            suggestion="Use asyncio.gather() for independent async operations",
                        )
                        await_count = 0
                        await_lines = []
            elif stripped and not stripped.startswith("#"):
                # Reset on non-await, non-empty line
                if await_count > 0:
                    await_count = 0
                    await_lines = []

    def _check_timeout_config(self, file_path: Path, content: str, lines: list[str]):
        """PERF-040: Check for missing timeouts on HTTP client calls."""
        # Skip files that don't use an HTTP client at all
        if "requests" not in content and "httpx" not in content and "aiohttp" not in content:
            return
        patterns = [
            r"requests\.(get|post|put|delete|patch)\s*\([^)]+\)",
            r"httpx\.(get|post|put|delete|patch)\s*\([^)]+\)",
        ]
        for i, line in enumerate(lines, 1):
            for pattern in patterns:
                if re.search(pattern, line) and "timeout" not in line:
                    if self._is_noqa_suppressed(line, "PERF-040"):
                        continue
                    self._add_violation(
                        rule_id="PERF-040",
                        rule_name="Timeout configuration",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="HTTP request without timeout",
                        context=line.strip()[:80],
                        suggestion="Add timeout parameter to prevent hanging requests",
                    )

    # =========================================================================
    # Memory Performance Checks
    # =========================================================================

    def _check_generators(self, file_path: Path, content: str, lines: list[str]):
        """PERF-046: Check for loading large datasets into memory before iterating."""
        for i, line in enumerate(lines, 1):
            # Check for .all() followed by iteration within the next 3 lines
            if ".all()" in line and i < len(lines):
                next_lines = "\n".join(lines[i:i + 3])
                if "for " in next_lines and "in" in next_lines:
                    if self._is_noqa_suppressed(line, "PERF-046"):
                        continue
                    self._add_violation(
                        rule_id="PERF-046",
                        rule_name="Generators for large datasets",
                        severity=Severity.INFO,
                        file_path=file_path,
                        line_number=i,
                        message=".all() loads everything into memory before iteration",
                        context=line.strip()[:80],
                        suggestion="Use .yield_per(100) for large result sets",
                    )

    def _check_file_streaming(self, file_path: Path, content: str, lines: list[str]):
        """PERF-047: Check for loading entire uploaded files into memory."""
        for i, line in enumerate(lines, 1):
            if re.search(r"await\s+\w+\.read\(\)", line) and "chunk" not in line:
                if self._is_noqa_suppressed(line, "PERF-047"):
                    continue
                self._add_violation(
                    rule_id="PERF-047",
                    rule_name="Stream large file uploads",
                    severity=Severity.INFO,
                    file_path=file_path,
                    line_number=i,
                    message="Full file read into memory",
                    context=line.strip()[:80],
                    suggestion="Stream large files: while chunk := await file.read(8192)",
                )

    def _check_chunked_processing(self, file_path: Path, content: str, lines: list[str]):
        """PERF-048: Check for chunked processing in import-style files."""
        if "chunk" not in content.lower() and "batch" not in content.lower():
            # Check if file processes multiple records
            if "for " in content and ("csv" in content.lower() or "import" in content.lower()):
                # File-level finding: suppression marker is expected on line 1
                first_line = lines[0] if lines else ""
                if self._is_noqa_suppressed(first_line, "PERF-048"):
                    return
                self._add_violation(
                    rule_id="PERF-048",
                    rule_name="Chunked processing for imports",
                    severity=Severity.INFO,
                    file_path=file_path,
                    line_number=1,
                    message="Import processing may benefit from chunking",
                    context="File processes multiple records",
                    suggestion="Process in chunks with periodic commits",
                )

    def _check_context_managers(self, file_path: Path, content: str, lines: list[str]):
        """PERF-049: Check for file handles opened without a context manager."""
        for i, line in enumerate(lines, 1):
            # Check for bare assignment from open() (i.e. no 'with')
            if re.search(r"^\s*\w+\s*=\s*open\s*\(", line):
                if self._is_noqa_suppressed(line, "PERF-049"):
                    continue
                self._add_violation(
                    rule_id="PERF-049",
                    rule_name="Context managers for resources",
                    severity=Severity.WARNING,
                    file_path=file_path,
                    line_number=i,
                    message="File opened without context manager",
                    context=line.strip()[:80],
                    suggestion="Use 'with open(...) as f:' to ensure cleanup",
                )

    def _check_string_concatenation(self, file_path: Path, content: str, lines: list[str]):
        """PERF-051: Check for inefficient string concatenation in loops."""
        in_for_loop = False
        for_indent = 0
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            if re.search(r"for\s+\w+\s+in\s+", line):
                in_for_loop = True
                for_indent = len(line) - len(line.lstrip())
            elif in_for_loop:
                current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
                if current_indent <= for_indent and stripped:
                    in_for_loop = False
                elif re.search(r'\w+\s*\+=\s*["\']|str\s*\(', line):
                    if self._is_noqa_suppressed(line, "PERF-051"):
                        continue
                    self._add_violation(
                        rule_id="PERF-051",
                        rule_name="String concatenation efficiency",
                        severity=Severity.INFO,
                        file_path=file_path,
                        line_number=i,
                        message="String concatenation in loop",
                        context=line.strip()[:80],
                        suggestion="Use ''.join() or StringIO for many concatenations",
                    )

    # =========================================================================
    # Frontend Performance Checks
    # =========================================================================

    def _check_debounce(self, file_path: Path, content: str, lines: list[str]):
        """PERF-056: Check for search inputs that fetch without debounce."""
        for i, line in enumerate(lines, 1):
            if re.search(r'@(input|keyup)=".*search.*fetch', line, re.IGNORECASE):
                # Whole-file check: any mention of debounce counts as handled
                if "debounce" not in content.lower():
                    if self._is_noqa_suppressed(line, "PERF-056"):
                        continue
                    self._add_violation(
                        rule_id="PERF-056",
                        rule_name="Debounce search inputs",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="Search input triggers API call without debounce",
                        context=line.strip()[:80],
                        suggestion="Add 300-500ms debounce to prevent excessive API calls",
                    )

    def _check_polling_intervals(self, file_path: Path, content: str, lines: list[str]):
        """PERF-062: Check for too-frequent setInterval polling."""
        for i, line in enumerate(lines, 1):
            match = re.search(r"setInterval\s*\([^,]+,\s*(\d+)\s*\)", line)
            if match:
                interval = int(match.group(1))
                if interval < 10000:  # Less than 10 seconds
                    # "real-time" comment is an explicit opt-out
                    if "# real-time" in line or "// real-time" in line or self._is_noqa_suppressed(line, "PERF-062"):
                        continue
                    self._add_violation(
                        rule_id="PERF-062",
                        rule_name="Reasonable polling intervals",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message=f"Polling interval {interval}ms is very frequent",
                        context=line.strip()[:80],
                        suggestion="Use >= 10 second intervals for non-critical updates",
                    )

    def _check_layout_thrashing(self, file_path: Path, content: str, lines: list[str]):
        """PERF-064: Check for DOM-read-then-write (layout thrashing) patterns."""
        for i, line in enumerate(lines, 1):
            # A layout-forcing read followed by a style write on the next line
            if re.search(r"(offsetHeight|offsetWidth|clientHeight|clientWidth)", line):
                if i < len(lines):
                    next_line = lines[i]  # enumerate is 1-based, so lines[i] is the next line
                    if "style" in next_line:
                        if self._is_noqa_suppressed(line, "PERF-064"):
                            continue
                        self._add_violation(
                            rule_id="PERF-064",
                            rule_name="Avoid layout thrashing",
                            severity=Severity.INFO,
                            file_path=file_path,
                            line_number=i,
                            message="DOM read followed by write can cause layout thrashing",
                            context=line.strip()[:80],
                            suggestion="Batch DOM reads, then batch DOM writes",
                        )

    def _check_image_lazy_loading(self, file_path: Path, content: str, lines: list[str]):
        """PERF-058: Check for <img> tags without lazy loading."""
        for i, line in enumerate(lines, 1):
            # Fixed pattern: match <img ... src= specifically (was a broken
            # "]*src=" pattern that matched any src= attribute, incl. scripts)
            if re.search(r"<img[^>]*src=", line):
                if 'loading="lazy"' not in line and "x-intersect" not in line:
                    # Logos/icons are typically above the fold - skip them
                    if "logo" not in line.lower() and "icon" not in line.lower():
                        if self._is_noqa_suppressed(line, "PERF-058"):
                            continue
                        self._add_violation(
                            rule_id="PERF-058",
                            rule_name="Image optimization",
                            severity=Severity.INFO,
                            file_path=file_path,
                            line_number=i,
                            message="Image without lazy loading",
                            context=line.strip()[:80],
                            suggestion='Add loading="lazy" for off-screen images',
                        )

    def _check_script_loading(self, file_path: Path, content: str, lines: list[str]):
        """PERF-067: Check for <script> tags without defer/async."""
        for i, line in enumerate(lines, 1):
            # Fixed pattern: match <script ... src= specifically (was a broken
            # "]*src=" pattern that matched any src= attribute, incl. images)
            if re.search(r"<script[^>]*src=", line):
                if "defer" not in line and "async" not in line:
                    # Alpine/HTMX must load synchronously - skip them
                    if "alpine" not in line.lower() and "htmx" not in line.lower():
                        if self._is_noqa_suppressed(line, "PERF-067"):
                            continue
                        self._add_violation(
                            rule_id="PERF-067",
                            rule_name="Defer non-critical JavaScript",
                            severity=Severity.INFO,
                            file_path=file_path,
                            line_number=i,
                            message="Script tag without defer/async",
                            context=line.strip()[:80],
                            suggestion="Add defer for non-critical scripts",
                        )


def main():
    """CLI entry point: parse arguments, run validation, emit results."""
    parser = argparse.ArgumentParser(
        description="Performance code validator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("-f", "--file", type=Path, help="Validate a single file")
    parser.add_argument("-d", "--folder", type=Path, help="Validate a directory")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--errors-only", action="store_true", help="Only show errors")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    validator = PerformanceValidator(verbose=args.verbose)
    if args.file:
        validator.validate_file(args.file)
    elif args.folder:
        validator.validate_all(args.folder)
    else:
        validator.validate_all()

    validator.output_results(json_output=args.json, errors_only=args.errors_only)
    sys.exit(validator.get_exit_code())


if __name__ == "__main__":
    main()