#!/usr/bin/env python3 """ Security Validator ================== Validates code against security rules defined in .security-rules/ This script checks for common security vulnerabilities: - Hardcoded credentials and secrets - SQL injection patterns - Command injection risks - XSS vulnerabilities - Insecure cryptography - Authentication weaknesses - Data exposure risks Usage: python scripts/validate/validate_security.py # Check all files python scripts/validate/validate_security.py -d app/api/ # Check specific directory python scripts/validate/validate_security.py -f app/api/v1/auth.py # Check single file python scripts/validate/validate_security.py -v # Verbose output python scripts/validate/validate_security.py --json # JSON output python scripts/validate/validate_security.py --errors-only # Only show errors Options: -f, --file PATH Validate a single file -d, --folder PATH Validate all files in a directory (recursive) -v, --verbose Show detailed output including context --errors-only Only show errors, suppress warnings and info --json Output results as JSON """ import argparse import re import sys from pathlib import Path # Add parent directory to path for imports sys.path.insert(0, str(Path(__file__).parent)) from base_validator import BaseValidator, Severity, ValidationResult class SecurityValidator(BaseValidator): """Security-focused code validator""" VALIDATOR_NAME = "Security Validator" VALIDATOR_EMOJI = "🔒" RULES_DIR_NAME = ".security-rules" CONFIG_FILE_NAME = ".security-rules.yaml" def validate_all(self, target_path: Path = None) -> ValidationResult: """Validate all files for security issues""" print(f"\n{self.VALIDATOR_EMOJI} Starting security validation...\n") target = target_path or self.project_root # Validate Python files self._validate_python_files(target) # Validate JavaScript files self._validate_javascript_files(target) # Validate HTML templates self._validate_template_files(target) # Validate configuration files self._validate_config_files(target) return self.result def _validate_python_files(self, target: Path): """Validate all Python files for security issues""" print("🐍 Validating Python files...") for py_file in target.rglob("*.py"): if self._should_ignore_file(py_file): continue self.result.files_checked += 1 content = py_file.read_text() lines = content.split("\n") self._validate_python_security(py_file, content, lines) def _validate_javascript_files(self, target: Path): """Validate all JavaScript files for security issues""" print("🟨 Validating JavaScript files...") for js_file in target.rglob("*.js"): if self._should_ignore_file(js_file): continue self.result.files_checked += 1 content = js_file.read_text() lines = content.split("\n") self._validate_javascript_security(js_file, content, lines) def _validate_template_files(self, target: Path): """Validate all HTML template files for security issues""" print("📄 Validating template files...") for html_file in target.rglob("*.html"): if self._should_ignore_file(html_file): continue self.result.files_checked += 1 content = html_file.read_text() lines = content.split("\n") self._validate_template_security(html_file, content, lines) def _validate_config_files(self, target: Path): """Validate configuration files for security issues""" print("⚙️ Validating configuration files...") config_patterns = ["*.yaml", "*.yml", "*.json", "*.toml", "*.ini", "*.env*"] for pattern in config_patterns: for config_file in target.rglob(pattern): if self._should_ignore_file(config_file): continue if config_file.suffix in [".yaml", ".yml", ".json"]: self.result.files_checked += 1 content = config_file.read_text() lines = content.split("\n") self._validate_config_security(config_file, content, lines) def _validate_file_content(self, file_path: Path, content: str, lines: list[str]): """Validate file content based on file type""" if file_path.suffix == ".py": self._validate_python_security(file_path, content, lines) elif file_path.suffix == ".js": self._validate_javascript_security(file_path, content, lines) elif file_path.suffix == ".html": self._validate_template_security(file_path, content, lines) elif file_path.suffix in [".yaml", ".yml", ".json"]: self._validate_config_security(file_path, content, lines) def _validate_python_security(self, file_path: Path, content: str, lines: list[str]): """Validate Python file for security issues""" file_path_str = str(file_path) # SEC-001: Hardcoded credentials self._check_hardcoded_credentials(file_path, content, lines) # SEC-011: SQL injection self._check_sql_injection(file_path, content, lines) # SEC-012: Command injection self._check_command_injection(file_path, content, lines) # SEC-013: Code execution self._check_code_execution(file_path, content, lines) # SEC-014: Path traversal if "upload" in file_path_str.lower() or "file" in file_path_str.lower(): self._check_path_traversal(file_path, content, lines) # SEC-020: Unsafe deserialization self._check_unsafe_deserialization(file_path, content, lines) # SEC-021: PII logging self._check_pii_logging(file_path, content, lines) # SEC-024: Error information leakage self._check_error_leakage(file_path, content, lines) # SEC-034: HTTPS enforcement self._check_https_enforcement(file_path, content, lines) # SEC-040: Timeout configuration self._check_timeout_configuration(file_path, content, lines) # SEC-041: Weak hashing self._check_weak_hashing(file_path, content, lines) # SEC-042: Insecure random self._check_insecure_random(file_path, content, lines) # SEC-043: Hardcoded encryption keys self._check_hardcoded_keys(file_path, content, lines) # SEC-047: Certificate verification self._check_certificate_verification(file_path, content, lines) # Auth file specific checks if "auth" in file_path_str.lower(): self._check_jwt_expiry(file_path, content, lines) def _validate_javascript_security(self, file_path: Path, content: str, lines: list[str]): """Validate JavaScript file for security issues""" # SEC-022: Sensitive data in URLs self._check_sensitive_url_params_js(file_path, content, lines) # Check for eval usage for i, line in enumerate(lines, 1): if re.search(r'\beval\s*\(', line) and "//" not in line.split("eval")[0]: self._add_violation( rule_id="SEC-013", rule_name="No code execution", severity=Severity.ERROR, file_path=file_path, line_number=i, message="eval() allows arbitrary code execution", context=line.strip()[:80], suggestion="Use JSON.parse() for JSON or other safe alternatives", ) # Check for innerHTML with user input for i, line in enumerate(lines, 1): if re.search(r'\.innerHTML\s*=', line) and "//" not in line.split("innerHTML")[0]: self._add_violation( rule_id="SEC-015", rule_name="XSS prevention", severity=Severity.WARNING, file_path=file_path, line_number=i, message="innerHTML can lead to XSS if used with untrusted input", context=line.strip()[:80], suggestion="Use textContent for text or sanitize HTML input", ) def _validate_template_security(self, file_path: Path, content: str, lines: list[str]): """Validate HTML template file for security issues""" # SEC-015: XSS via |safe filter for i, line in enumerate(lines, 1): if re.search(r'\|\s*safe', line) and 'sanitized' not in line.lower(): self._add_violation( rule_id="SEC-015", rule_name="XSS prevention in templates", severity=Severity.WARNING, file_path=file_path, line_number=i, message="|safe filter disables auto-escaping - ensure content is sanitized", context=line.strip()[:80], suggestion="Mark with {# sanitized #} comment if content is sanitized", ) # Check for x-html with dynamic content for i, line in enumerate(lines, 1): if re.search(r'x-html="[^"]*\w', line) and "sanitized" not in line.lower(): self._add_violation( rule_id="SEC-015", rule_name="XSS prevention in templates", severity=Severity.INFO, file_path=file_path, line_number=i, message="x-html renders raw HTML - ensure content is safe", context=line.strip()[:80], suggestion="Use x-text for text content or sanitize HTML", ) def _validate_config_security(self, file_path: Path, content: str, lines: list[str]): """Validate configuration file for security issues""" # Check for hardcoded secrets in config secret_patterns = [ (r'password\s*[=:]\s*["\'][^"\']{4,}["\']', "password"), (r'secret\s*[=:]\s*["\'][^"\']{8,}["\']', "secret"), (r'api_key\s*[=:]\s*["\'][A-Za-z0-9_-]{16,}["\']', "API key"), (r'token\s*[=:]\s*["\'][A-Za-z0-9._-]{20,}["\']', "token"), ] for i, line in enumerate(lines, 1): # Skip comments stripped = line.strip() if stripped.startswith("#") or stripped.startswith("//"): continue for pattern, secret_type in secret_patterns: if re.search(pattern, line, re.IGNORECASE): # Check for environment variable references if "${" in line or "os.getenv" in line or "environ" in line: continue self._add_violation( rule_id="SEC-001", rule_name="No hardcoded credentials", severity=Severity.ERROR, file_path=file_path, line_number=i, message=f"Possible hardcoded {secret_type} in configuration", context=line.strip()[:60] + "...", suggestion="Use environment variables for secrets", ) # ========================================================================= # Specific Security Checks # ========================================================================= def _check_hardcoded_credentials(self, file_path: Path, content: str, lines: list[str]): """SEC-001: Check for hardcoded credentials""" patterns = [ (r'password\s*=\s*["\'][^"\']{4,}["\']', "password"), (r'api_key\s*=\s*["\'][A-Za-z0-9_-]{16,}["\']', "API key"), (r'secret_key\s*=\s*["\'][^"\']{8,}["\']', "secret key"), (r'auth_token\s*=\s*["\'][A-Za-z0-9._-]{20,}["\']', "auth token"), (r'AWS_SECRET.*=\s*["\'][^"\']+["\']', "AWS secret"), (r'STRIPE_.*KEY.*=\s*["\'][^"\']+["\']', "Stripe key"), ] exclude_patterns = [ "os.getenv", "os.environ", "settings.", '""', "''", "# noqa", "# test", "password_hash", "example" ] for i, line in enumerate(lines, 1): for pattern, secret_type in patterns: if re.search(pattern, line, re.IGNORECASE): # Check exclusions if any(exc in line for exc in exclude_patterns): continue self._add_violation( rule_id="SEC-001", rule_name="No hardcoded credentials", severity=Severity.ERROR, file_path=file_path, line_number=i, message=f"Possible hardcoded {secret_type}", context=line.strip()[:60] + "...", suggestion="Use environment variables or secret management", ) def _check_sql_injection(self, file_path: Path, content: str, lines: list[str]): """SEC-011: Check for SQL injection vulnerabilities""" patterns = [ r'execute\s*\(\s*f["\']', r'execute\s*\([^)]*\s*\+\s*', r'execute\s*\([^)]*%[^)]*%', r'text\s*\(\s*f["\']', r'\.raw\s*\(\s*f["\']', ] for i, line in enumerate(lines, 1): for pattern in patterns: if re.search(pattern, line): if "# noqa" in line or "# safe" in line: continue self._add_violation( rule_id="SEC-011", rule_name="No raw SQL queries", severity=Severity.ERROR, file_path=file_path, line_number=i, message="Possible SQL injection - use parameterized queries", context=line.strip()[:80], suggestion="Use SQLAlchemy ORM or parameterized queries with :param syntax", ) def _check_command_injection(self, file_path: Path, content: str, lines: list[str]): """SEC-012: Check for command injection vulnerabilities""" patterns = [ (r'subprocess.*shell\s*=\s*True', "shell=True in subprocess"), (r'os\.system\s*\(', "os.system()"), (r'os\.popen\s*\(', "os.popen()"), ] for i, line in enumerate(lines, 1): for pattern, issue in patterns: if re.search(pattern, line): if "# noqa" in line or "# safe" in line: continue self._add_violation( rule_id="SEC-012", rule_name="No shell command injection", severity=Severity.ERROR, file_path=file_path, line_number=i, message=f"{issue} allows command injection", context=line.strip()[:80], suggestion="Use subprocess with list arguments, shell=False", ) def _check_code_execution(self, file_path: Path, content: str, lines: list[str]): """SEC-013: Check for code execution vulnerabilities""" patterns = [ (r'eval\s*\([^)]*request', "eval with request data"), (r'eval\s*\([^)]*input', "eval with user input"), (r'exec\s*\([^)]*request', "exec with request data"), (r'__import__\s*\([^)]*request', "__import__ with request data"), ] for i, line in enumerate(lines, 1): for pattern, issue in patterns: if re.search(pattern, line, re.IGNORECASE): self._add_violation( rule_id="SEC-013", rule_name="No code execution", severity=Severity.ERROR, file_path=file_path, line_number=i, message=f"{issue} allows arbitrary code execution", context=line.strip()[:80], suggestion="Never use eval/exec with user input", ) def _check_path_traversal(self, file_path: Path, content: str, lines: list[str]): """SEC-014: Check for path traversal vulnerabilities""" # Check if file has path operations with user input has_secure_filename = "secure_filename" in content or "basename" in content patterns = [ r'open\s*\([^)]*request', r'open\s*\([^)]*\+', r'Path\s*\([^)]*request', ] for i, line in enumerate(lines, 1): for pattern in patterns: if re.search(pattern, line, re.IGNORECASE): if has_secure_filename: continue self._add_violation( rule_id="SEC-014", rule_name="Path traversal prevention", severity=Severity.WARNING, file_path=file_path, line_number=i, message="Possible path traversal - validate file paths", context=line.strip()[:80], suggestion="Use secure_filename() and validate paths against allowed directories", ) def _check_unsafe_deserialization(self, file_path: Path, content: str, lines: list[str]): """SEC-020: Check for unsafe deserialization""" patterns = [ (r'pickle\.loads?\s*\(', "pickle deserialization"), (r'yaml\.load\s*\([^,)]+\)(?!.*SafeLoader)', "yaml.load without SafeLoader"), (r'marshal\.loads?\s*\(', "marshal deserialization"), ] for i, line in enumerate(lines, 1): for pattern, issue in patterns: if re.search(pattern, line): if "# noqa" in line: continue self._add_violation( rule_id="SEC-020", rule_name="Deserialization safety", severity=Severity.ERROR, file_path=file_path, line_number=i, message=f"Unsafe {issue} can lead to code execution", context=line.strip()[:80], suggestion="Use json.loads() or yaml.safe_load() instead", ) def _check_pii_logging(self, file_path: Path, content: str, lines: list[str]): """SEC-021: Check for PII in logs""" patterns = [ (r'log\w*\.[a-z]+\([^)]*password', "password in log"), (r'log\w*\.[a-z]+\([^)]*credit_card', "credit card in log"), (r'log\w*\.[a-z]+\([^)]*ssn', "SSN in log"), (r'print\s*\([^)]*password', "password in print"), ] exclude = ["password_hash", "password_reset", "password_changed", "# noqa"] for i, line in enumerate(lines, 1): for pattern, issue in patterns: if re.search(pattern, line, re.IGNORECASE): if any(exc in line for exc in exclude): continue self._add_violation( rule_id="SEC-021", rule_name="PII logging prevention", severity=Severity.ERROR, file_path=file_path, line_number=i, message=f"Possible {issue}", context=line.strip()[:60] + "...", suggestion="Never log sensitive data - redact or omit", ) def _check_error_leakage(self, file_path: Path, content: str, lines: list[str]): """SEC-024: Check for error information leakage""" patterns = [ r'traceback\.format_exc\(\).*detail', r'traceback\.format_exc\(\).*response', r'str\(e\).*HTTPException', ] for i, line in enumerate(lines, 1): for pattern in patterns: if re.search(pattern, line): if "logger" in line or "# noqa" in line: continue self._add_violation( rule_id="SEC-024", rule_name="Error message information leakage", severity=Severity.WARNING, file_path=file_path, line_number=i, message="Internal error details may be exposed to users", context=line.strip()[:80], suggestion="Log errors internally, return generic message to users", ) def _check_https_enforcement(self, file_path: Path, content: str, lines: list[str]): """SEC-034: Check for HTTP instead of HTTPS""" for i, line in enumerate(lines, 1): if re.search(r'http://(?!localhost|127\.0\.0\.1|0\.0\.0\.0|\$)', line): if "# noqa" in line or "example.com" in line or "schemas" in line: continue if "http://www.w3.org" in line: continue self._add_violation( rule_id="SEC-034", rule_name="HTTPS enforcement", severity=Severity.WARNING, file_path=file_path, line_number=i, message="HTTP URL found - use HTTPS for security", context=line.strip()[:80], suggestion="Replace http:// with https://", ) def _check_timeout_configuration(self, file_path: Path, content: str, lines: list[str]): """SEC-040: Check for missing timeouts on external calls""" # Check for requests/httpx calls without timeout if "requests" in content or "httpx" in content or "aiohttp" in content: has_timeout_import = "timeout" in content.lower() patterns = [ r'requests\.(get|post|put|delete|patch)\s*\([^)]+\)(?!.*timeout)', r'httpx\.(get|post|put|delete|patch)\s*\([^)]+\)(?!.*timeout)', ] for i, line in enumerate(lines, 1): for pattern in patterns: if re.search(pattern, line) and "timeout" not in line: self._add_violation( rule_id="SEC-040", rule_name="Timeout configuration", severity=Severity.WARNING, file_path=file_path, line_number=i, message="HTTP request without timeout - can hang indefinitely", context=line.strip()[:80], suggestion="Add timeout parameter: requests.get(url, timeout=30)", ) def _check_weak_hashing(self, file_path: Path, content: str, lines: list[str]): """SEC-041: Check for weak hashing algorithms""" patterns = [ (r'hashlib\.md5\s*\(', "MD5"), (r'hashlib\.sha1\s*\(', "SHA1"), (r'MD5\.new\s*\(', "MD5"), (r'SHA\.new\s*\(', "SHA1"), ] for i, line in enumerate(lines, 1): for pattern, algo in patterns: if re.search(pattern, line): if "# noqa" in line or "# checksum" in line or "# file hash" in line: continue self._add_violation( rule_id="SEC-041", rule_name="Strong hashing algorithms", severity=Severity.WARNING, file_path=file_path, line_number=i, message=f"{algo} is cryptographically weak", context=line.strip()[:80], suggestion="Use SHA-256 or stronger for security purposes", ) def _check_insecure_random(self, file_path: Path, content: str, lines: list[str]): """SEC-042: Check for insecure random number generation""" # Only check if file appears to deal with security security_context = any( word in content.lower() for word in ["token", "secret", "key", "session", "csrf", "nonce", "salt"] ) if not security_context: return patterns = [ r'random\.random\s*\(', r'random\.randint\s*\(', r'random\.choice\s*\(', ] for i, line in enumerate(lines, 1): for pattern in patterns: if re.search(pattern, line): if "# noqa" in line or "# not security" in line: continue self._add_violation( rule_id="SEC-042", rule_name="Secure random generation", severity=Severity.WARNING, file_path=file_path, line_number=i, message="random module is not cryptographically secure", context=line.strip()[:80], suggestion="Use secrets module for security-sensitive randomness", ) def _check_hardcoded_keys(self, file_path: Path, content: str, lines: list[str]): """SEC-043: Check for hardcoded encryption keys""" patterns = [ r'ENCRYPTION_KEY\s*=\s*["\'][^"\']+["\']', r'SECRET_KEY\s*=\s*["\'][A-Za-z0-9+/=]{16,}["\']', r'AES_KEY\s*=\s*["\']', r'PRIVATE_KEY\s*=\s*["\']-----BEGIN', ] exclude = ["os.getenv", "os.environ", "settings.", '""', "# test"] for i, line in enumerate(lines, 1): for pattern in patterns: if re.search(pattern, line): if any(exc in line for exc in exclude): continue self._add_violation( rule_id="SEC-043", rule_name="No hardcoded encryption keys", severity=Severity.ERROR, file_path=file_path, line_number=i, message="Hardcoded encryption key found", context=line.strip()[:50] + "...", suggestion="Use environment variables for encryption keys", ) def _check_certificate_verification(self, file_path: Path, content: str, lines: list[str]): """SEC-047: Check for disabled certificate verification""" patterns = [ (r'verify\s*=\s*False', "SSL verification disabled"), (r'CERT_NONE', "Certificate verification disabled"), (r'check_hostname\s*=\s*False', "Hostname verification disabled"), ] for i, line in enumerate(lines, 1): for pattern, issue in patterns: if re.search(pattern, line): if "# noqa" in line or "# test" in line or "DEBUG" in line: continue self._add_violation( rule_id="SEC-047", rule_name="Certificate verification", severity=Severity.ERROR, file_path=file_path, line_number=i, message=f"{issue} - vulnerable to MITM attacks", context=line.strip()[:80], suggestion="Always verify SSL certificates in production", ) def _check_jwt_expiry(self, file_path: Path, content: str, lines: list[str]): """SEC-002: Check for JWT tokens without expiry""" if "jwt.encode" in content and "exp" not in content: # Find the jwt.encode line for i, line in enumerate(lines, 1): if "jwt.encode" in line: self._add_violation( rule_id="SEC-002", rule_name="JWT expiry enforcement", severity=Severity.WARNING, file_path=file_path, line_number=i, message="JWT token may not have expiration claim", context=line.strip()[:80], suggestion="Include 'exp' claim with appropriate expiration", ) break def _check_sensitive_url_params_js(self, file_path: Path, content: str, lines: list[str]): """SEC-022: Check for sensitive data in URLs (JavaScript)""" patterns = [ r'\?password=', r'&password=', r'\?token=(?!type)', r'&token=(?!type)', r'\?api_key=', r'&api_key=', ] for i, line in enumerate(lines, 1): for pattern in patterns: if re.search(pattern, line): self._add_violation( rule_id="SEC-022", rule_name="Sensitive data in URLs", severity=Severity.ERROR, file_path=file_path, line_number=i, message="Sensitive data in URL query parameters", context=line.strip()[:80], suggestion="Send sensitive data in request body or headers", ) def main(): parser = argparse.ArgumentParser( description="Security code validator", formatter_class=argparse.RawDescriptionHelpFormatter, ) parser.add_argument("-f", "--file", type=Path, help="Validate a single file") parser.add_argument("-d", "--folder", type=Path, help="Validate a directory") parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output") parser.add_argument("--errors-only", action="store_true", help="Only show errors") parser.add_argument("--json", action="store_true", help="JSON output") args = parser.parse_args() validator = SecurityValidator(verbose=args.verbose) if args.file: validator.validate_file(args.file) elif args.folder: validator.validate_all(args.folder) else: validator.validate_all() validator.output_results(json_output=args.json, errors_only=args.errors_only) sys.exit(validator.get_exit_code()) if __name__ == "__main__": main()