feat: add unified code quality dashboard with multiple validators

- Add validator_type field to scans and violations (architecture,
  security, performance)
- Create security validator with SEC-xxx rules
- Create performance validator with PERF-xxx rules
- Add base validator class for shared functionality
- Add validate_all.py script to run all validators
- Update code quality service with validator type filtering
- Add validator type tabs to dashboard UI
- Add validator type filter to violations list
- Update stats response with per-validator breakdown
- Add security and performance rules documentation
- Add chat-bubble icons to icon library

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-21 20:57:47 +01:00
parent 6a903e16c6
commit 26b3dc9e3b
27 changed files with 5270 additions and 119 deletions

465
scripts/base_validator.py Executable file
View File

@@ -0,0 +1,465 @@
#!/usr/bin/env python3
"""
Base Validator
==============
Shared base class for all validation scripts (architecture, security, performance).
Provides common functionality for:
- Loading YAML configuration
- File pattern matching
- Violation tracking
- Output formatting (human-readable and JSON)
"""
import json
import re
import sys
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any
import yaml
class Severity(Enum):
    """Validation severity levels (ordered from most to least severe)."""

    ERROR = "error"      # fails validation (drives a non-zero exit code)
    WARNING = "warning"  # reported, but validation still passes
    INFO = "info"        # informational finding only
@dataclass
class Violation:
    """Represents a rule violation"""

    rule_id: str          # rule identifier, e.g. "PERF-001" or "SEC-003"
    rule_name: str        # human-readable rule title
    severity: Severity    # ERROR / WARNING / INFO
    file_path: Path       # file in which the violation was found
    line_number: int      # 1-based line number of the finding
    message: str          # description of the issue
    context: str = ""     # offending source line (truncated by callers)
    suggestion: str = ""  # optional remediation hint shown in reports
@dataclass
class FileResult:
    """Per-file validation outcome: violation counts plus derived status."""

    file_path: Path
    errors: int = 0
    warnings: int = 0
    info: int = 0

    @property
    def passed(self) -> bool:
        """A file passes as long as it has no error-level findings."""
        return not self.errors

    @property
    def status(self) -> str:
        """Short status label; "PASSED*" marks a pass with warnings."""
        if self.errors:
            return "FAILED"
        return "PASSED*" if self.warnings else "PASSED"

    @property
    def status_icon(self) -> str:
        """Icon matching `status` for report output."""
        if self.errors:
            return ""
        return "⚠️" if self.warnings else ""
@dataclass
class ValidationResult:
    """Aggregate outcome of a validation run across all files."""

    violations: list[Violation] = field(default_factory=list)
    files_checked: int = 0
    rules_applied: int = 0
    file_results: list[FileResult] = field(default_factory=list)

    def _count(self, level) -> int:
        """Number of violations recorded at exactly this severity."""
        return sum(1 for v in self.violations if v.severity == level)

    def has_errors(self) -> bool:
        """Check if there are any error-level violations"""
        return any(v.severity == Severity.ERROR for v in self.violations)

    def has_warnings(self) -> bool:
        """Check if there are any warning-level violations"""
        return any(v.severity == Severity.WARNING for v in self.violations)

    def error_count(self) -> int:
        return self._count(Severity.ERROR)

    def warning_count(self) -> int:
        return self._count(Severity.WARNING)

    def info_count(self) -> int:
        return self._count(Severity.INFO)
class BaseValidator(ABC):
    """Abstract base validator class.

    Provides the machinery shared by the concrete validators (architecture,
    security, performance): YAML config loading (single-file or split
    directory), ignore-pattern matching, violation tracking, and report
    output in human-readable or JSON form. Subclasses implement
    ``validate_all`` and ``_validate_file_content`` and normally override
    the class constants below.
    """

    # Subclasses should override these
    VALIDATOR_NAME = "Base Validator"
    VALIDATOR_EMOJI = "🔍"
    RULES_DIR_NAME = ".rules"
    CONFIG_FILE_NAME = ".rules.yaml"

    def __init__(self, config_path: Path = None, verbose: bool = False):
        """Initialize validator with configuration.

        config_path: explicit YAML config file; defaults to
            CONFIG_FILE_NAME under the current working directory.
        verbose: when True, violation context lines are printed in reports.
        """
        self.project_root = Path.cwd()
        self.config_path = config_path or self.project_root / self.CONFIG_FILE_NAME
        self.verbose = verbose
        self.config = self._load_config()
        self.result = ValidationResult()

    def _load_config(self) -> dict[str, Any]:
        """
        Load validation rules from YAML config.
        Supports two modes:
        1. Split directory mode: rules directory with multiple YAML files
        2. Single file mode: single YAML file (legacy)
        The split directory mode takes precedence if it exists.
        """
        # Check for split directory mode first
        rules_dir = self.project_root / self.RULES_DIR_NAME
        if rules_dir.is_dir():
            return self._load_config_from_directory(rules_dir)
        # Fall back to single file mode
        if not self.config_path.exists():
            print(f"❌ Configuration file not found: {self.config_path}")
            print(f" (Also checked for directory: {rules_dir})")
            sys.exit(1)
        with open(self.config_path) as f:
            config = yaml.safe_load(f)
        print(f"📋 Loaded {self.VALIDATOR_NAME}: {config.get('project', 'unknown')}")
        return config

    def _load_config_from_directory(self, rules_dir: Path) -> dict[str, Any]:
        """
        Load and merge configuration from split YAML files in a directory.
        Reads _main.yaml first for base config, then merges all other YAML files.
        """
        config: dict[str, Any] = {}
        # Load _main.yaml first (contains project info, principles, ignore patterns)
        main_file = rules_dir / "_main.yaml"
        if main_file.exists():
            with open(main_file) as f:
                config = yaml.safe_load(f) or {}
        # Load all other YAML files and merge their contents
        yaml_files = sorted(rules_dir.glob("*.yaml"))
        for yaml_file in yaml_files:
            if yaml_file.name == "_main.yaml":
                continue  # Already loaded
            with open(yaml_file) as f:
                file_config = yaml.safe_load(f) or {}
            # Merge rule sections from this file into main config
            for key, value in file_config.items():
                if key.endswith("_rules") and isinstance(value, list):
                    # Merge rule lists
                    if key not in config:
                        config[key] = []
                    config[key].extend(value)
                elif key not in config:
                    # Add new top-level keys (first file wins on duplicates)
                    config[key] = value
        print(f"📋 Loaded {self.VALIDATOR_NAME}: {config.get('project', 'unknown')}")
        print(f" (from {len(yaml_files)} files in {rules_dir.name}/)")
        return config

    def _should_ignore_file(self, file_path: Path) -> bool:
        """Check if a file should be ignored based on config patterns"""
        import fnmatch
        ignore_config = self.config.get("ignore", {})
        ignore_files = ignore_config.get("files", [])
        # Get relative path for matching
        try:
            rel_path = file_path.relative_to(self.project_root)
        except ValueError:
            # File lies outside the project root; match against it as-is
            rel_path = file_path
        rel_path_str = str(rel_path)
        for pattern in ignore_files:
            # Handle glob patterns using fnmatch
            if "*" in pattern:
                # fnmatch handles *, **, and ? patterns correctly
                if fnmatch.fnmatch(rel_path_str, pattern):
                    return True
                # Also check each path component for patterns like **/.venv/**
                # This handles cases where the pattern expects any prefix
                if pattern.startswith("**/"):
                    # Try matching without the **/ prefix (e.g., .venv/** matches .venv/foo)
                    suffix_pattern = pattern[3:]  # Remove the leading "**/"
                    if fnmatch.fnmatch(rel_path_str, suffix_pattern):
                        return True
            elif pattern in rel_path_str:
                # Plain patterns are substring matches
                return True
        return False

    def _add_violation(
        self,
        rule_id: str,
        rule_name: str,
        severity: Severity,
        file_path: Path,
        line_number: int,
        message: str,
        context: str = "",
        suggestion: str = "",
    ):
        """Add a violation to the results.

        Suppressed when the offending line carries an inline
        ``noqa: <rule-id>`` marker in its context text.
        """
        # Check for inline noqa comment
        if f"noqa: {rule_id.lower()}" in context.lower():
            return
        self.result.violations.append(
            Violation(
                rule_id=rule_id,
                rule_name=rule_name,
                severity=severity,
                file_path=file_path,
                line_number=line_number,
                message=message,
                context=context,
                suggestion=suggestion,
            )
        )

    def _get_rule(self, rule_id: str) -> dict | None:
        """Look up a rule by ID across all rule categories"""
        # Rule lists live under top-level config keys ending in "_rules"
        for key, value in self.config.items():
            if key.endswith("_rules") and isinstance(value, list):
                for rule in value:
                    if rule.get("id") == rule_id:
                        return rule
        return None

    def _check_pattern_in_file(
        self,
        file_path: Path,
        content: str,
        lines: list[str],
        pattern: str,
        rule_id: str,
        rule_name: str,
        severity: Severity,
        message: str,
        suggestion: str = "",
        exclude_patterns: list[str] = None,
    ):
        """Check for a regex pattern in a file and report violations.

        Matching is case-insensitive and line-by-line; any line containing
        one of ``exclude_patterns`` as a plain substring is skipped.
        """
        exclude_patterns = exclude_patterns or []
        for i, line in enumerate(lines, 1):
            if re.search(pattern, line, re.IGNORECASE):
                # Check exclusions
                should_exclude = False
                for exclude in exclude_patterns:
                    if exclude in line:
                        should_exclude = True
                        break
                if not should_exclude:
                    self._add_violation(
                        rule_id=rule_id,
                        rule_name=rule_name,
                        severity=severity,
                        file_path=file_path,
                        line_number=i,
                        message=message,
                        context=line.strip()[:100],
                        suggestion=suggestion,
                    )

    @abstractmethod
    def validate_all(self, target_path: Path = None) -> ValidationResult:
        """Validate all files in a directory - must be implemented by subclasses"""
        pass

    def validate_file(self, file_path: Path, quiet: bool = False) -> ValidationResult:
        """Validate a single file"""
        if not file_path.exists():
            if not quiet:
                print(f"❌ File not found: {file_path}")
            return self.result
        if not file_path.is_file():
            if not quiet:
                print(f"❌ Not a file: {file_path}")
            return self.result
        if not quiet:
            print(f"\n{self.VALIDATOR_EMOJI} Validating single file: {file_path}\n")
        # Resolve file path to absolute
        file_path = file_path.resolve()
        if self._should_ignore_file(file_path):
            if not quiet:
                print("⏭️ File is in ignore list, skipping")
            return self.result
        self.result.files_checked += 1
        # Track violations before this file so we can slice out its findings
        violations_before = len(self.result.violations)
        content = file_path.read_text()
        lines = content.split("\n")
        # Call subclass-specific validation
        self._validate_file_content(file_path, content, lines)
        # Calculate violations for this file
        file_violations = self.result.violations[violations_before:]
        errors = sum(1 for v in file_violations if v.severity == Severity.ERROR)
        warnings = sum(1 for v in file_violations if v.severity == Severity.WARNING)
        info = sum(1 for v in file_violations if v.severity == Severity.INFO)
        # Track file result
        self.result.file_results.append(
            FileResult(file_path=file_path, errors=errors, warnings=warnings, info=info)
        )
        return self.result

    @abstractmethod
    def _validate_file_content(self, file_path: Path, content: str, lines: list[str]):
        """Validate file content - must be implemented by subclasses"""
        pass

    def output_results(self, json_output: bool = False, errors_only: bool = False):
        """Output validation results"""
        # errors_only only affects the human-readable report
        if json_output:
            self._output_json()
        else:
            self._output_human(errors_only)

    def _output_json(self):
        """Output results as JSON
        Format matches code quality service expectations:
        - file_path (not file)
        - line_number (not line)
        - total_violations count
        """
        # project_root is set in __init__; this try is purely defensive
        try:
            rel_base = self.project_root
        except Exception:
            rel_base = Path.cwd()

        def get_relative_path(file_path: Path) -> str:
            """Get relative path from project root"""
            try:
                return str(file_path.relative_to(rel_base))
            except ValueError:
                # Outside the project root; fall back to the path as-is
                return str(file_path)

        output = {
            "validator": self.VALIDATOR_NAME,
            "files_checked": self.result.files_checked,
            "total_violations": len(self.result.violations),
            "errors": self.result.error_count(),
            "warnings": self.result.warning_count(),
            "info": self.result.info_count(),
            "violations": [
                {
                    "rule_id": v.rule_id,
                    "rule_name": v.rule_name,
                    "severity": v.severity.value,
                    "file_path": get_relative_path(v.file_path),
                    "line_number": v.line_number,
                    "message": v.message,
                    "context": v.context,
                    "suggestion": v.suggestion,
                }
                for v in self.result.violations
            ],
        }
        print(json.dumps(output, indent=2))

    def _output_human(self, errors_only: bool = False):
        """Output results in human-readable format"""
        print("\n" + "=" * 80)
        print(f"📊 {self.VALIDATOR_NAME.upper()} REPORT")
        print("=" * 80)
        # Bucket findings once so headers can show counts up front
        errors = [v for v in self.result.violations if v.severity == Severity.ERROR]
        warnings = [v for v in self.result.violations if v.severity == Severity.WARNING]
        info = [v for v in self.result.violations if v.severity == Severity.INFO]
        print(
            f"\nFiles checked: {self.result.files_checked}"
        )
        print(
            f"Findings: {len(errors)} errors, {len(warnings)} warnings, {len(info)} info"
        )
        if errors:
            print(f"\n\n❌ ERRORS ({len(errors)}):")
            print("-" * 80)
            for v in errors:
                self._print_violation(v)
        if warnings and not errors_only:
            print(f"\n\n⚠️ WARNINGS ({len(warnings)}):")
            print("-" * 80)
            for v in warnings:
                self._print_violation(v)
        if info and not errors_only:
            print(f"\n INFO ({len(info)}):")
            print("-" * 80)
            for v in info:
                self._print_violation(v)
        print("\n" + "=" * 80)
        # Overall verdict mirrors get_exit_code(): only errors fail the run
        if errors:
            print("❌ VALIDATION FAILED")
        elif warnings:
            print(f"⚠️ VALIDATION PASSED WITH {len(warnings)} WARNING(S)")
        else:
            print("✅ VALIDATION PASSED")
        print("=" * 80)

    def _print_violation(self, v: Violation):
        """Print a single violation"""
        try:
            rel_path = v.file_path.relative_to(self.project_root)
        except ValueError:
            # Outside the project root; show the path unmodified
            rel_path = v.file_path
        print(f"\n [{v.rule_id}] {v.rule_name}")
        print(f" File: {rel_path}:{v.line_number}")
        print(f" Issue: {v.message}")
        if v.context and self.verbose:
            print(f" Context: {v.context}")
        if v.suggestion:
            print(f" 💡 Suggestion: {v.suggestion}")

    def get_exit_code(self) -> int:
        """Get appropriate exit code based on results"""
        # Warnings and info do not fail the build; only errors do
        if self.result.has_errors():
            return 1
        return 0

218
scripts/validate_all.py Executable file
View File

@@ -0,0 +1,218 @@
#!/usr/bin/env python3
"""
Unified Code Validator
======================
Runs all validation scripts (architecture, security, performance) in sequence.
This provides a single entry point for comprehensive code validation,
useful for CI/CD pipelines and pre-commit hooks.
Usage:
python scripts/validate_all.py # Run all validators
python scripts/validate_all.py --security # Run only security validator
python scripts/validate_all.py --performance # Run only performance validator
python scripts/validate_all.py --architecture # Run only architecture validator
python scripts/validate_all.py -v # Verbose output
python scripts/validate_all.py --fail-fast # Stop on first failure
python scripts/validate_all.py --json # JSON output
Options:
--architecture Run architecture validator
--security Run security validator
--performance Run performance validator
--fail-fast Stop on first validator failure
-v, --verbose Show detailed output
--errors-only Only show errors
--json Output results as JSON
"""
import argparse
import json
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_validator import Severity
def run_architecture_validator(verbose: bool = False) -> tuple[int, dict]:
    """Run the architecture validator.

    Returns (exit_code, summary): exit_code is 1 only when error-level
    violations were found. A missing validator module is reported as skipped
    with exit code 0 so partial installs don't fail the whole run.
    """
    try:
        # Import dynamically to avoid circular imports
        sys.path.insert(0, str(Path(__file__).parent.parent))
        from scripts.validate_architecture import ArchitectureValidator

        config_path = Path.cwd() / ".architecture-rules.yaml"
        validator = ArchitectureValidator(config_path=config_path, verbose=verbose)
        result = validator.validate_all()
        return (
            1 if result.has_errors() else 0,
            {
                "name": "Architecture",
                "files_checked": result.files_checked,
                # Use the ValidationResult count helpers for consistency with
                # the security/performance runners (was: manual sums over
                # v.severity.value strings).
                "errors": result.error_count(),
                "warnings": result.warning_count(),
                "info": result.info_count(),
            }
        )
    except ImportError as e:
        print(f"⚠️ Architecture validator not available: {e}")
        return 0, {"name": "Architecture", "skipped": True}
    except Exception as e:
        print(f"❌ Architecture validator failed: {e}")
        return 1, {"name": "Architecture", "error": str(e)}
def run_security_validator(verbose: bool = False) -> tuple[int, dict]:
    """Run the security validator and summarize its findings.

    Returns (exit_code, summary); exit_code is 1 only when error-level
    violations exist. A missing validator module yields a "skipped" summary
    with exit code 0; any other failure is reported with exit code 1.
    """
    try:
        from validate_security import SecurityValidator

        outcome = SecurityValidator(verbose=verbose).validate_all()
        summary = {
            "name": "Security",
            "files_checked": outcome.files_checked,
            "errors": outcome.error_count(),
            "warnings": outcome.warning_count(),
            "info": outcome.info_count(),
        }
        exit_code = 1 if outcome.has_errors() else 0
        return exit_code, summary
    except ImportError as e:
        print(f"⚠️ Security validator not available: {e}")
        return 0, {"name": "Security", "skipped": True}
    except Exception as e:
        print(f"❌ Security validator failed: {e}")
        return 1, {"name": "Security", "error": str(e)}
def run_performance_validator(verbose: bool = False) -> tuple[int, dict]:
    """Run the performance validator and summarize its findings.

    Returns (exit_code, summary); exit_code is 1 only when error-level
    violations exist. A missing validator module yields a "skipped" summary
    with exit code 0; any other failure is reported with exit code 1.
    """
    try:
        from validate_performance import PerformanceValidator

        outcome = PerformanceValidator(verbose=verbose).validate_all()
        summary = {
            "name": "Performance",
            "files_checked": outcome.files_checked,
            "errors": outcome.error_count(),
            "warnings": outcome.warning_count(),
            "info": outcome.info_count(),
        }
        exit_code = 1 if outcome.has_errors() else 0
        return exit_code, summary
    except ImportError as e:
        print(f"⚠️ Performance validator not available: {e}")
        return 0, {"name": "Performance", "skipped": True}
    except Exception as e:
        print(f"❌ Performance validator failed: {e}")
        return 1, {"name": "Performance", "error": str(e)}
def print_summary(results: list[dict], json_output: bool = False):
    """Print validation summary.

    results: per-validator summary dicts from the run_* helpers; each is one
        of {"name", "files_checked", "errors", "warnings", "info"},
        {"name", "skipped": True} or {"name", "error": "..."}.
    json_output: when True, emit machine-readable JSON instead of the report.
    """
    if json_output:
        print(json.dumps({"validators": results}, indent=2))
        return
    print("\n" + "=" * 80)
    print("📊 UNIFIED VALIDATION SUMMARY")
    print("=" * 80)
    total_errors = 0
    total_warnings = 0
    total_info = 0
    for result in results:
        if result.get("skipped"):
            print(f"\n⏭️ {result['name']}: Skipped")
        elif result.get("error"):
            # Fix: crashed validators now carry an explicit failure icon,
            # matching the other status lines.
            print(f"\n❌ {result['name']}: Error - {result['error']}")
        else:
            errors = result.get("errors", 0)
            warnings = result.get("warnings", 0)
            info = result.get("info", 0)
            total_errors += errors
            total_warnings += warnings
            total_info += info
            # Fix: both branches previously evaluated to "" so the
            # per-validator pass/fail marker never appeared in the report.
            status = "✅" if errors == 0 else "❌"
            print(f"\n{status} {result['name']}:")
            print(f" Files: {result.get('files_checked', 0)}")
            print(f" Errors: {errors}, Warnings: {warnings}, Info: {info}")
    print("\n" + "-" * 80)
    print(f"TOTAL: {total_errors} errors, {total_warnings} warnings, {total_info} info")
    print("=" * 80)
    if total_errors > 0:
        print("❌ VALIDATION FAILED")
    elif total_warnings > 0:
        print(f"⚠️ VALIDATION PASSED WITH {total_warnings} WARNING(S)")
    else:
        print("✅ VALIDATION PASSED")
    print("=" * 80)
def main():
    """CLI entry point: run the selected (or all) validators and exit 0/1."""
    parser = argparse.ArgumentParser(
        description="Unified code validator - runs architecture, security, and performance checks",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("--architecture", action="store_true", help="Run architecture validator")
    parser.add_argument("--security", action="store_true", help="Run security validator")
    parser.add_argument("--performance", action="store_true", help="Run performance validator")
    parser.add_argument("--fail-fast", action="store_true", help="Stop on first failure")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--errors-only", action="store_true", help="Only show errors")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    # Without an explicit selection, every validator runs.
    explicit = args.architecture or args.security or args.performance

    print("\n🔍 UNIFIED CODE VALIDATION")
    print("=" * 80)

    available = [
        ("Architecture", run_architecture_validator, args.architecture),
        ("Security", run_security_validator, args.security),
        ("Performance", run_performance_validator, args.performance),
    ]
    selected = [(name, runner) for name, runner, wanted in available if wanted or not explicit]

    summaries = []
    failed = False
    for name, runner in selected:
        print(f"\n{'=' * 40}")
        print(f"🔍 Running {name} Validator...")
        print("=" * 40)
        rc, summary = runner(verbose=args.verbose)
        summaries.append(summary)
        if rc != 0:
            failed = True
            if args.fail_fast:
                print(f"\n{name} validator failed. Stopping (--fail-fast)")
                break

    print_summary(summaries, json_output=args.json)
    sys.exit(1 if failed else 0)
# Script entry point: delegate to main() so the module stays import-safe.
if __name__ == "__main__":
    main()

648
scripts/validate_performance.py Executable file
View File

@@ -0,0 +1,648 @@
#!/usr/bin/env python3
"""
Performance Validator
=====================
Validates code against performance rules defined in .performance-rules/
This script checks for common performance issues:
- N+1 query patterns
- Missing pagination
- Inefficient database operations
- Memory management issues
- Frontend performance anti-patterns
- Missing timeouts and connection pooling
Usage:
python scripts/validate_performance.py # Check all files
python scripts/validate_performance.py -d app/services/ # Check specific directory
python scripts/validate_performance.py -f app/api/v1/products.py # Check single file
python scripts/validate_performance.py -v # Verbose output
python scripts/validate_performance.py --json # JSON output
python scripts/validate_performance.py --errors-only # Only show errors
Options:
-f, --file PATH Validate a single file
-d, --folder PATH Validate all files in a directory (recursive)
-v, --verbose Show detailed output including context
--errors-only Only show errors, suppress warnings and info
--json Output results as JSON
"""
import argparse
import re
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_validator import BaseValidator, Severity, ValidationResult
class PerformanceValidator(BaseValidator):
    """Performance-focused code validator.

    Scans Python, JavaScript and HTML template files for the PERF-xxx rules
    (database access, pagination, async usage, memory and frontend issues).
    """

    # Identity/branding used by BaseValidator for config loading and reports.
    VALIDATOR_NAME = "Performance Validator"
    VALIDATOR_EMOJI = ""
    RULES_DIR_NAME = ".performance-rules"
    CONFIG_FILE_NAME = ".performance-rules.yaml"
def validate_all(self, target_path: Path = None) -> ValidationResult:
"""Validate all files for performance issues"""
print(f"\n{self.VALIDATOR_EMOJI} Starting performance validation...\n")
target = target_path or self.project_root
# Validate Python files
self._validate_python_files(target)
# Validate JavaScript files
self._validate_javascript_files(target)
# Validate HTML templates
self._validate_template_files(target)
return self.result
def _validate_python_files(self, target: Path):
"""Validate all Python files for performance issues"""
print("🐍 Validating Python files...")
for py_file in target.rglob("*.py"):
if self._should_ignore_file(py_file):
continue
self.result.files_checked += 1
content = py_file.read_text()
lines = content.split("\n")
self._validate_python_performance(py_file, content, lines)
def _validate_javascript_files(self, target: Path):
"""Validate all JavaScript files for performance issues"""
print("🟨 Validating JavaScript files...")
for js_file in target.rglob("*.js"):
if self._should_ignore_file(js_file):
continue
self.result.files_checked += 1
content = js_file.read_text()
lines = content.split("\n")
self._validate_javascript_performance(js_file, content, lines)
def _validate_template_files(self, target: Path):
"""Validate all HTML template files for performance issues"""
print("📄 Validating template files...")
for html_file in target.rglob("*.html"):
if self._should_ignore_file(html_file):
continue
self.result.files_checked += 1
content = html_file.read_text()
lines = content.split("\n")
self._validate_template_performance(html_file, content, lines)
def _validate_file_content(self, file_path: Path, content: str, lines: list[str]):
"""Validate file content based on file type"""
if file_path.suffix == ".py":
self._validate_python_performance(file_path, content, lines)
elif file_path.suffix == ".js":
self._validate_javascript_performance(file_path, content, lines)
elif file_path.suffix == ".html":
self._validate_template_performance(file_path, content, lines)
    def _validate_python_performance(self, file_path: Path, content: str, lines: list[str]):
        """Run all Python performance checks on one file.

        Most checks run unconditionally; a few are gated on the file path so
        they only fire where they are meaningful (API modules, upload/import
        helpers).
        """
        file_path_str = str(file_path)
        # PERF-001: N+1 query detection
        self._check_n_plus_1_queries(file_path, content, lines)
        # PERF-003: Query result limiting
        self._check_query_limiting(file_path, content, lines)
        # PERF-006: Bulk operations
        self._check_bulk_operations(file_path, content, lines)
        # PERF-008: Use EXISTS for existence checks
        self._check_existence_checks(file_path, content, lines)
        # PERF-009: Batch updates
        self._check_batch_updates(file_path, content, lines)
        # PERF-026: Pagination for API endpoints (API modules only)
        if "/api/" in file_path_str:
            self._check_api_pagination(file_path, content, lines)
        # PERF-037: Parallel async operations
        self._check_parallel_async(file_path, content, lines)
        # PERF-040: Timeout configuration
        self._check_timeout_config(file_path, content, lines)
        # PERF-046: Generators for large datasets
        self._check_generators(file_path, content, lines)
        # PERF-047: Stream file uploads (upload/file-handling modules only)
        if "upload" in file_path_str.lower() or "file" in file_path_str.lower():
            self._check_file_streaming(file_path, content, lines)
        # PERF-048: Chunked processing (import/CSV modules only)
        if "import" in file_path_str.lower() or "csv" in file_path_str.lower():
            self._check_chunked_processing(file_path, content, lines)
        # PERF-049: Context managers for files
        self._check_context_managers(file_path, content, lines)
        # PERF-051: String concatenation
        self._check_string_concatenation(file_path, content, lines)
def _validate_javascript_performance(self, file_path: Path, content: str, lines: list[str]):
"""Validate JavaScript file for performance issues"""
# PERF-056: Debounce search inputs
self._check_debounce(file_path, content, lines)
# PERF-062: Polling intervals
self._check_polling_intervals(file_path, content, lines)
# PERF-064: Layout thrashing
self._check_layout_thrashing(file_path, content, lines)
def _validate_template_performance(self, file_path: Path, content: str, lines: list[str]):
"""Validate HTML template file for performance issues"""
# PERF-058: Image lazy loading
self._check_image_lazy_loading(file_path, content, lines)
# PERF-067: Script defer/async
self._check_script_loading(file_path, content, lines)
# =========================================================================
# Database Performance Checks
# =========================================================================
    def _check_n_plus_1_queries(self, file_path: Path, content: str, lines: list[str]):
        """PERF-001: Check for N+1 query patterns.

        Heuristic: after a ``for ... in ....all()/.query`` line, a body line
        that dereferences a known relationship name (``.customer.`` etc.)
        without any call suggests per-row lazy loading.
        """
        # Look for patterns like: for item in items: item.relationship.attribute
        in_for_loop = False
        for_line_num = 0
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            # Track for loops over query results
            if re.search(r'for\s+\w+\s+in\s+.*\.(all|query)', line):
                in_for_loop = True
                for_line_num = i
            elif in_for_loop and stripped and not stripped.startswith("#"):
                # Check for relationship access in loop (attribute chain, no call)
                if re.search(r'\.\w+\.\w+', line) and "(" not in line:
                    # Could be accessing a relationship
                    if any(rel in line for rel in [".customer.", ".vendor.", ".order.", ".product.", ".user."]):
                        self._add_violation(
                            rule_id="PERF-001",
                            rule_name="N+1 query detection",
                            severity=Severity.WARNING,
                            file_path=file_path,
                            line_number=i,
                            message="Possible N+1 query - relationship accessed in loop",
                            context=line.strip()[:80],
                            suggestion="Use joinedload() or selectinload() for eager loading",
                        )
                        # Report at most once per tracked loop
                        in_for_loop = False
            # Reset on dedent (loop bodies are expected to be indented >= 4)
            if in_for_loop and line and not line.startswith(" " * 4) and i > for_line_num + 1:
                in_for_loop = False
    def _check_query_limiting(self, file_path: Path, content: str, lines: list[str]):
        """PERF-003: Check for unbounded query results"""
        for i, line in enumerate(lines, 1):
            if re.search(r'\.all\(\)', line):
                # Check if there's a limit or filter in the preceding few lines
                # (the slice includes the current line as well)
                context_start = max(0, i - 5)
                context_lines = lines[context_start:i]
                context_text = "\n".join(context_lines)
                if "limit" not in context_text.lower() and "filter" not in context_text.lower():
                    # Inline opt-out markers suppress the finding
                    if "# noqa" in line or "# bounded" in line:
                        continue
                    self._add_violation(
                        rule_id="PERF-003",
                        rule_name="Query result limiting",
                        severity=Severity.INFO,
                        file_path=file_path,
                        line_number=i,
                        message="Query may return unbounded results",
                        context=line.strip()[:80],
                        suggestion="Add .limit() or pagination for large tables",
                    )
    def _check_bulk_operations(self, file_path: Path, content: str, lines: list[str]):
        """PERF-006: Check for individual operations in loops.

        Tracks for-loops by indentation and flags per-item ``db.add()`` /
        ``.save()`` calls inside the loop body.
        """
        in_for_loop = False
        for_indent = 0
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            # Track for loops
            if re.search(r'for\s+\w+\s+in\s+', line):
                in_for_loop = True
                for_indent = len(line) - len(line.lstrip())
            elif in_for_loop:
                # Blank lines keep the loop "open"; a dedented non-blank line closes it
                current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
                if current_indent <= for_indent and stripped:
                    in_for_loop = False
                elif "db.add(" in line or ".save(" in line:
                    self._add_violation(
                        rule_id="PERF-006",
                        rule_name="Bulk operations for multiple records",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="Individual db.add() in loop - consider bulk operations",
                        context=line.strip()[:80],
                        suggestion="Use db.add_all() or bulk_insert_mappings()",
                    )
def _check_existence_checks(self, file_path: Path, content: str, lines: list[str]):
"""PERF-008: Check for inefficient existence checks"""
patterns = [
(r'\.count\(\)\s*>\s*0', "count() > 0"),
(r'\.count\(\)\s*>=\s*1', "count() >= 1"),
(r'\.count\(\)\s*!=\s*0', "count() != 0"),
]
for i, line in enumerate(lines, 1):
for pattern, issue in patterns:
if re.search(pattern, line):
self._add_violation(
rule_id="PERF-008",
rule_name="Use EXISTS for existence checks",
severity=Severity.INFO,
file_path=file_path,
line_number=i,
message=f"{issue} scans all rows - use EXISTS instead",
context=line.strip()[:80],
suggestion="Use db.scalar(exists().where(...)) or .first() is not None",
)
    def _check_batch_updates(self, file_path: Path, content: str, lines: list[str]):
        """PERF-009: Check for updates in loops.

        Remembers the loop variable name so only assignments to that
        variable's attributes are flagged.
        """
        in_for_loop = False
        for_indent = 0
        loop_var = ""
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            # Track for loops (capture the loop variable name)
            match = re.search(r'for\s+(\w+)\s+in\s+', line)
            if match:
                in_for_loop = True
                for_indent = len(line) - len(line.lstrip())
                loop_var = match.group(1)
            elif in_for_loop:
                # Blank lines keep the loop "open"; a dedented non-blank line closes it
                current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
                if current_indent <= for_indent and stripped:
                    in_for_loop = False
                elif loop_var and f"{loop_var}." in line and "=" in line and "==" not in line:
                    # Attribute assignment in loop
                    if "# noqa" not in line:
                        self._add_violation(
                            rule_id="PERF-009",
                            rule_name="Batch updates instead of loops",
                            severity=Severity.INFO,
                            file_path=file_path,
                            line_number=i,
                            message="Individual updates in loop - consider batch update",
                            context=line.strip()[:80],
                            suggestion="Use .update({...}) with filters for batch updates",
                        )
# =========================================================================
# API Performance Checks
# =========================================================================
    def _check_api_pagination(self, file_path: Path, content: str, lines: list[str]):
        """PERF-026: Check for missing pagination in list endpoints"""
        # Look for GET endpoints that return lists
        in_endpoint = False
        endpoint_line = 0
        has_pagination = False
        for i, line in enumerate(lines, 1):
            # Track router decorators (each decorator starts a fresh endpoint scope)
            if re.search(r'@router\.(get|post)', line):
                in_endpoint = True
                endpoint_line = i
                has_pagination = False
            elif in_endpoint:
                # Check for pagination parameters
                if re.search(r'(skip|offset|page|limit)', line):
                    has_pagination = True
                # Check for function end (a later def closes the endpoint scope)
                if re.search(r'^def\s+\w+', line.lstrip()) and i > endpoint_line + 1:
                    in_endpoint = False
                # Check for .all() without pagination
                if ".all()" in line and not has_pagination:
                    if "# noqa" not in line:
                        self._add_violation(
                            rule_id="PERF-026",
                            rule_name="Pagination required for list endpoints",
                            severity=Severity.WARNING,
                            file_path=file_path,
                            line_number=i,
                            message="List endpoint may lack pagination",
                            context=line.strip()[:80],
                            suggestion="Add skip/limit parameters for pagination",
                        )
# =========================================================================
# Async Performance Checks
# =========================================================================
    def _check_parallel_async(self, file_path: Path, content: str, lines: list[str]):
        """PERF-037: Check for sequential awaits that could be parallel.

        Counts consecutive ``await`` statements; any other non-comment,
        non-blank line resets the run.
        """
        await_count = 0
        await_lines = []
        for i, line in enumerate(lines, 1):
            stripped = line.strip()
            if stripped.startswith("await "):
                await_count += 1
                await_lines.append(i)
                # Check for 3+ sequential awaits
                if await_count >= 3:
                    # Verify they're sequential (within 2 lines of each other)
                    if all(await_lines[j+1] - await_lines[j] <= 2 for j in range(len(await_lines)-1)):
                        self._add_violation(
                            rule_id="PERF-037",
                            rule_name="Parallel independent operations",
                            severity=Severity.INFO,
                            file_path=file_path,
                            line_number=await_lines[0],
                            message=f"{await_count} sequential awaits - consider asyncio.gather()",
                            context="Multiple await statements",
                            suggestion="Use asyncio.gather() for independent async operations",
                        )
                        # Start a fresh run so the same cluster isn't re-reported
                        await_count = 0
                        await_lines = []
            elif stripped and not stripped.startswith("#"):
                # Reset on non-await, non-empty line
                if await_count > 0:
                    await_count = 0
                    await_lines = []
def _check_timeout_config(self, file_path: Path, content: str, lines: list[str]):
    """PERF-040: flag HTTP client calls that set no timeout."""
    # Cheap pre-filter: skip files that never touch an HTTP client library.
    if not any(lib in content for lib in ("requests", "httpx", "aiohttp")):
        return
    call_res = [re.compile(p) for p in (
        r'requests\.(get|post|put|delete|patch)\s*\([^)]+\)',
        r'httpx\.(get|post|put|delete|patch)\s*\([^)]+\)',
    )]
    for lineno, text in enumerate(lines, 1):
        if "timeout" in text:
            continue
        for regex in call_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="PERF-040", rule_name="Timeout configuration",
                    severity=Severity.WARNING, file_path=file_path, line_number=lineno,
                    message="HTTP request without timeout",
                    context=text.strip()[:80],
                    suggestion="Add timeout parameter to prevent hanging requests",
                )
# =========================================================================
# Memory Performance Checks
# =========================================================================
def _check_generators(self, file_path: Path, content: str, lines: list[str]):
    """PERF-046: flag .all() results that are immediately iterated."""
    for lineno, text in enumerate(lines, 1):
        if ".all()" not in text or "# noqa" in text:
            continue
        # Peek at up to the next three lines for a for-loop over the result
        # (lines is 0-indexed, lineno is 1-based, so this slice starts at
        # the line AFTER the .all() call).
        lookahead = "\n".join(lines[lineno:lineno + 3])
        if "for " in lookahead and "in" in lookahead:
            self._add_violation(
                rule_id="PERF-046", rule_name="Generators for large datasets",
                severity=Severity.INFO, file_path=file_path, line_number=lineno,
                message=".all() loads everything into memory before iteration",
                context=text.strip()[:80],
                suggestion="Use .yield_per(100) for large result sets",
            )
def _check_file_streaming(self, file_path: Path, content: str, lines: list[str]):
    """PERF-047: flag whole-file async reads (`await x.read()` with no size)."""
    full_read = re.compile(r'await\s+\w+\.read\(\)')
    for lineno, text in enumerate(lines, 1):
        # A size-less .read() slurps the entire upload; chunked reads
        # (any mention of "chunk" on the line) are accepted.
        if "chunk" in text or not full_read.search(text):
            continue
        self._add_violation(
            rule_id="PERF-047", rule_name="Stream large file uploads",
            severity=Severity.INFO, file_path=file_path, line_number=lineno,
            message="Full file read into memory",
            context=text.strip()[:80],
            suggestion="Stream large files: while chunk := await file.read(8192)",
        )
def _check_chunked_processing(self, file_path: Path, content: str, lines: list[str]):
    """PERF-048: suggest chunked processing for bulk-import style files."""
    lowered = content.lower()
    # Files that already mention chunking/batching are assumed to handle it.
    if "chunk" in lowered or "batch" in lowered:
        return
    # Heuristic for "processes multiple records": a for-loop plus csv/import
    # vocabulary anywhere in the file.
    if "for " in content and ("csv" in lowered or "import" in lowered):
        self._add_violation(
            rule_id="PERF-048", rule_name="Chunked processing for imports",
            severity=Severity.INFO, file_path=file_path, line_number=1,
            message="Import processing may benefit from chunking",
            context="File processes multiple records",
            suggestion="Process in chunks with periodic commits",
        )
def _check_context_managers(self, file_path: Path, content: str, lines: list[str]):
    """PERF-049: flag `open()` assignments made outside a with-statement."""
    bare_open = re.compile(r'^\s*\w+\s*=\s*open\s*\(')
    for lineno, text in enumerate(lines, 1):
        # `x = open(...)` leaks the handle unless closed manually;
        # suppress with # noqa.
        if bare_open.search(text) and "# noqa" not in text:
            self._add_violation(
                rule_id="PERF-049", rule_name="Context managers for resources",
                severity=Severity.WARNING, file_path=file_path, line_number=lineno,
                message="File opened without context manager",
                context=text.strip()[:80],
                suggestion="Use 'with open(...) as f:' to ensure cleanup",
            )
def _check_string_concatenation(self, file_path: Path, content: str, lines: list[str]):
    """PERF-051: flag string concatenation via += inside for-loop bodies.

    Loop bodies are tracked by indentation (blank lines keep the loop open,
    dedented code closes it). Only `x += "..."` / `x += str(...)` count as
    concatenation; a bare `str(...)` call elsewhere on the line is NOT
    flagged. Suppress with # noqa.
    """
    in_for_loop = False
    for_indent = 0
    # The alternation is grouped so it applies only to the right-hand side
    # of += (a string literal or a str(...) call).
    concat_re = re.compile(r'\w+\s*\+=\s*(?:["\']|str\s*\()')
    for i, line in enumerate(lines, 1):
        stripped = line.strip()
        if re.search(r'for\s+\w+\s+in\s+', line):
            in_for_loop = True
            for_indent = len(line) - len(line.lstrip())
        elif in_for_loop:
            # Blank lines are treated as still inside the loop body.
            current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
            if current_indent <= for_indent and stripped:
                in_for_loop = False
            elif concat_re.search(line):
                if "# noqa" not in line:
                    self._add_violation(
                        rule_id="PERF-051",
                        rule_name="String concatenation efficiency",
                        severity=Severity.INFO,
                        file_path=file_path,
                        line_number=i,
                        message="String concatenation in loop",
                        context=line.strip()[:80],
                        suggestion="Use ''.join() or StringIO for many concatenations",
                    )
# =========================================================================
# Frontend Performance Checks
# =========================================================================
def _check_debounce(self, file_path: Path, content: str, lines: list[str]):
    """PERF-056: flag search inputs that fire fetches without a debounce."""
    # If the file mentions debounce anywhere, assume inputs are debounced.
    if "debounce" in content.lower():
        return
    trigger_re = re.compile(r'@(input|keyup)=".*search.*fetch', re.IGNORECASE)
    for lineno, text in enumerate(lines, 1):
        if trigger_re.search(text):
            self._add_violation(
                rule_id="PERF-056", rule_name="Debounce search inputs",
                severity=Severity.WARNING, file_path=file_path, line_number=lineno,
                message="Search input triggers API call without debounce",
                context=text.strip()[:80],
                suggestion="Add 300-500ms debounce to prevent excessive API calls",
            )
def _check_polling_intervals(self, file_path: Path, content: str, lines: list[str]):
    """PERF-062: flag setInterval() callbacks firing more often than every 10s.

    Suppress with a `real-time` or `noqa` marker behind either a JS (`//`)
    or template/Python (`#`) comment — this check mostly fires in .js/.html
    files, where a `#` marker can never appear.
    """
    interval_re = re.compile(r'setInterval\s*\([^,]+,\s*(\d+)\s*\)')
    suppress_re = re.compile(r'(?://|#)\s*(?:real-time|noqa)')
    for i, line in enumerate(lines, 1):
        match = interval_re.search(line)
        if not match:
            continue
        interval = int(match.group(1))
        if interval >= 10000:  # 10s or slower is acceptable
            continue
        if suppress_re.search(line):
            continue
        self._add_violation(
            rule_id="PERF-062",
            rule_name="Reasonable polling intervals",
            severity=Severity.WARNING,
            file_path=file_path,
            line_number=i,
            message=f"Polling interval {interval}ms is very frequent",
            context=line.strip()[:80],
            suggestion="Use >= 10 second intervals for non-critical updates",
        )
def _check_layout_thrashing(self, file_path: Path, content: str, lines: list[str]):
    """PERF-064: flag DOM geometry reads immediately followed by style writes."""
    geometry_re = re.compile(r'(offsetHeight|offsetWidth|clientHeight|clientWidth)')
    for lineno, text in enumerate(lines, 1):
        if not geometry_re.search(text):
            continue
        # lines is 0-indexed and lineno is 1-based, so lines[lineno] is the
        # NEXT source line (empty string when we are on the last line).
        following = lines[lineno] if lineno < len(lines) else ""
        if "style" in following:
            self._add_violation(
                rule_id="PERF-064", rule_name="Avoid layout thrashing",
                severity=Severity.INFO, file_path=file_path, line_number=lineno,
                message="DOM read followed by write can cause layout thrashing",
                context=text.strip()[:80],
                suggestion="Batch DOM reads, then batch DOM writes",
            )
def _check_image_lazy_loading(self, file_path: Path, content: str, lines: list[str]):
    """PERF-058: flag <img> tags with neither lazy loading nor x-intersect."""
    img_re = re.compile(r'<img\s+[^>]*src=')
    for lineno, text in enumerate(lines, 1):
        if not img_re.search(text):
            continue
        if 'loading="lazy"' in text or "x-intersect" in text:
            continue
        lowered = text.lower()
        # Logos/icons are skipped — presumably above the fold, where
        # lazy loading would hurt.
        if "logo" in lowered or "icon" in lowered:
            continue
        self._add_violation(
            rule_id="PERF-058", rule_name="Image optimization",
            severity=Severity.INFO, file_path=file_path, line_number=lineno,
            message="Image without lazy loading",
            context=text.strip()[:80],
            suggestion='Add loading="lazy" for off-screen images',
        )
def _check_script_loading(self, file_path: Path, content: str, lines: list[str]):
    """PERF-067: flag blocking <script src> tags lacking defer/async."""
    script_re = re.compile(r'<script\s+[^>]*src=')
    for lineno, text in enumerate(lines, 1):
        if not script_re.search(text):
            continue
        if "defer" in text or "async" in text:
            continue
        lowered = text.lower()
        # Alpine/HTMX are excluded — presumably loaded eagerly on purpose.
        if "alpine" in lowered or "htmx" in lowered:
            continue
        self._add_violation(
            rule_id="PERF-067", rule_name="Defer non-critical JavaScript",
            severity=Severity.INFO, file_path=file_path, line_number=lineno,
            message="Script tag without defer/async",
            context=text.strip()[:80],
            suggestion="Add defer for non-critical scripts",
        )
def main():
    """CLI entry point: parse arguments, run the performance validator, exit.

    Exit status comes from the validator (non-zero when errors were found).
    """
    parser = argparse.ArgumentParser(
        description="Performance code validator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("-f", "--file", type=Path, help="Validate a single file")
    parser.add_argument("-d", "--folder", type=Path, help="Validate a directory")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--errors-only", action="store_true", help="Only show errors")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    validator = PerformanceValidator(verbose=args.verbose)
    # Precedence: explicit file, then explicit folder, then whole project.
    if args.file is not None:
        validator.validate_file(args.file)
    elif args.folder is not None:
        validator.validate_all(args.folder)
    else:
        validator.validate_all()
    validator.output_results(json_output=args.json, errors_only=args.errors_only)
    sys.exit(validator.get_exit_code())


if __name__ == "__main__":
    main()

718
scripts/validate_security.py Executable file
View File

@@ -0,0 +1,718 @@
#!/usr/bin/env python3
"""
Security Validator
==================
Validates code against security rules defined in .security-rules/
This script checks for common security vulnerabilities:
- Hardcoded credentials and secrets
- SQL injection patterns
- Command injection risks
- XSS vulnerabilities
- Insecure cryptography
- Authentication weaknesses
- Data exposure risks
Usage:
python scripts/validate_security.py # Check all files
python scripts/validate_security.py -d app/api/ # Check specific directory
python scripts/validate_security.py -f app/api/v1/auth.py # Check single file
python scripts/validate_security.py -v # Verbose output
python scripts/validate_security.py --json # JSON output
python scripts/validate_security.py --errors-only # Only show errors
Options:
-f, --file PATH Validate a single file
-d, --folder PATH Validate all files in a directory (recursive)
-v, --verbose Show detailed output including context
--errors-only Only show errors, suppress warnings and info
--json Output results as JSON
"""
import argparse
import re
import sys
from pathlib import Path
# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))
from base_validator import BaseValidator, Severity, ValidationResult
class SecurityValidator(BaseValidator):
    """Security-focused code validator.

    Scans Python, JavaScript, HTML template, and configuration files for
    common vulnerability patterns (SEC-xxx rules). File discovery, violation
    bookkeeping (`self.result`) and output formatting come from BaseValidator.
    """

    # Display name/emoji used in console output.
    VALIDATOR_NAME = "Security Validator"
    VALIDATOR_EMOJI = "🔒"
    # Rule documentation directory and validator config file — consumed by
    # BaseValidator (see base_validator.py).
    RULES_DIR_NAME = ".security-rules"
    CONFIG_FILE_NAME = ".security-rules.yaml"

    def validate_all(self, target_path: Path = None) -> ValidationResult:
        """Validate all files for security issues.

        Args:
            target_path: Directory to scan; defaults to the project root.

        Returns:
            The accumulated ValidationResult (also stored on self.result).
        """
        print(f"\n{self.VALIDATOR_EMOJI} Starting security validation...\n")
        target = target_path or self.project_root
        # Each pass walks the tree for one file type.
        self._validate_python_files(target)
        self._validate_javascript_files(target)
        self._validate_template_files(target)
        self._validate_config_files(target)
        return self.result
def _validate_python_files(self, target: Path):
"""Validate all Python files for security issues"""
print("🐍 Validating Python files...")
for py_file in target.rglob("*.py"):
if self._should_ignore_file(py_file):
continue
self.result.files_checked += 1
content = py_file.read_text()
lines = content.split("\n")
self._validate_python_security(py_file, content, lines)
def _validate_javascript_files(self, target: Path):
"""Validate all JavaScript files for security issues"""
print("🟨 Validating JavaScript files...")
for js_file in target.rglob("*.js"):
if self._should_ignore_file(js_file):
continue
self.result.files_checked += 1
content = js_file.read_text()
lines = content.split("\n")
self._validate_javascript_security(js_file, content, lines)
def _validate_template_files(self, target: Path):
"""Validate all HTML template files for security issues"""
print("📄 Validating template files...")
for html_file in target.rglob("*.html"):
if self._should_ignore_file(html_file):
continue
self.result.files_checked += 1
content = html_file.read_text()
lines = content.split("\n")
self._validate_template_security(html_file, content, lines)
def _validate_config_files(self, target: Path):
"""Validate configuration files for security issues"""
print("⚙️ Validating configuration files...")
config_patterns = ["*.yaml", "*.yml", "*.json", "*.toml", "*.ini", "*.env*"]
for pattern in config_patterns:
for config_file in target.rglob(pattern):
if self._should_ignore_file(config_file):
continue
if config_file.suffix in [".yaml", ".yml", ".json"]:
self.result.files_checked += 1
content = config_file.read_text()
lines = content.split("\n")
self._validate_config_security(config_file, content, lines)
def _validate_file_content(self, file_path: Path, content: str, lines: list[str]):
"""Validate file content based on file type"""
if file_path.suffix == ".py":
self._validate_python_security(file_path, content, lines)
elif file_path.suffix == ".js":
self._validate_javascript_security(file_path, content, lines)
elif file_path.suffix == ".html":
self._validate_template_security(file_path, content, lines)
elif file_path.suffix in [".yaml", ".yml", ".json"]:
self._validate_config_security(file_path, content, lines)
def _validate_python_security(self, file_path: Path, content: str, lines: list[str]):
"""Validate Python file for security issues"""
file_path_str = str(file_path)
# SEC-001: Hardcoded credentials
self._check_hardcoded_credentials(file_path, content, lines)
# SEC-011: SQL injection
self._check_sql_injection(file_path, content, lines)
# SEC-012: Command injection
self._check_command_injection(file_path, content, lines)
# SEC-013: Code execution
self._check_code_execution(file_path, content, lines)
# SEC-014: Path traversal
if "upload" in file_path_str.lower() or "file" in file_path_str.lower():
self._check_path_traversal(file_path, content, lines)
# SEC-020: Unsafe deserialization
self._check_unsafe_deserialization(file_path, content, lines)
# SEC-021: PII logging
self._check_pii_logging(file_path, content, lines)
# SEC-024: Error information leakage
self._check_error_leakage(file_path, content, lines)
# SEC-034: HTTPS enforcement
self._check_https_enforcement(file_path, content, lines)
# SEC-040: Timeout configuration
self._check_timeout_configuration(file_path, content, lines)
# SEC-041: Weak hashing
self._check_weak_hashing(file_path, content, lines)
# SEC-042: Insecure random
self._check_insecure_random(file_path, content, lines)
# SEC-043: Hardcoded encryption keys
self._check_hardcoded_keys(file_path, content, lines)
# SEC-047: Certificate verification
self._check_certificate_verification(file_path, content, lines)
# Auth file specific checks
if "auth" in file_path_str.lower():
self._check_jwt_expiry(file_path, content, lines)
def _validate_javascript_security(self, file_path: Path, content: str, lines: list[str]):
    """Run JavaScript-specific security checks on one file."""
    # SEC-022: credentials/tokens passed as URL query parameters.
    self._check_sensitive_url_params_js(file_path, content, lines)
    # SEC-013: eval() — skipped when the call sits behind a // comment.
    for lineno, text in enumerate(lines, 1):
        if re.search(r'\beval\s*\(', text) and "//" not in text.split("eval")[0]:
            self._add_violation(
                rule_id="SEC-013", rule_name="No code execution",
                severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                message="eval() allows arbitrary code execution",
                context=text.strip()[:80],
                suggestion="Use JSON.parse() for JSON or other safe alternatives",
            )
    # SEC-015: direct innerHTML assignment — an XSS sink with untrusted input.
    for lineno, text in enumerate(lines, 1):
        if re.search(r'\.innerHTML\s*=', text) and "//" not in text.split("innerHTML")[0]:
            self._add_violation(
                rule_id="SEC-015", rule_name="XSS prevention",
                severity=Severity.WARNING, file_path=file_path, line_number=lineno,
                message="innerHTML can lead to XSS if used with untrusted input",
                context=text.strip()[:80],
                suggestion="Use textContent for text or sanitize HTML input",
            )
def _validate_template_security(self, file_path: Path, content: str, lines: list[str]):
    """Run HTML template security checks (Jinja |safe, Alpine x-html).

    A `sanitized` marker anywhere on the line (e.g. a trailing
    `{# sanitized #}` comment, as the suggestion text recommends) suppresses
    both findings. The marker no longer has to sit directly after `safe`
    inside the expression, which is not valid Jinja anyway.
    """
    # SEC-015: |safe disables auto-escaping.
    for i, line in enumerate(lines, 1):
        if re.search(r'\|\s*safe\b', line) and "sanitized" not in line.lower():
            self._add_violation(
                rule_id="SEC-015",
                rule_name="XSS prevention in templates",
                severity=Severity.WARNING,
                file_path=file_path,
                line_number=i,
                message="|safe filter disables auto-escaping - ensure content is sanitized",
                context=line.strip()[:80],
                suggestion="Mark with {# sanitized #} comment if content is sanitized",
            )
    # SEC-015 (info): x-html renders raw HTML.
    for i, line in enumerate(lines, 1):
        if re.search(r'x-html="[^"]*\w', line) and "sanitized" not in line.lower():
            self._add_violation(
                rule_id="SEC-015",
                rule_name="XSS prevention in templates",
                severity=Severity.INFO,
                file_path=file_path,
                line_number=i,
                message="x-html renders raw HTML - ensure content is safe",
                context=line.strip()[:80],
                suggestion="Use x-text for text content or sanitize HTML",
            )
def _validate_config_security(self, file_path: Path, content: str, lines: list[str]):
    """SEC-001: scan a configuration file for hardcoded secret values."""
    secret_res = [
        (re.compile(r'password\s*[=:]\s*["\'][^"\']{4,}["\']', re.IGNORECASE), "password"),
        (re.compile(r'secret\s*[=:]\s*["\'][^"\']{8,}["\']', re.IGNORECASE), "secret"),
        (re.compile(r'api_key\s*[=:]\s*["\'][A-Za-z0-9_-]{16,}["\']', re.IGNORECASE), "API key"),
        (re.compile(r'token\s*[=:]\s*["\'][A-Za-z0-9._-]{20,}["\']', re.IGNORECASE), "token"),
    ]
    for lineno, raw in enumerate(lines, 1):
        # Commented-out lines are skipped.
        if raw.strip().startswith(("#", "//")):
            continue
        for regex, secret_type in secret_res:
            if not regex.search(raw):
                continue
            # Environment-variable references are not hardcoded secrets.
            if "${" in raw or "os.getenv" in raw or "environ" in raw:
                continue
            self._add_violation(
                rule_id="SEC-001", rule_name="No hardcoded credentials",
                severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                message=f"Possible hardcoded {secret_type} in configuration",
                context=raw.strip()[:60] + "...",
                suggestion="Use environment variables for secrets",
            )
# =========================================================================
# Specific Security Checks
# =========================================================================
def _check_hardcoded_credentials(self, file_path: Path, content: str, lines: list[str]):
    """SEC-001: flag credential-looking string literals in Python source."""
    cred_res = [
        (re.compile(r'password\s*=\s*["\'][^"\']{4,}["\']', re.IGNORECASE), "password"),
        (re.compile(r'api_key\s*=\s*["\'][A-Za-z0-9_-]{16,}["\']', re.IGNORECASE), "API key"),
        (re.compile(r'secret_key\s*=\s*["\'][^"\']{8,}["\']', re.IGNORECASE), "secret key"),
        (re.compile(r'auth_token\s*=\s*["\'][A-Za-z0-9._-]{20,}["\']', re.IGNORECASE), "auth token"),
        (re.compile(r'AWS_SECRET.*=\s*["\'][^"\']+["\']', re.IGNORECASE), "AWS secret"),
        (re.compile(r'STRIPE_.*KEY.*=\s*["\'][^"\']+["\']', re.IGNORECASE), "Stripe key"),
    ]
    # Lines that look like secrets but are env lookups, tests or hashes.
    allowed = ("os.getenv", "os.environ", "settings.", '""', "''",
               "# noqa", "# test", "password_hash", "example")
    for lineno, text in enumerate(lines, 1):
        for regex, secret_type in cred_res:
            if not regex.search(text):
                continue
            if any(tok in text for tok in allowed):
                continue
            self._add_violation(
                rule_id="SEC-001", rule_name="No hardcoded credentials",
                severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                message=f"Possible hardcoded {secret_type}",
                context=text.strip()[:60] + "...",
                suggestion="Use environment variables or secret management",
            )
def _check_sql_injection(self, file_path: Path, content: str, lines: list[str]):
    """SEC-011: flag string-built SQL passed to execute()/text()/.raw()."""
    injection_res = [re.compile(p) for p in (
        r'execute\s*\(\s*f["\']',      # execute(f"...")
        r'execute\s*\([^)]*\s*\+\s*',  # execute("..." + var)
        r'execute\s*\([^)]*%[^)]*%',   # execute("... %s ..." % args)
        r'text\s*\(\s*f["\']',         # text(f"...")
        r'\.raw\s*\(\s*f["\']',        # .raw(f"...")
    )]
    for lineno, text in enumerate(lines, 1):
        for regex in injection_res:
            if not regex.search(text):
                continue
            # Explicitly reviewed lines may be suppressed.
            if "# noqa" in text or "# safe" in text:
                continue
            self._add_violation(
                rule_id="SEC-011", rule_name="No raw SQL queries",
                severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                message="Possible SQL injection - use parameterized queries",
                context=text.strip()[:80],
                suggestion="Use SQLAlchemy ORM or parameterized queries with :param syntax",
            )
def _check_command_injection(self, file_path: Path, content: str, lines: list[str]):
    """SEC-012: flag shell-invocation APIs that enable command injection."""
    shell_res = [
        (re.compile(r'subprocess.*shell\s*=\s*True'), "shell=True in subprocess"),
        (re.compile(r'os\.system\s*\('), "os.system()"),
        (re.compile(r'os\.popen\s*\('), "os.popen()"),
    ]
    for lineno, text in enumerate(lines, 1):
        # Explicitly reviewed lines may be suppressed.
        if "# noqa" in text or "# safe" in text:
            continue
        for regex, issue in shell_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-012", rule_name="No shell command injection",
                    severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                    message=f"{issue} allows command injection",
                    context=text.strip()[:80],
                    suggestion="Use subprocess with list arguments, shell=False",
                )
def _check_code_execution(self, file_path: Path, content: str, lines: list[str]):
    """SEC-013: flag eval/exec/__import__ fed with request or user input."""
    exec_res = [
        (re.compile(r'eval\s*\([^)]*request', re.IGNORECASE), "eval with request data"),
        (re.compile(r'eval\s*\([^)]*input', re.IGNORECASE), "eval with user input"),
        (re.compile(r'exec\s*\([^)]*request', re.IGNORECASE), "exec with request data"),
        (re.compile(r'__import__\s*\([^)]*request', re.IGNORECASE), "__import__ with request data"),
    ]
    for lineno, text in enumerate(lines, 1):
        for regex, issue in exec_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-013", rule_name="No code execution",
                    severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                    message=f"{issue} allows arbitrary code execution",
                    context=text.strip()[:80],
                    suggestion="Never use eval/exec with user input",
                )
def _check_path_traversal(self, file_path: Path, content: str, lines: list[str]):
    """SEC-014: flag file-path operations built from request data.

    A file that already uses secure_filename()/basename anywhere is trusted
    to sanitize paths, so nothing is reported. Each suspicious line is
    reported at most once, even when it matches several patterns (e.g.
    ``open(base + request.filename)`` matches both the ``request`` and the
    ``+`` pattern).
    """
    # File-level trust marker: sanitization helpers are in use.
    if "secure_filename" in content or "basename" in content:
        return
    suspicious = [re.compile(p, re.IGNORECASE) for p in (
        r'open\s*\([^)]*request',
        r'open\s*\([^)]*\+',
        r'Path\s*\([^)]*request',
    )]
    for lineno, text in enumerate(lines, 1):
        if any(regex.search(text) for regex in suspicious):
            self._add_violation(
                rule_id="SEC-014",
                rule_name="Path traversal prevention",
                severity=Severity.WARNING,
                file_path=file_path,
                line_number=lineno,
                message="Possible path traversal - validate file paths",
                context=text.strip()[:80],
                suggestion="Use secure_filename() and validate paths against allowed directories",
            )
def _check_unsafe_deserialization(self, file_path: Path, content: str, lines: list[str]):
    """SEC-020: flag pickle/marshal loads and yaml.load without SafeLoader."""
    risky_res = [
        (re.compile(r'pickle\.loads?\s*\('), "pickle deserialization"),
        (re.compile(r'yaml\.load\s*\([^,)]+\)(?!.*SafeLoader)'), "yaml.load without SafeLoader"),
        (re.compile(r'marshal\.loads?\s*\('), "marshal deserialization"),
    ]
    for lineno, text in enumerate(lines, 1):
        if "# noqa" in text:
            continue
        for regex, issue in risky_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-020", rule_name="Deserialization safety",
                    severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                    message=f"Unsafe {issue} can lead to code execution",
                    context=text.strip()[:80],
                    suggestion="Use json.loads() or yaml.safe_load() instead",
                )
def _check_pii_logging(self, file_path: Path, content: str, lines: list[str]):
    """SEC-021: flag log/print statements that appear to emit PII."""
    pii_res = [
        (re.compile(r'log\w*\.[a-z]+\([^)]*password', re.IGNORECASE), "password in log"),
        (re.compile(r'log\w*\.[a-z]+\([^)]*credit_card', re.IGNORECASE), "credit card in log"),
        (re.compile(r'log\w*\.[a-z]+\([^)]*ssn', re.IGNORECASE), "SSN in log"),
        (re.compile(r'print\s*\([^)]*password', re.IGNORECASE), "password in print"),
    ]
    # Benign identifiers that merely mention "password".
    allowed = ("password_hash", "password_reset", "password_changed", "# noqa")
    for lineno, text in enumerate(lines, 1):
        if any(tok in text for tok in allowed):
            continue
        for regex, issue in pii_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-021", rule_name="PII logging prevention",
                    severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                    message=f"Possible {issue}",
                    context=text.strip()[:60] + "...",
                    suggestion="Never log sensitive data - redact or omit",
                )
def _check_error_leakage(self, file_path: Path, content: str, lines: list[str]):
    """SEC-024: flag internal error details flowing into user responses."""
    leak_res = [re.compile(p) for p in (
        r'traceback\.format_exc\(\).*detail',
        r'traceback\.format_exc\(\).*response',
        r'str\(e\).*HTTPException',
    )]
    for lineno, text in enumerate(lines, 1):
        # Logging the detail internally is the recommended pattern, not a leak.
        if "logger" in text or "# noqa" in text:
            continue
        for regex in leak_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-024", rule_name="Error message information leakage",
                    severity=Severity.WARNING, file_path=file_path, line_number=lineno,
                    message="Internal error details may be exposed to users",
                    context=text.strip()[:80],
                    suggestion="Log errors internally, return generic message to users",
                )
def _check_https_enforcement(self, file_path: Path, content: str, lines: list[str]):
    """SEC-034: flag plain-HTTP URLs outside loopback/placeholder hosts."""
    http_re = re.compile(r'http://(?!localhost|127\.0\.0\.1|0\.0\.0\.0|\$)')
    for lineno, text in enumerate(lines, 1):
        if not http_re.search(text):
            continue
        # Documentation examples, schema identifiers and W3C namespaces
        # are acceptable HTTP URLs.
        if "# noqa" in text or "example.com" in text or "schemas" in text:
            continue
        if "http://www.w3.org" in text:
            continue
        self._add_violation(
            rule_id="SEC-034", rule_name="HTTPS enforcement",
            severity=Severity.WARNING, file_path=file_path, line_number=lineno,
            message="HTTP URL found - use HTTPS for security",
            context=text.strip()[:80],
            suggestion="Replace http:// with https://",
        )
def _check_timeout_configuration(self, file_path: Path, content: str, lines: list[str]):
    """SEC-040: flag HTTP client calls made without a timeout."""
    # Pre-filter: only files that touch an HTTP client library are relevant.
    if "requests" not in content and "httpx" not in content and "aiohttp" not in content:
        return
    call_res = [re.compile(p) for p in (
        r'requests\.(get|post|put|delete|patch)\s*\([^)]+\)(?!.*timeout)',
        r'httpx\.(get|post|put|delete|patch)\s*\([^)]+\)(?!.*timeout)',
    )]
    for lineno, text in enumerate(lines, 1):
        if "timeout" in text:
            continue
        for regex in call_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-040", rule_name="Timeout configuration",
                    severity=Severity.WARNING, file_path=file_path, line_number=lineno,
                    message="HTTP request without timeout - can hang indefinitely",
                    context=text.strip()[:80],
                    suggestion="Add timeout parameter: requests.get(url, timeout=30)",
                )
def _check_weak_hashing(self, file_path: Path, content: str, lines: list[str]):
    """SEC-041: flag MD5/SHA1 usage (weak for security purposes)."""
    weak_res = [
        (re.compile(r'hashlib\.md5\s*\('), "MD5"),
        (re.compile(r'hashlib\.sha1\s*\('), "SHA1"),
        (re.compile(r'MD5\.new\s*\('), "MD5"),
        (re.compile(r'SHA\.new\s*\('), "SHA1"),
    ]
    for lineno, text in enumerate(lines, 1):
        # Non-cryptographic uses (checksums, file hashes) may be marked OK.
        if "# noqa" in text or "# checksum" in text or "# file hash" in text:
            continue
        for regex, algo in weak_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-041", rule_name="Strong hashing algorithms",
                    severity=Severity.WARNING, file_path=file_path, line_number=lineno,
                    message=f"{algo} is cryptographically weak",
                    context=text.strip()[:80],
                    suggestion="Use SHA-256 or stronger for security purposes",
                )
def _check_insecure_random(self, file_path: Path, content: str, lines: list[str]):
    """SEC-042: flag `random` module use in security-sensitive files."""
    # Only meaningful where the file appears to deal with secrets/tokens.
    lowered = content.lower()
    if not any(word in lowered for word in
               ("token", "secret", "key", "session", "csrf", "nonce", "salt")):
        return
    rng_res = [re.compile(p) for p in (
        r'random\.random\s*\(',
        r'random\.randint\s*\(',
        r'random\.choice\s*\(',
    )]
    for lineno, text in enumerate(lines, 1):
        if "# noqa" in text or "# not security" in text:
            continue
        for regex in rng_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-042", rule_name="Secure random generation",
                    severity=Severity.WARNING, file_path=file_path, line_number=lineno,
                    message="random module is not cryptographically secure",
                    context=text.strip()[:80],
                    suggestion="Use secrets module for security-sensitive randomness",
                )
def _check_hardcoded_keys(self, file_path: Path, content: str, lines: list[str]):
    """SEC-043: flag encryption keys embedded as string literals."""
    key_res = [re.compile(p) for p in (
        r'ENCRYPTION_KEY\s*=\s*["\'][^"\']+["\']',
        r'SECRET_KEY\s*=\s*["\'][A-Za-z0-9+/=]{16,}["\']',
        r'AES_KEY\s*=\s*["\']',
        r'PRIVATE_KEY\s*=\s*["\']-----BEGIN',
    )]
    # Env lookups, settings indirection, empty strings and test fixtures.
    allowed = ("os.getenv", "os.environ", "settings.", '""', "# test")
    for lineno, text in enumerate(lines, 1):
        for regex in key_res:
            if not regex.search(text):
                continue
            if any(tok in text for tok in allowed):
                continue
            self._add_violation(
                rule_id="SEC-043", rule_name="No hardcoded encryption keys",
                severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                message="Hardcoded encryption key found",
                context=text.strip()[:50] + "...",
                suggestion="Use environment variables for encryption keys",
            )
def _check_certificate_verification(self, file_path: Path, content: str, lines: list[str]):
    """SEC-047: flag disabled TLS certificate/hostname verification."""
    tls_res = [
        (re.compile(r'verify\s*=\s*False'), "SSL verification disabled"),
        (re.compile(r'CERT_NONE'), "Certificate verification disabled"),
        (re.compile(r'check_hostname\s*=\s*False'), "Hostname verification disabled"),
    ]
    for lineno, text in enumerate(lines, 1):
        # Tests and debug-only code may opt out explicitly.
        if "# noqa" in text or "# test" in text or "DEBUG" in text:
            continue
        for regex, issue in tls_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-047", rule_name="Certificate verification",
                    severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                    message=f"{issue} - vulnerable to MITM attacks",
                    context=text.strip()[:80],
                    suggestion="Always verify SSL certificates in production",
                )
def _check_jwt_expiry(self, file_path: Path, content: str, lines: list[str]):
    """SEC-002: flag jwt.encode() calls in files that never set an `exp` claim.

    The claim is matched as a whole word, so unrelated words such as
    "expires", "export" or "exception" no longer mask a genuinely missing
    expiry claim. Only the first jwt.encode() occurrence is reported.
    """
    if "jwt.encode" not in content:
        return
    # Whole-word match: "exp" as a claim key, not a substring of other words.
    if re.search(r'\bexp\b', content):
        return
    for i, line in enumerate(lines, 1):
        if "jwt.encode" in line:
            self._add_violation(
                rule_id="SEC-002",
                rule_name="JWT expiry enforcement",
                severity=Severity.WARNING,
                file_path=file_path,
                line_number=i,
                message="JWT token may not have expiration claim",
                context=line.strip()[:80],
                suggestion="Include 'exp' claim with appropriate expiration",
            )
            break
def _check_sensitive_url_params_js(self, file_path: Path, content: str, lines: list[str]):
    """SEC-022: flag passwords/tokens/API keys in URL query strings (JS)."""
    param_res = [re.compile(p) for p in (
        r'\?password=',
        r'&password=',
        r'\?token=(?!type)',  # `token=type...` values are treated as benign
        r'&token=(?!type)',
        r'\?api_key=',
        r'&api_key=',
    )]
    for lineno, text in enumerate(lines, 1):
        for regex in param_res:
            if regex.search(text):
                self._add_violation(
                    rule_id="SEC-022", rule_name="Sensitive data in URLs",
                    severity=Severity.ERROR, file_path=file_path, line_number=lineno,
                    message="Sensitive data in URL query parameters",
                    context=text.strip()[:80],
                    suggestion="Send sensitive data in request body or headers",
                )
def main():
    """CLI entry point: parse arguments, run the security validator, exit.

    Exit status comes from the validator (non-zero when errors were found).
    """
    parser = argparse.ArgumentParser(
        description="Security code validator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("-f", "--file", type=Path, help="Validate a single file")
    parser.add_argument("-d", "--folder", type=Path, help="Validate a directory")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--errors-only", action="store_true", help="Only show errors")
    parser.add_argument("--json", action="store_true", help="JSON output")
    args = parser.parse_args()

    validator = SecurityValidator(verbose=args.verbose)
    # Precedence: explicit file, then explicit folder, then whole project.
    if args.file is not None:
        validator.validate_file(args.file)
    elif args.folder is not None:
        validator.validate_all(args.folder)
    else:
        validator.validate_all()
    validator.output_results(json_output=args.json, errors_only=args.errors_only)
    sys.exit(validator.get_exit_code())


if __name__ == "__main__":
    main()