#!/usr/bin/env python3
"""
IT Internal Audit Validator

Validates code against internal audit rules defined in .audit-rules/
Focuses on governance, compliance, and control requirements.
"""

import re
import sys
from pathlib import Path

# Add parent directory to path for imports
sys.path.insert(0, str(Path(__file__).parent))

from base_validator import BaseValidator


class AuditValidator(BaseValidator):
    """Validates IT internal audit rules.

    Each ``_validate_*`` method inspects a slice of the project tree and
    records findings via the BaseValidator ``add_error`` / ``add_warning`` /
    ``add_info`` API. All checks are heuristic text scans — they never import
    or execute project code.
    """

    def __init__(self, project_root: Path | None = None):
        super().__init__(".audit-rules", project_root)

    def validate(self) -> bool:
        """Run all audit validations.

        Returns:
            True if no errors were recorded (warnings/info do not fail).
        """
        self._validate_audit_trail()
        self._validate_access_control()
        self._validate_data_governance()
        self._validate_compliance()
        self._validate_change_management()
        self._validate_third_party()
        self._validate_documentation()
        return len(self.errors) == 0

    # ==================
    # AUDIT TRAIL
    # ==================

    def _validate_audit_trail(self) -> None:
        """Validate audit trail requirements."""
        # Check authentication logging
        auth_files = [
            self.project_root / "app" / "api" / "v1" / "auth.py",
            self.project_root / "app" / "routes" / "admin.py",
        ]
        for file in auth_files:
            if file.exists():
                content = file.read_text()
                if "logger" not in content:
                    self.add_error(
                        "AUDIT-LOG-001",
                        "Authentication operations must include logging",
                        str(file),
                    )

        # Check service layer logging
        services_path = self.project_root / "app" / "services"
        if services_path.exists():
            for file in services_path.glob("*.py"):
                if file.name == "__init__.py":
                    continue
                content = file.read_text()
                # Services that modify data should have logging
                if re.search(r"def (create|update|delete)", content):
                    if "logger" not in content:
                        self.add_warning(
                            "AUDIT-LOG-002",
                            "Service with data modifications should include logging",
                            str(file),
                        )

        # Check for audit timestamp fields in models
        # Models can have timestamps directly or inherit from BaseModel/TimestampMixin
        models_path = self.project_root / "models" / "database"
        if models_path.exists():
            for file in models_path.glob("*.py"):
                # audit_log.py uses timestamp field instead of created_at/updated_at
                if file.name in ("__init__.py", "base.py", "audit_log.py"):
                    continue
                content = file.read_text()
                if "class " in content:  # Has model definition
                    # Check if timestamps are present directly or via inheritance
                    has_timestamps = (
                        "created_at" in content
                        or "updated_at" in content
                        or "BaseModel" in content  # Inherits from BaseModel
                        or "TimestampMixin" in content  # Uses TimestampMixin
                    )
                    if not has_timestamps:
                        self.add_warning(
                            "AUDIT-FIELD-001",
                            "Database model should include audit timestamp fields",
                            str(file),
                        )

        # Check for forbidden log modification patterns
        self._check_forbidden_patterns(
            paths=["app/**/*.py"],
            patterns=[
                r"os\.remove.*\.log",
                r"truncate.*log",
                r"open.*\.log.*['\"]w['\"]",
            ],
            rule_id="AUDIT-INT-001",
            message="Application must not modify or delete log files",
        )

    # ==================
    # ACCESS CONTROL
    # ==================

    def _validate_access_control(self) -> None:
        """Validate access control requirements."""
        # Check API endpoints have authentication
        api_path = self.project_root / "app" / "api" / "v1"
        if api_path.exists():
            for file in api_path.glob("*.py"):
                # Skip endpoints that are intentionally unauthenticated
                if file.name in ("__init__.py", "health.py", "metrics.py"):
                    continue
                content = file.read_text()
                # Check for authentication dependency
                if "@router" in content:
                    if not re.search(
                        r"CurrentUser|Depends.*get_current_user|AdminUser", content
                    ):
                        # auth.py handles its own auth
                        if file.name != "auth.py":
                            self.add_warning(
                                "ACCESS-AUTH-001",
                                "API endpoint should require authentication",
                                str(file),
                            )

        # Check admin routes verify admin role
        admin_route = self.project_root / "app" / "routes" / "admin.py"
        if admin_route.exists():
            content = admin_route.read_text()
            if "is_admin" not in content and "admin_required" not in content:
                self.add_warning(
                    "ACCESS-AUTH-002",
                    "Admin routes should verify admin privileges",
                    str(admin_route),
                )

        # Check password hashing
        security_file = self.project_root / "app" / "core" / "security.py"
        if security_file.exists():
            content = security_file.read_text()
            if not re.search(r"bcrypt|argon2|scrypt|pbkdf2", content, re.IGNORECASE):
                self.add_error(
                    "ACCESS-ACCT-003",
                    "Passwords must use approved hashing algorithms",
                    str(security_file),
                )

        # Check password not in API responses
        # Note: Only flag if a class with "Response" in name directly defines password_hash
        # Internal schemas (like UserInDB) are not flagged as they're not API responses
        schema_path = self.project_root / "models" / "schema"
        if schema_path.exists():
            for file in schema_path.glob("*.py"):
                content = file.read_text()
                # Check for Response classes that directly define password_hash
                # Split by class definitions and check each
                class_blocks = re.split(r"(?=^class\s)", content, flags=re.MULTILINE)
                for block in class_blocks:
                    # Check if this class is a Response class
                    class_match = re.match(r"class\s+(\w*Response\w*)", block)
                    if class_match:
                        # Check if password_hash is defined in this class (not inherited)
                        if "password_hash:" in block or "password_hash =" in block:
                            if "exclude" not in block.lower():
                                self.add_error(
                                    "ACCESS-PRIV-002",
                                    f"Password hash must be excluded from {class_match.group(1)}",
                                    str(file),
                                )

    # ==================
    # DATA GOVERNANCE
    # ==================

    def _validate_data_governance(self) -> None:
        """Validate data governance requirements."""
        # Check PII not logged
        # Note: Patterns detect actual password values, not descriptive usage like "Password reset"
        # We look for patterns that suggest password values are being logged:
        #   - password= or password: followed by a variable
        #   - %s or {} after password indicating interpolation of password value
        self._check_forbidden_patterns(
            paths=["app/**/*.py", "middleware/**/*.py"],
            patterns=[
                r"logger\.\w+\(.*password\s*[=:]\s*['\"]?%",  # password=%s
                r"logger\.\w+\(.*password\s*[=:]\s*\{",  # password={var}
                r"logging\.\w+\(.*password\s*[=:]\s*['\"]?%",  # password=%s
                r"print\(.*password\s*=",  # print(password=xxx)  # noqa: SEC021
                r"logger.*credit.*card.*\d",  # credit card with numbers
                r"logger.*\bssn\b.*\d",  # SSN with numbers
            ],
            rule_id="DATA-PII-003",
            message="PII/sensitive data must not be logged",
        )

        # Check input validation (Pydantic)
        # Check both legacy models/schema/ and module schemas locations
        schema_paths = [
            self.project_root / "models" / "schema",
            self.project_root / "app" / "modules",
        ]
        has_validation = False
        for schema_path in schema_paths:
            if not schema_path.exists():
                continue
            for file in schema_path.rglob("schemas/*.py"):
                content = file.read_text()
                if re.search(r"Field|validator|field_validator", content):
                    has_validation = True
                    break
            if has_validation:
                break
            # Also check legacy location
            if schema_path == self.project_root / "models" / "schema":
                for file in schema_path.glob("*.py"):
                    content = file.read_text()
                    if re.search(r"Field|validator|field_validator", content):
                        has_validation = True
                        break
            if has_validation:
                break

        if not has_validation:
            self.add_error(
                "DATA-INT-001",
                "Pydantic validation required for data integrity",
                str(self.project_root / "app" / "modules"),
            )

        # Check user data access endpoint exists (GDPR)
        users_api = self.project_root / "app" / "api" / "v1" / "users.py"
        if users_api.exists():
            content = users_api.read_text()
            if "/me" not in content and "current" not in content.lower():
                self.add_warning(
                    "DATA-PRIV-001",
                    "Endpoint for users to access their own data required (GDPR Art. 15)",
                    str(users_api),
                )

    # ==================
    # COMPLIANCE
    # ==================

    def _validate_compliance(self) -> None:
        """Validate compliance requirements."""
        # Check HTTPS configuration
        config_files = [
            self.project_root / "app" / "core" / "config.py",
            self.project_root / "main.py",
        ]
        https_configured = False
        for file in config_files:
            if file.exists():
                content = file.read_text()
                if re.search(r"https|SSL|TLS|SECURE", content, re.IGNORECASE):
                    https_configured = True
                    break
        if not https_configured:
            self.add_warning(
                "COMP-REG-002",
                "HTTPS configuration should be documented",
                "app/core/config.py",
            )

        # Check version control
        if not (self.project_root / ".git").exists():
            self.add_error(
                "COMP-EVID-003",
                "Version control (Git) is required",
                str(self.project_root),
            )

        # Check CI/CD exists (Gitea or GitHub)
        gitea_ci = self.project_root / ".gitea" / "workflows" / "ci.yml"
        github_ci = self.project_root / ".github" / "workflows" / "ci.yml"
        if not gitea_ci.exists() and not github_ci.exists():
            self.add_warning(
                "COMP-EVID-001",
                "CI workflow for automated testing recommended",
                ".gitea/workflows/ci.yml or .github/workflows/ci.yml",
            )

        # Check code review process
        github_pr_template = self.project_root / ".github" / "PULL_REQUEST_TEMPLATE.md"
        if not github_pr_template.exists():
            self.add_warning(
                "COMP-POL-001",
                "Pull request template recommended for code review",
                ".github/PULL_REQUEST_TEMPLATE.md",
            )

    # ==================
    # CHANGE MANAGEMENT
    # ==================

    def _validate_change_management(self) -> None:
        """Validate change management requirements."""
        # Check .gitignore exists and excludes secrets
        gitignore = self.project_root / ".gitignore"
        if gitignore.exists():
            content = gitignore.read_text()
            required_exclusions = [".env", "*.pem", "*.key"]
            for pattern in required_exclusions:
                # Simplified check - just look for the pattern
                if pattern.replace("*", "") not in content:
                    self.add_warning(
                        "CHANGE-VC-003",
                        f"Secret pattern '{pattern}' should be in .gitignore",
                        str(gitignore),
                    )
        else:
            self.add_error(
                "CHANGE-VC-002",
                ".gitignore file required",
                str(self.project_root),
            )

        # Check database migrations
        alembic_dir = self.project_root / "alembic"
        if not alembic_dir.exists():
            self.add_warning(
                "CHANGE-ROLL-001",
                "Database migration tool (Alembic) recommended",
                "alembic/",
            )
        else:
            # Check for downgrade functions
            versions_dir = alembic_dir / "versions"
            if versions_dir.exists():
                for file in versions_dir.glob("*.py"):
                    content = file.read_text()
                    if "def upgrade" in content and "def downgrade" not in content:
                        self.add_warning(
                            "CHANGE-ROLL-002",
                            "Migration should include downgrade function",
                            str(file),
                        )

        # Check environment separation
        config_file = self.project_root / "app" / "core" / "config.py"
        if config_file.exists():
            content = config_file.read_text()
            if not re.search(r"ENVIRONMENT|development|staging|production", content):
                self.add_warning(
                    "CHANGE-DEP-001",
                    "Environment separation configuration recommended",
                    str(config_file),
                )

    # ==================
    # THIRD PARTY
    # ==================

    def _validate_third_party(self) -> None:
        """Validate third-party dependency management."""
        # Check dependency lock file exists
        lock_files = ["uv.lock", "poetry.lock", "Pipfile.lock", "requirements.lock"]
        has_lock = any((self.project_root / f).exists() for f in lock_files)
        if not has_lock:
            self.add_warning(
                "THIRD-DEP-001",
                "Dependency lock file recommended for reproducible builds",
                "uv.lock or similar",
            )

        # Check dependency manifest exists
        manifest_files = ["pyproject.toml", "requirements.txt", "Pipfile"]
        has_manifest = any((self.project_root / f).exists() for f in manifest_files)
        if not has_manifest:
            self.add_error(
                "THIRD-DEP-002",
                "Dependency manifest file required",
                "pyproject.toml",
            )

        # Check for dependency scanning
        dependabot = self.project_root / ".github" / "dependabot.yml"
        if not dependabot.exists():
            self.add_info(
                "THIRD-VULN-002",
                "Consider enabling dependency scanning for security updates",
                ".github/dependabot.yml",
            )

        # Check for insecure package sources
        pyproject = self.project_root / "pyproject.toml"
        if pyproject.exists():
            content = pyproject.read_text()
            if "http://" in content and "https://" not in content:  # noqa: SEC034
                self.add_error(
                    "THIRD-VEND-001",
                    "Only HTTPS sources allowed for packages",
                    str(pyproject),
                )

    # ==================
    # DOCUMENTATION
    # ==================

    def _validate_documentation(self) -> None:
        """Validate documentation requirements."""
        # Check README exists
        readme_files = ["README.md", "README.rst", "README.txt"]
        has_readme = any((self.project_root / f).exists() for f in readme_files)
        if not has_readme:
            self.add_error(
                "DOC-PROJ-001",
                "Project README required",
                "README.md",
            )
        else:
            # Check README has setup instructions (first existing README only)
            for readme in readme_files:
                readme_path = self.project_root / readme
                if readme_path.exists():
                    content = readme_path.read_text().lower()
                    has_setup = any(
                        term in content
                        for term in [
                            "install",
                            "setup",
                            "quick start",
                            "getting started",
                        ]
                    )
                    if not has_setup:
                        self.add_warning(
                            "DOC-PROJ-002",
                            "README should include setup instructions",
                            str(readme_path),
                        )
                    break

        # Check security policy exists
        security_files = ["SECURITY.md", ".github/SECURITY.md"]
        has_security = any((self.project_root / f).exists() for f in security_files)
        if not has_security:
            self.add_warning(
                "DOC-SEC-001",
                "Security policy (SECURITY.md) recommended",
                "SECURITY.md",
            )

        # Check API documentation
        docs_api = self.project_root / "docs" / "api"
        if not docs_api.exists() or not list(docs_api.glob("*.md")):
            self.add_warning(
                "DOC-API-003",
                "API documentation recommended",
                "docs/api/",
            )

        # Check authentication documentation
        auth_doc = self.project_root / "docs" / "api" / "authentication.md"
        if not auth_doc.exists():
            self.add_warning(
                "DOC-SEC-002",
                "Authentication documentation recommended",
                "docs/api/authentication.md",
            )

        # Check architecture documentation
        arch_docs = self.project_root / "docs" / "architecture"
        if not arch_docs.exists() or not list(arch_docs.glob("*.md")):
            self.add_warning(
                "DOC-ARCH-001",
                "Architecture documentation recommended",
                "docs/architecture/",
            )

        # Check deployment documentation
        deploy_doc = self.project_root / "docs" / "deployment" / "index.md"
        if not deploy_doc.exists():
            self.add_warning(
                "DOC-OPS-001",
                "Deployment documentation recommended",
                "docs/deployment/index.md",
            )

    # ==================
    # HELPERS
    # ==================

    def _check_forbidden_patterns(
        self,
        paths: list[str],
        patterns: list[str],
        rule_id: str,
        message: str,
    ) -> None:
        """Check for forbidden patterns in files.

        Args:
            paths: Glob path patterns relative to the project root; ``**``
                triggers a recursive search below the prefix directory.
            patterns: Case-insensitive regexes; a file is flagged at most
                once per call, on its first matching pattern.
            rule_id: Rule identifier to attach to the finding.
            message: Human-readable finding message.
        """
        for path_pattern in paths:
            if "**" in path_pattern:
                # Split "app/**/*.py" into base dir ("app") and glob suffix ("*.py").
                base, glob_suffix = path_pattern.split("**", 1)
                base_path = self.project_root / base.rstrip("/")
                if base_path.exists():
                    files = base_path.rglob(glob_suffix.lstrip("/"))
                else:
                    continue
            else:
                files = [self.project_root / path_pattern]
            for file in files:
                if not file.exists() or not file.is_file():
                    continue
                try:
                    content = file.read_text()
                except (OSError, UnicodeDecodeError):
                    # Best-effort scan: unreadable or binary files are skipped.
                    continue
                for pattern in patterns:
                    if re.search(pattern, content, re.IGNORECASE):
                        self.add_error(rule_id, message, str(file))
                        # One finding per file is enough; avoid duplicate errors
                        # when several patterns match the same file.
                        break


def main() -> int:
    """Run audit validation.

    Returns:
        Process exit code: 0 on success, 1 if any errors were found.
    """
    import argparse

    parser = argparse.ArgumentParser(description="Validate IT internal audit rules")
    parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
    parser.add_argument(
        "--format",
        choices=["text", "json"],
        default="text",
        help="Output format",
    )
    # TODO: the parsed flags are currently unused — parse_args() still runs so
    # that --help works and invalid arguments are rejected.
    parser.parse_args()

    validator = AuditValidator()
    validator.load_rules()
    success = validator.validate()
    validator.print_results()
    return 0 if success else 1


if __name__ == "__main__":
    sys.exit(main())