#!/usr/bin/env python3
"""
Security Validator
==================
Validates code against security rules defined in .security-rules/

This script checks for common security vulnerabilities:
- Hardcoded credentials and secrets
- SQL injection patterns
- Command injection risks
- XSS vulnerabilities
- Insecure cryptography
- Authentication weaknesses
- Data exposure risks

Usage:
    python scripts/validate/validate_security.py                        # Check all files
    python scripts/validate/validate_security.py -d app/api/            # Check specific directory
    python scripts/validate/validate_security.py -f app/api/v1/auth.py  # Check single file
    python scripts/validate/validate_security.py -v                     # Verbose output
    python scripts/validate/validate_security.py --json                 # JSON output
    python scripts/validate/validate_security.py --errors-only          # Only show errors

Options:
    -f, --file PATH     Validate a single file
    -d, --folder PATH   Validate all files in a directory (recursive)
    -v, --verbose       Show detailed output including context
    --errors-only       Only show errors, suppress warnings and info
    --json              Output results as JSON
"""

import argparse
import re
import sys
from pathlib import Path
from typing import Callable, Optional

# Make this script's own directory importable so that base_validator resolves
# when the file is executed directly.
sys.path.insert(0, str(Path(__file__).parent))

from base_validator import BaseValidator, Severity, ValidationResult


class SecurityValidator(BaseValidator):
    """Security-focused code validator.

    Each check is a line-oriented regex heuristic: it scans the lines of one
    file and records a violation through ``self._add_violation`` unless the
    line is suppressed (``self._is_noqa_suppressed``) or matches one of the
    check's own exclusion markers.
    """

    VALIDATOR_NAME = "Security Validator"
    VALIDATOR_EMOJI = "🔒"
    RULES_DIR_NAME = ".security-rules"
    CONFIG_FILE_NAME = ".security-rules.yaml"

    # Configuration suffixes that have rules. Shared by the directory walk and
    # the single-file dispatcher so the two cannot drift apart.
    _CONFIG_SUFFIXES = (".yaml", ".yml", ".json")

    def validate_all(self, target_path: Optional[Path] = None) -> ValidationResult:
        """Validate every supported file under a directory.

        Args:
            target_path: Directory to scan recursively. Falls back to
                ``self.project_root`` when omitted (or otherwise falsy).

        Returns:
            ``self.result`` after all file-type passes have run.
        """
        print(f"\n{self.VALIDATOR_EMOJI} Starting security validation...\n")

        target = target_path or self.project_root

        # Validate Python files
        self._validate_python_files(target)

        # Validate JavaScript files
        self._validate_javascript_files(target)

        # Validate HTML templates
        self._validate_template_files(target)

        # Validate configuration files
        self._validate_config_files(target)

        return self.result

    # =========================================================================
    # File discovery
    # =========================================================================

    def _read_source(self, file_path: Path) -> Optional[tuple[str, list[str]]]:
        """Read a file as text and split it into lines.

        Decoding is pinned to UTF-8 with undecodable bytes replaced, so one
        oddly-encoded file cannot abort the whole scan and results do not
        depend on the machine's locale.

        Returns:
            ``(content, lines)``, or ``None`` when the path cannot be read
            (for example a directory whose name matches the glob, or a
            permission error). A warning is written to stderr in that case.
        """
        try:
            content = file_path.read_text(encoding="utf-8", errors="replace")
        except OSError as exc:
            # stderr keeps the warning out of the regular (possibly JSON) output.
            print(f"⚠️  Skipping unreadable file {file_path}: {exc}", file=sys.stderr)
            return None
        return content, content.split("\n")

    def _scan_files(
        self,
        target: Path,
        glob_pattern: str,
        check: Callable[[Path, str, list[str]], None],
    ) -> None:
        """Run ``check`` on every non-ignored file under ``target`` matching the glob.

        ``self.result.files_checked`` is incremented only for files that were
        actually read.
        """
        for path in target.rglob(glob_pattern):
            if self._should_ignore_file(path):
                continue
            source = self._read_source(path)
            if source is None:
                continue
            self.result.files_checked += 1
            content, lines = source
            check(path, content, lines)

    def _validate_python_files(self, target: Path):
        """Validate all Python files for security issues"""
        print("🐍 Validating Python files...")
        self._scan_files(target, "*.py", self._validate_python_security)

    def _validate_javascript_files(self, target: Path):
        """Validate all JavaScript files for security issues"""
        print("🟨 Validating JavaScript files...")
        self._scan_files(target, "*.js", self._validate_javascript_security)

    def _validate_template_files(self, target: Path):
        """Validate all HTML template files for security issues"""
        print("📄 Validating template files...")
        self._scan_files(target, "*.html", self._validate_template_security)

    def _validate_config_files(self, target: Path):
        """Validate configuration files for security issues.

        Only the suffixes in ``_CONFIG_SUFFIXES`` are scanned; .toml, .ini and
        .env files are not. Globbing once per checked suffix means each file
        is visited exactly once, so a name such as ``app.env.yaml`` cannot be
        reported twice.
        """
        print("⚙️ Validating configuration files...")
        for suffix in self._CONFIG_SUFFIXES:
            self._scan_files(target, f"*{suffix}", self._validate_config_security)

    def _validate_file_content(self, file_path: Path, content: str, lines: list[str]):
        """Validate file content based on file type.

        Dispatches on the file suffix; files with any other suffix are
        silently skipped.
        """
        if file_path.suffix == ".py":
            self._validate_python_security(file_path, content, lines)
        elif file_path.suffix == ".js":
            self._validate_javascript_security(file_path, content, lines)
        elif file_path.suffix == ".html":
            self._validate_template_security(file_path, content, lines)
        elif file_path.suffix in self._CONFIG_SUFFIXES:
            self._validate_config_security(file_path, content, lines)

    # =========================================================================
    # Per-file-type validation
    # =========================================================================

    def _validate_python_security(self, file_path: Path, content: str, lines: list[str]):
        """Validate Python file for security issues.

        Runs every Python check in turn. The path-traversal and JWT checks are
        gated on the file path so they only run where they are likely to be
        relevant.
        """
        file_path_str = str(file_path)

        # SEC-001: Hardcoded credentials
        self._check_hardcoded_credentials(file_path, content, lines)

        # SEC-011: SQL injection
        self._check_sql_injection(file_path, content, lines)

        # SEC-012: Command injection
        self._check_command_injection(file_path, content, lines)

        # SEC-013: Code execution
        self._check_code_execution(file_path, content, lines)

        # SEC-014: Path traversal (only for paths that mention uploads/files)
        if "upload" in file_path_str.lower() or "file" in file_path_str.lower():
            self._check_path_traversal(file_path, content, lines)

        # SEC-020: Unsafe deserialization
        self._check_unsafe_deserialization(file_path, content, lines)

        # SEC-021: PII logging
        self._check_pii_logging(file_path, content, lines)

        # SEC-024: Error information leakage
        self._check_error_leakage(file_path, content, lines)

        # SEC-034: HTTPS enforcement
        self._check_https_enforcement(file_path, content, lines)

        # SEC-040: Timeout configuration
        self._check_timeout_configuration(file_path, content, lines)

        # SEC-041: Weak hashing
        self._check_weak_hashing(file_path, content, lines)

        # SEC-042: Insecure random
        self._check_insecure_random(file_path, content, lines)

        # SEC-043: Hardcoded encryption keys
        self._check_hardcoded_keys(file_path, content, lines)

        # SEC-047: Certificate verification
        self._check_certificate_verification(file_path, content, lines)

        # Auth file specific checks
        if "auth" in file_path_str.lower():
            self._check_jwt_expiry(file_path, content, lines)

    def _validate_javascript_security(self, file_path: Path, content: str, lines: list[str]):
        """Validate JavaScript file for security issues.

        A match is skipped when ``//`` appears anywhere before the keyword on
        the same line. NOTE(review): that also skips lines where the ``//``
        belongs to a URL rather than a comment - confirm this is acceptable.
        """
        # SEC-022: Sensitive data in URLs
        self._check_sensitive_url_params_js(file_path, content, lines)

        # Check for eval usage
        for i, line in enumerate(lines, 1):
            if re.search(r"\beval\s*\(", line) and "//" not in line.split("eval")[0]:
                if self._is_noqa_suppressed(line, "SEC-013"):
                    continue
                self._add_violation(
                    rule_id="SEC-013",
                    rule_name="No code execution",
                    severity=Severity.ERROR,
                    file_path=file_path,
                    line_number=i,
                    message="eval() allows arbitrary code execution",
                    context=line.strip()[:80],
                    suggestion="Use JSON.parse() for JSON or other safe alternatives",
                )

        # Check for innerHTML with user input
        for i, line in enumerate(lines, 1):
            if re.search(r"\.innerHTML\s*=", line) and "//" not in line.split("innerHTML")[0]:
                if self._is_noqa_suppressed(line, "SEC-015"):
                    continue
                self._add_violation(
                    rule_id="SEC-015",
                    rule_name="XSS prevention",
                    severity=Severity.WARNING,
                    file_path=file_path,
                    line_number=i,
                    message="innerHTML can lead to XSS if used with untrusted input",
                    context=line.strip()[:80],
                    suggestion="Use textContent for text or sanitize HTML input",
                )

    def _validate_template_security(self, file_path: Path, content: str, lines: list[str]):
        """Validate HTML template file for security issues.

        Lines containing the word "sanitized" (any case) are treated as
        reviewed and are not reported.
        """
        # SEC-015: XSS via |safe filter
        for i, line in enumerate(lines, 1):
            if re.search(r"\|\s*safe", line) and "sanitized" not in line.lower():
                if self._is_noqa_suppressed(line, "SEC-015"):
                    continue
                self._add_violation(
                    rule_id="SEC-015",
                    rule_name="XSS prevention in templates",
                    severity=Severity.WARNING,
                    file_path=file_path,
                    line_number=i,
                    message="|safe filter disables auto-escaping - ensure content is sanitized",
                    context=line.strip()[:80],
                    suggestion="Mark with {# sanitized #} comment if content is sanitized",
                )

        # Check for x-html with dynamic content
        for i, line in enumerate(lines, 1):
            if re.search(r'x-html="[^"]*\w', line) and "sanitized" not in line.lower():
                if self._is_noqa_suppressed(line, "SEC-015"):
                    continue
                # Skip safe Alpine.js patterns — static SVG icons and internal JS methods
                if re.search(r'x-html="[^"]*\$icon\(', line):
                    continue
                if re.search(r'x-html="[^"]*\$store\.\w+\.\w+', line):
                    continue
                if re.search(r'x-html="[^"]*window\.icons', line):
                    continue
                self._add_violation(
                    rule_id="SEC-015",
                    rule_name="XSS prevention in templates",
                    severity=Severity.INFO,
                    file_path=file_path,
                    line_number=i,
                    message="x-html renders raw HTML - ensure content is safe",
                    context=line.strip()[:80],
                    suggestion="Use x-text for text content or sanitize HTML",
                )

    def _validate_config_security(self, file_path: Path, content: str, lines: list[str]):
        """Validate configuration file for security issues.

        Reports quoted secret-looking values. Comment lines and lines that
        reference environment variables are skipped.
        """
        # Check for hardcoded secrets in config
        secret_patterns = [
            (r'password\s*[=:]\s*["\'][^"\']{4,}["\']', "password"),
            (r'secret\s*[=:]\s*["\'][^"\']{8,}["\']', "secret"),
            (r'api_key\s*[=:]\s*["\'][A-Za-z0-9_-]{16,}["\']', "API key"),
            (r'token\s*[=:]\s*["\'][A-Za-z0-9._-]{20,}["\']', "token"),
        ]

        for i, line in enumerate(lines, 1):
            # Skip comments
            stripped = line.strip()
            if stripped.startswith(("#", "//")):
                continue

            for pattern, secret_type in secret_patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    # Check for environment variable references
                    if "${" in line or "os.getenv" in line or "environ" in line:
                        continue
                    if self._is_noqa_suppressed(line, "SEC-001"):
                        continue
                    self._add_violation(
                        rule_id="SEC-001",
                        rule_name="No hardcoded credentials",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message=f"Possible hardcoded {secret_type} in configuration",
                        context=line.strip()[:60] + "...",
                        suggestion="Use environment variables for secrets",
                    )

    # =========================================================================
    # Specific Security Checks
    # =========================================================================

    def _check_hardcoded_credentials(self, file_path: Path, content: str, lines: list[str]):
        """SEC-001: Check for hardcoded credentials.

        A line that matches several patterns is reported once per pattern.
        Lines containing any of the exclusion substrings are skipped.
        """
        patterns = [
            (r'password\s*=\s*["\'][^"\']{4,}["\']', "password"),
            (r'api_key\s*=\s*["\'][A-Za-z0-9_-]{16,}["\']', "API key"),
            (r'secret_key\s*=\s*["\'][^"\']{8,}["\']', "secret key"),
            (r'auth_token\s*=\s*["\'][A-Za-z0-9._-]{20,}["\']', "auth token"),
            (r'AWS_SECRET.*=\s*["\'][^"\']+["\']', "AWS secret"),
            (r'STRIPE_.*KEY.*=\s*["\'][^"\']+["\']', "Stripe key"),
        ]

        exclude_patterns = [
            "os.getenv",
            "os.environ",
            "settings.",
            '""',
            "''",
            "# test",
            "password_hash",
            "example",
        ]

        for i, line in enumerate(lines, 1):
            for pattern, secret_type in patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    # Check exclusions
                    if any(exc in line for exc in exclude_patterns):
                        continue
                    if self._is_noqa_suppressed(line, "SEC-001"):
                        continue
                    self._add_violation(
                        rule_id="SEC-001",
                        rule_name="No hardcoded credentials",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message=f"Possible hardcoded {secret_type}",
                        context=line.strip()[:60] + "...",
                        suggestion="Use environment variables or secret management",
                    )

    def _check_sql_injection(self, file_path: Path, content: str, lines: list[str]):
        """SEC-011: Check for SQL injection vulnerabilities.

        Flags query calls built from f-strings, concatenation or %-formatting.
        A trailing "# safe" marker on the line silences the check.
        """
        patterns = [
            r'execute\s*\(\s*f["\']',
            r"execute\s*\([^)]*\s*\+\s*",
            r"execute\s*\([^)]*%[^)]*%",
            r'text\s*\(\s*f["\']',
            r'\.raw\s*\(\s*f["\']',
        ]

        for i, line in enumerate(lines, 1):
            for pattern in patterns:
                if re.search(pattern, line):
                    if self._is_noqa_suppressed(line, "SEC-011") or "# safe" in line:
                        continue
                    self._add_violation(
                        rule_id="SEC-011",
                        rule_name="No raw SQL queries",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message="Possible SQL injection - use parameterized queries",
                        context=line.strip()[:80],
                        suggestion="Use SQLAlchemy ORM or parameterized queries with :param syntax",
                    )

    def _check_command_injection(self, file_path: Path, content: str, lines: list[str]):
        """SEC-012: Check for command injection vulnerabilities.

        A trailing "# safe" marker on the line silences the check.
        """
        # The noqa markers stop this validator from flagging its own pattern table.
        patterns = [
            (r"subprocess.*shell\s*=\s*True", "shell=True in subprocess"),  # noqa: SEC012
            (r"os\.system\s*\(", "os.system()"),  # noqa: SEC012
            (r"os\.popen\s*\(", "os.popen()"),  # noqa: SEC012
        ]

        for i, line in enumerate(lines, 1):
            for pattern, issue in patterns:
                if re.search(pattern, line):
                    if self._is_noqa_suppressed(line, "SEC-012") or "# safe" in line:
                        continue
                    self._add_violation(
                        rule_id="SEC-012",
                        rule_name="No shell command injection",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message=f"{issue} allows command injection",
                        context=line.strip()[:80],
                        suggestion="Use subprocess with list arguments, shell=False",
                    )

    def _check_code_execution(self, file_path: Path, content: str, lines: list[str]):
        """SEC-013: Check for code execution vulnerabilities.

        Only dynamic-execution calls whose argument text mentions request or
        input data on the same line are flagged.
        """
        patterns = [
            (r"eval\s*\([^)]*request", "eval with request data"),
            (r"eval\s*\([^)]*input", "eval with user input"),
            (r"exec\s*\([^)]*request", "exec with request data"),
            (r"__import__\s*\([^)]*request", "__import__ with request data"),
        ]

        for i, line in enumerate(lines, 1):
            for pattern, issue in patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    if self._is_noqa_suppressed(line, "SEC-013"):
                        continue
                    self._add_violation(
                        rule_id="SEC-013",
                        rule_name="No code execution",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message=f"{issue} allows arbitrary code execution",
                        context=line.strip()[:80],
                        suggestion="Never use eval/exec with user input",
                    )

    def _check_path_traversal(self, file_path: Path, content: str, lines: list[str]):
        """SEC-014: Check for path traversal vulnerabilities.

        If the file mentions ``secure_filename`` or ``basename`` anywhere it
        is assumed to sanitise its paths and nothing is reported.
        """
        # The sanitiser test is file-wide, so decide once up front.
        if "secure_filename" in content or "basename" in content:
            return

        patterns = [
            r"open\s*\([^)]*request",
            r"open\s*\([^)]*\+",
            r"Path\s*\([^)]*request",
        ]

        for i, line in enumerate(lines, 1):
            for pattern in patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    if self._is_noqa_suppressed(line, "SEC-014"):
                        continue
                    self._add_violation(
                        rule_id="SEC-014",
                        rule_name="Path traversal prevention",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="Possible path traversal - validate file paths",
                        context=line.strip()[:80],
                        suggestion="Use secure_filename() and validate paths against allowed directories",
                    )

    def _check_unsafe_deserialization(self, file_path: Path, content: str, lines: list[str]):
        """SEC-020: Check for unsafe deserialization (pickle, marshal, unsafe YAML loading)."""
        patterns = [
            (r"pickle\.loads?\s*\(", "pickle deserialization"),
            (r"yaml\.load\s*\([^,)]+\)(?!.*SafeLoader)", "yaml.load without SafeLoader"),
            (r"marshal\.loads?\s*\(", "marshal deserialization"),
        ]

        for i, line in enumerate(lines, 1):
            for pattern, issue in patterns:
                if re.search(pattern, line):
                    if self._is_noqa_suppressed(line, "SEC-020"):
                        continue
                    self._add_violation(
                        rule_id="SEC-020",
                        rule_name="Deserialization safety",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message=f"Unsafe {issue} can lead to code execution",
                        context=line.strip()[:80],
                        suggestion="Use json.loads() or yaml.safe_load() instead",
                    )

    def _check_pii_logging(self, file_path: Path, content: str, lines: list[str]):
        """SEC-021: Check for PII in logs.

        Lines that only mention derived, non-sensitive names (hash, reset,
        changed) are excluded.
        """
        patterns = [
            (r"log\w*\.[a-z]+\([^)]*password", "password in log"),
            (r"log\w*\.[a-z]+\([^)]*credit_card", "credit card in log"),
            (r"log\w*\.[a-z]+\([^)]*ssn", "SSN in log"),
            (r"print\s*\([^)]*password", "password in print"),
        ]

        exclude = ["password_hash", "password_reset", "password_changed"]

        for i, line in enumerate(lines, 1):
            for pattern, issue in patterns:
                if re.search(pattern, line, re.IGNORECASE):
                    if any(exc in line for exc in exclude):
                        continue
                    if self._is_noqa_suppressed(line, "SEC-021"):
                        continue
                    self._add_violation(
                        rule_id="SEC-021",
                        rule_name="PII logging prevention",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message=f"Possible {issue}",
                        context=line.strip()[:60] + "...",
                        suggestion="Never log sensitive data - redact or omit",
                    )

    def _check_error_leakage(self, file_path: Path, content: str, lines: list[str]):
        """SEC-024: Check for error information leakage.

        Lines that mention "logger" are skipped, since they are taken to log
        the error rather than return it.
        """
        patterns = [
            r"traceback\.format_exc\(\).*detail",
            r"traceback\.format_exc\(\).*response",
            r"str\(e\).*HTTPException",
        ]

        for i, line in enumerate(lines, 1):
            for pattern in patterns:
                if re.search(pattern, line):
                    if "logger" in line:
                        continue
                    if self._is_noqa_suppressed(line, "SEC-024"):
                        continue
                    self._add_violation(
                        rule_id="SEC-024",
                        rule_name="Error message information leakage",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="Internal error details may be exposed to users",
                        context=line.strip()[:80],
                        suggestion="Log errors internally, return generic message to users",
                    )

    def _check_https_enforcement(self, file_path: Path, content: str, lines: list[str]):
        """SEC-034: Check for HTTP instead of HTTPS.

        Loopback addresses, template placeholders, example.com, schema URIs
        and W3C namespace URIs are not reported.
        """
        for i, line in enumerate(lines, 1):
            if re.search(r"http://(?!localhost|127\.0\.0\.1|0\.0\.0\.0|\$)", line):  # noqa: SEC034
                if (
                    self._is_noqa_suppressed(line, "SEC-034")
                    or "example.com" in line
                    or "schemas" in line
                ):
                    continue
                if "http://www.w3.org" in line:
                    continue
                self._add_violation(
                    rule_id="SEC-034",
                    rule_name="HTTPS enforcement",
                    severity=Severity.WARNING,
                    file_path=file_path,
                    line_number=i,
                    message="HTTP URL found - use HTTPS for security",
                    context=line.strip()[:80],
                    suggestion="Replace http:// with https://",  # noqa: SEC034
                )

    def _check_timeout_configuration(self, file_path: Path, content: str, lines: list[str]):
        """SEC-040: Check for missing timeouts on external calls.

        Only single-line ``requests``/``httpx`` verb calls are inspected; a
        call is reported when the word "timeout" does not appear on its line.
        """
        # Nothing to do unless the file mentions an HTTP client library.
        if not ("requests" in content or "httpx" in content or "aiohttp" in content):
            return

        patterns = [
            r"requests\.(get|post|put|delete|patch)\s*\([^)]+\)(?!.*timeout)",
            r"httpx\.(get|post|put|delete|patch)\s*\([^)]+\)(?!.*timeout)",
        ]

        for i, line in enumerate(lines, 1):
            for pattern in patterns:
                if re.search(pattern, line) and "timeout" not in line:
                    if self._is_noqa_suppressed(line, "SEC-040"):
                        continue
                    self._add_violation(
                        rule_id="SEC-040",
                        rule_name="Timeout configuration",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="HTTP request without timeout - can hang indefinitely",
                        context=line.strip()[:80],
                        suggestion="Add timeout parameter: requests.get(url, timeout=30)",
                    )

    def _check_weak_hashing(self, file_path: Path, content: str, lines: list[str]):
        """SEC-041: Check for weak hashing algorithms.

        A "# checksum" or "# file hash" marker on the line declares a
        non-security use and silences the check.
        """
        patterns = [
            (r"hashlib\.md5\s*\(", "MD5"),
            (r"hashlib\.sha1\s*\(", "SHA1"),
            (r"MD5\.new\s*\(", "MD5"),
            (r"SHA\.new\s*\(", "SHA1"),
        ]

        for i, line in enumerate(lines, 1):
            for pattern, algo in patterns:
                if re.search(pattern, line):
                    if (
                        self._is_noqa_suppressed(line, "SEC-041")
                        or "# checksum" in line
                        or "# file hash" in line
                    ):
                        continue
                    self._add_violation(
                        rule_id="SEC-041",
                        rule_name="Strong hashing algorithms",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message=f"{algo} is cryptographically weak",
                        context=line.strip()[:80],
                        suggestion="Use SHA-256 or stronger for security purposes",
                    )

    def _check_insecure_random(self, file_path: Path, content: str, lines: list[str]):
        """SEC-042: Check for insecure random number generation.

        Runs only when the file mentions a security-related word; a
        "# not security" marker on the line silences the check.
        """
        # Only check if file appears to deal with security
        lowered = content.lower()
        security_context = any(
            word in lowered
            for word in ["token", "secret", "key", "session", "csrf", "nonce", "salt"]
        )

        if not security_context:
            return

        patterns = [
            r"random\.random\s*\(",
            r"random\.randint\s*\(",
            r"random\.choice\s*\(",
        ]

        for i, line in enumerate(lines, 1):
            for pattern in patterns:
                if re.search(pattern, line):
                    if self._is_noqa_suppressed(line, "SEC-042") or "# not security" in line:
                        continue
                    self._add_violation(
                        rule_id="SEC-042",
                        rule_name="Secure random generation",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="random module is not cryptographically secure",
                        context=line.strip()[:80],
                        suggestion="Use secrets module for security-sensitive randomness",
                    )

    def _check_hardcoded_keys(self, file_path: Path, content: str, lines: list[str]):
        """SEC-043: Check for hardcoded encryption keys.

        Patterns are case-sensitive. Lines containing any of the exclusion
        substrings are skipped.
        """
        patterns = [
            r'ENCRYPTION_KEY\s*=\s*["\'][^"\']+["\']',
            r'SECRET_KEY\s*=\s*["\'][A-Za-z0-9+/=]{16,}["\']',
            r'AES_KEY\s*=\s*["\']',
            r'PRIVATE_KEY\s*=\s*["\']-----BEGIN',
        ]

        exclude = ["os.getenv", "os.environ", "settings.", '""', "# test"]

        for i, line in enumerate(lines, 1):
            for pattern in patterns:
                if re.search(pattern, line):
                    if any(exc in line for exc in exclude):
                        continue
                    if self._is_noqa_suppressed(line, "SEC-043"):
                        continue
                    self._add_violation(
                        rule_id="SEC-043",
                        rule_name="No hardcoded encryption keys",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message="Hardcoded encryption key found",
                        context=line.strip()[:50] + "...",
                        suggestion="Use environment variables for encryption keys",
                    )

    def _check_certificate_verification(self, file_path: Path, content: str, lines: list[str]):
        """SEC-047: Check for disabled certificate verification.

        Lines marked "# test" or mentioning "DEBUG" are skipped.
        """
        patterns = [
            (r"verify\s*=\s*False", "SSL verification disabled"),
            (r"CERT_NONE", "Certificate verification disabled"),  # noqa: SEC047
            (r"check_hostname\s*=\s*False", "Hostname verification disabled"),
        ]

        for i, line in enumerate(lines, 1):
            for pattern, issue in patterns:
                if re.search(pattern, line):
                    if (
                        self._is_noqa_suppressed(line, "SEC-047")
                        or "# test" in line
                        or "DEBUG" in line
                    ):
                        continue
                    self._add_violation(
                        rule_id="SEC-047",
                        rule_name="Certificate verification",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message=f"{issue} - vulnerable to MITM attacks",
                        context=line.strip()[:80],
                        suggestion="Always verify SSL certificates in production",
                    )

    def _check_jwt_expiry(self, file_path: Path, content: str, lines: list[str]):
        """SEC-002: Check for JWT tokens without expiry.

        Reports at most one violation per file: the first non-suppressed
        ``jwt.encode`` line. NOTE(review): the "exp" test is a plain substring
        search over the whole file, so unrelated words containing "exp" also
        disable this check.
        """
        if "jwt.encode" in content and "exp" not in content:
            # Find the jwt.encode line
            for i, line in enumerate(lines, 1):
                if "jwt.encode" in line:
                    if self._is_noqa_suppressed(line, "SEC-002"):
                        continue
                    self._add_violation(
                        rule_id="SEC-002",
                        rule_name="JWT expiry enforcement",
                        severity=Severity.WARNING,
                        file_path=file_path,
                        line_number=i,
                        message="JWT token may not have expiration claim",
                        context=line.strip()[:80],
                        suggestion="Include 'exp' claim with appropriate expiration",
                    )
                    break

    def _check_sensitive_url_params_js(self, file_path: Path, content: str, lines: list[str]):
        """SEC-022: Check for sensitive data in URLs (JavaScript).

        A ``token=`` parameter immediately followed by "type" is not flagged.
        """
        patterns = [
            r"\?password=",
            r"&password=",
            r"\?token=(?!type)",
            r"&token=(?!type)",
            r"\?api_key=",
            r"&api_key=",
        ]

        for i, line in enumerate(lines, 1):
            for pattern in patterns:
                if re.search(pattern, line):
                    if self._is_noqa_suppressed(line, "SEC-022"):
                        continue
                    self._add_violation(
                        rule_id="SEC-022",
                        rule_name="Sensitive data in URLs",
                        severity=Severity.ERROR,
                        file_path=file_path,
                        line_number=i,
                        message="Sensitive data in URL query parameters",
                        context=line.strip()[:80],
                        suggestion="Send sensitive data in request body or headers",
                    )


def main():
    """Parse command-line arguments, run the validator and exit with its status code."""
    parser = argparse.ArgumentParser(
        description="Security code validator",
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument("-f", "--file", type=Path, help="Validate a single file")
    parser.add_argument("-d", "--folder", type=Path, help="Validate a directory")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument("--errors-only", action="store_true", help="Only show errors")
    parser.add_argument("--json", action="store_true", help="JSON output")

    args = parser.parse_args()

    validator = SecurityValidator(verbose=args.verbose)

    # --file takes precedence over --folder; with neither, scan the project root.
    if args.file:
        validator.validate_file(args.file)
    elif args.folder:
        validator.validate_all(args.folder)
    else:
        validator.validate_all()

    validator.output_results(json_output=args.json, errors_only=args.errors_only)
    sys.exit(validator.get_exit_code())


if __name__ == "__main__":
    main()