feat: add audit validation rules and script
Import audit rules from scaffold project covering: - Access control validation - Audit trail requirements - Change management policies - Compliance checks - Data governance rules - Documentation requirements - Third-party dependency checks 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,465 +1,111 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Base Validator
|
||||
==============
|
||||
Shared base class for all validation scripts (architecture, security, performance).
|
||||
Base Validator Class
|
||||
|
||||
Provides common functionality for:
|
||||
- Loading YAML configuration
|
||||
- File pattern matching
|
||||
- Violation tracking
|
||||
- Output formatting (human-readable and JSON)
|
||||
Shared functionality for all validators.
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
import sys
|
||||
from abc import ABC, abstractmethod
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import yaml
|
||||
|
||||
|
||||
class Severity(Enum):
|
||||
"""Validation severity levels"""
|
||||
|
||||
ERROR = "error"
|
||||
WARNING = "warning"
|
||||
INFO = "info"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Violation:
|
||||
"""Represents a rule violation"""
|
||||
|
||||
rule_id: str
|
||||
rule_name: str
|
||||
severity: Severity
|
||||
file_path: Path
|
||||
line_number: int
|
||||
message: str
|
||||
context: str = ""
|
||||
suggestion: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class FileResult:
|
||||
"""Results for a single file validation"""
|
||||
|
||||
file_path: Path
|
||||
errors: int = 0
|
||||
warnings: int = 0
|
||||
info: int = 0
|
||||
|
||||
@property
|
||||
def passed(self) -> bool:
|
||||
return self.errors == 0
|
||||
|
||||
@property
|
||||
def status(self) -> str:
|
||||
if self.errors > 0:
|
||||
return "FAILED"
|
||||
if self.warnings > 0:
|
||||
return "PASSED*"
|
||||
return "PASSED"
|
||||
|
||||
@property
|
||||
def status_icon(self) -> str:
|
||||
if self.errors > 0:
|
||||
return "❌"
|
||||
if self.warnings > 0:
|
||||
return "⚠️"
|
||||
return "✅"
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationResult:
|
||||
"""Results of validation"""
|
||||
|
||||
violations: list[Violation] = field(default_factory=list)
|
||||
files_checked: int = 0
|
||||
rules_applied: int = 0
|
||||
file_results: list[FileResult] = field(default_factory=list)
|
||||
|
||||
def has_errors(self) -> bool:
|
||||
"""Check if there are any error-level violations"""
|
||||
return any(v.severity == Severity.ERROR for v in self.violations)
|
||||
|
||||
def has_warnings(self) -> bool:
|
||||
"""Check if there are any warning-level violations"""
|
||||
return any(v.severity == Severity.WARNING for v in self.violations)
|
||||
|
||||
def error_count(self) -> int:
|
||||
return sum(1 for v in self.violations if v.severity == Severity.ERROR)
|
||||
|
||||
def warning_count(self) -> int:
|
||||
return sum(1 for v in self.violations if v.severity == Severity.WARNING)
|
||||
|
||||
def info_count(self) -> int:
|
||||
return sum(1 for v in self.violations if v.severity == Severity.INFO)
|
||||
|
||||
|
||||
class BaseValidator(ABC):
|
||||
"""Abstract base validator class"""
|
||||
"""Base class for architecture, security, and performance validators."""
|
||||
|
||||
# Subclasses should override these
|
||||
VALIDATOR_NAME = "Base Validator"
|
||||
VALIDATOR_EMOJI = "🔍"
|
||||
RULES_DIR_NAME = ".rules"
|
||||
CONFIG_FILE_NAME = ".rules.yaml"
|
||||
def __init__(self, rules_dir: str, project_root: Path | None = None):
|
||||
self.rules_dir = rules_dir
|
||||
self.project_root = project_root or Path.cwd()
|
||||
self.rules: list[dict[str, Any]] = []
|
||||
self.errors: list[dict[str, Any]] = []
|
||||
self.warnings: list[dict[str, Any]] = []
|
||||
|
||||
def __init__(self, config_path: Path = None, verbose: bool = False):
|
||||
"""Initialize validator with configuration"""
|
||||
self.project_root = Path.cwd()
|
||||
self.config_path = config_path or self.project_root / self.CONFIG_FILE_NAME
|
||||
self.verbose = verbose
|
||||
self.config = self._load_config()
|
||||
self.result = ValidationResult()
|
||||
|
||||
def _load_config(self) -> dict[str, Any]:
|
||||
"""
|
||||
Load validation rules from YAML config.
|
||||
|
||||
Supports two modes:
|
||||
1. Split directory mode: rules directory with multiple YAML files
|
||||
2. Single file mode: single YAML file (legacy)
|
||||
|
||||
The split directory mode takes precedence if it exists.
|
||||
"""
|
||||
# Check for split directory mode first
|
||||
rules_dir = self.project_root / self.RULES_DIR_NAME
|
||||
if rules_dir.is_dir():
|
||||
return self._load_config_from_directory(rules_dir)
|
||||
|
||||
# Fall back to single file mode
|
||||
if not self.config_path.exists():
|
||||
print(f"❌ Configuration file not found: {self.config_path}")
|
||||
print(f" (Also checked for directory: {rules_dir})")
|
||||
sys.exit(1)
|
||||
|
||||
with open(self.config_path) as f:
|
||||
config = yaml.safe_load(f)
|
||||
|
||||
print(f"📋 Loaded {self.VALIDATOR_NAME}: {config.get('project', 'unknown')}")
|
||||
return config
|
||||
|
||||
def _load_config_from_directory(self, rules_dir: Path) -> dict[str, Any]:
|
||||
"""
|
||||
Load and merge configuration from split YAML files in a directory.
|
||||
|
||||
Reads _main.yaml first for base config, then merges all other YAML files.
|
||||
"""
|
||||
config: dict[str, Any] = {}
|
||||
|
||||
# Load _main.yaml first (contains project info, principles, ignore patterns)
|
||||
main_file = rules_dir / "_main.yaml"
|
||||
if main_file.exists():
|
||||
with open(main_file) as f:
|
||||
config = yaml.safe_load(f) or {}
|
||||
|
||||
# Load all other YAML files and merge their contents
|
||||
yaml_files = sorted(rules_dir.glob("*.yaml"))
|
||||
for yaml_file in yaml_files:
|
||||
if yaml_file.name == "_main.yaml":
|
||||
continue # Already loaded
|
||||
|
||||
with open(yaml_file) as f:
|
||||
file_config = yaml.safe_load(f) or {}
|
||||
|
||||
# Merge rule sections from this file into main config
|
||||
for key, value in file_config.items():
|
||||
if key.endswith("_rules") and isinstance(value, list):
|
||||
# Merge rule lists
|
||||
if key not in config:
|
||||
config[key] = []
|
||||
config[key].extend(value)
|
||||
elif key not in config:
|
||||
# Add new top-level keys
|
||||
config[key] = value
|
||||
|
||||
print(f"📋 Loaded {self.VALIDATOR_NAME}: {config.get('project', 'unknown')}")
|
||||
print(f" (from {len(yaml_files)} files in {rules_dir.name}/)")
|
||||
return config
|
||||
|
||||
def _should_ignore_file(self, file_path: Path) -> bool:
|
||||
"""Check if a file should be ignored based on config patterns"""
|
||||
import fnmatch
|
||||
|
||||
ignore_config = self.config.get("ignore", {})
|
||||
ignore_files = ignore_config.get("files", [])
|
||||
|
||||
# Get relative path for matching
|
||||
try:
|
||||
rel_path = file_path.relative_to(self.project_root)
|
||||
except ValueError:
|
||||
rel_path = file_path
|
||||
|
||||
rel_path_str = str(rel_path)
|
||||
|
||||
for pattern in ignore_files:
|
||||
# Handle glob patterns using fnmatch
|
||||
if "*" in pattern:
|
||||
# fnmatch handles *, **, and ? patterns correctly
|
||||
if fnmatch.fnmatch(rel_path_str, pattern):
|
||||
return True
|
||||
# Also check each path component for patterns like **/.venv/**
|
||||
# This handles cases where the pattern expects any prefix
|
||||
if pattern.startswith("**/"):
|
||||
# Try matching without the **/ prefix (e.g., .venv/** matches .venv/foo)
|
||||
suffix_pattern = pattern[3:] # Remove "**/""
|
||||
if fnmatch.fnmatch(rel_path_str, suffix_pattern):
|
||||
return True
|
||||
elif pattern in rel_path_str:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _add_violation(
|
||||
self,
|
||||
rule_id: str,
|
||||
rule_name: str,
|
||||
severity: Severity,
|
||||
file_path: Path,
|
||||
line_number: int,
|
||||
message: str,
|
||||
context: str = "",
|
||||
suggestion: str = "",
|
||||
):
|
||||
"""Add a violation to the results"""
|
||||
# Check for inline noqa comment
|
||||
if f"noqa: {rule_id.lower()}" in context.lower():
|
||||
def load_rules(self) -> None:
|
||||
"""Load rules from YAML files."""
|
||||
rules_path = self.project_root / self.rules_dir
|
||||
if not rules_path.exists():
|
||||
print(f"Rules directory not found: {rules_path}")
|
||||
return
|
||||
|
||||
self.result.violations.append(
|
||||
Violation(
|
||||
rule_id=rule_id,
|
||||
rule_name=rule_name,
|
||||
severity=severity,
|
||||
file_path=file_path,
|
||||
line_number=line_number,
|
||||
message=message,
|
||||
context=context,
|
||||
suggestion=suggestion,
|
||||
)
|
||||
)
|
||||
for rule_file in rules_path.glob("*.yaml"):
|
||||
if rule_file.name.startswith("_"):
|
||||
continue # Skip main config
|
||||
|
||||
def _get_rule(self, rule_id: str) -> dict | None:
|
||||
"""Look up a rule by ID across all rule categories"""
|
||||
for key, value in self.config.items():
|
||||
if key.endswith("_rules") and isinstance(value, list):
|
||||
for rule in value:
|
||||
if rule.get("id") == rule_id:
|
||||
return rule
|
||||
return None
|
||||
|
||||
def _check_pattern_in_file(
|
||||
self,
|
||||
file_path: Path,
|
||||
content: str,
|
||||
lines: list[str],
|
||||
pattern: str,
|
||||
rule_id: str,
|
||||
rule_name: str,
|
||||
severity: Severity,
|
||||
message: str,
|
||||
suggestion: str = "",
|
||||
exclude_patterns: list[str] = None,
|
||||
):
|
||||
"""Check for a regex pattern in a file and report violations"""
|
||||
exclude_patterns = exclude_patterns or []
|
||||
|
||||
for i, line in enumerate(lines, 1):
|
||||
if re.search(pattern, line, re.IGNORECASE):
|
||||
# Check exclusions
|
||||
should_exclude = False
|
||||
for exclude in exclude_patterns:
|
||||
if exclude in line:
|
||||
should_exclude = True
|
||||
break
|
||||
|
||||
if not should_exclude:
|
||||
self._add_violation(
|
||||
rule_id=rule_id,
|
||||
rule_name=rule_name,
|
||||
severity=severity,
|
||||
file_path=file_path,
|
||||
line_number=i,
|
||||
message=message,
|
||||
context=line.strip()[:100],
|
||||
suggestion=suggestion,
|
||||
)
|
||||
with open(rule_file) as f:
|
||||
data = yaml.safe_load(f)
|
||||
if data and "rules" in data:
|
||||
self.rules.extend(data["rules"])
|
||||
|
||||
@abstractmethod
|
||||
def validate_all(self, target_path: Path = None) -> ValidationResult:
|
||||
"""Validate all files in a directory - must be implemented by subclasses"""
|
||||
pass
|
||||
def validate(self) -> bool:
|
||||
"""Run validation. Returns True if passed."""
|
||||
|
||||
def validate_file(self, file_path: Path, quiet: bool = False) -> ValidationResult:
|
||||
"""Validate a single file"""
|
||||
if not file_path.exists():
|
||||
if not quiet:
|
||||
print(f"❌ File not found: {file_path}")
|
||||
return self.result
|
||||
|
||||
if not file_path.is_file():
|
||||
if not quiet:
|
||||
print(f"❌ Not a file: {file_path}")
|
||||
return self.result
|
||||
|
||||
if not quiet:
|
||||
print(f"\n{self.VALIDATOR_EMOJI} Validating single file: {file_path}\n")
|
||||
|
||||
# Resolve file path to absolute
|
||||
file_path = file_path.resolve()
|
||||
|
||||
if self._should_ignore_file(file_path):
|
||||
if not quiet:
|
||||
print("⏭️ File is in ignore list, skipping")
|
||||
return self.result
|
||||
|
||||
self.result.files_checked += 1
|
||||
|
||||
# Track violations before this file
|
||||
violations_before = len(self.result.violations)
|
||||
|
||||
content = file_path.read_text()
|
||||
lines = content.split("\n")
|
||||
|
||||
# Call subclass-specific validation
|
||||
self._validate_file_content(file_path, content, lines)
|
||||
|
||||
# Calculate violations for this file
|
||||
file_violations = self.result.violations[violations_before:]
|
||||
errors = sum(1 for v in file_violations if v.severity == Severity.ERROR)
|
||||
warnings = sum(1 for v in file_violations if v.severity == Severity.WARNING)
|
||||
info = sum(1 for v in file_violations if v.severity == Severity.INFO)
|
||||
|
||||
# Track file result
|
||||
self.result.file_results.append(
|
||||
FileResult(file_path=file_path, errors=errors, warnings=warnings, info=info)
|
||||
def add_error(
|
||||
self, rule_id: str, message: str, file: str = "", line: int = 0
|
||||
) -> None:
|
||||
"""Add an error."""
|
||||
self.errors.append(
|
||||
{
|
||||
"rule_id": rule_id,
|
||||
"message": message,
|
||||
"file": file,
|
||||
"line": line,
|
||||
"severity": "error",
|
||||
}
|
||||
)
|
||||
|
||||
return self.result
|
||||
|
||||
@abstractmethod
|
||||
def _validate_file_content(self, file_path: Path, content: str, lines: list[str]):
|
||||
"""Validate file content - must be implemented by subclasses"""
|
||||
pass
|
||||
|
||||
def output_results(self, json_output: bool = False, errors_only: bool = False):
|
||||
"""Output validation results"""
|
||||
if json_output:
|
||||
self._output_json()
|
||||
else:
|
||||
self._output_human(errors_only)
|
||||
|
||||
def _output_json(self):
|
||||
"""Output results as JSON
|
||||
|
||||
Format matches code quality service expectations:
|
||||
- file_path (not file)
|
||||
- line_number (not line)
|
||||
- total_violations count
|
||||
"""
|
||||
try:
|
||||
rel_base = self.project_root
|
||||
except Exception:
|
||||
rel_base = Path.cwd()
|
||||
|
||||
def get_relative_path(file_path: Path) -> str:
|
||||
"""Get relative path from project root"""
|
||||
try:
|
||||
return str(file_path.relative_to(rel_base))
|
||||
except ValueError:
|
||||
return str(file_path)
|
||||
|
||||
output = {
|
||||
"validator": self.VALIDATOR_NAME,
|
||||
"files_checked": self.result.files_checked,
|
||||
"total_violations": len(self.result.violations),
|
||||
"errors": self.result.error_count(),
|
||||
"warnings": self.result.warning_count(),
|
||||
"info": self.result.info_count(),
|
||||
"violations": [
|
||||
{
|
||||
"rule_id": v.rule_id,
|
||||
"rule_name": v.rule_name,
|
||||
"severity": v.severity.value,
|
||||
"file_path": get_relative_path(v.file_path),
|
||||
"line_number": v.line_number,
|
||||
"message": v.message,
|
||||
"context": v.context,
|
||||
"suggestion": v.suggestion,
|
||||
}
|
||||
for v in self.result.violations
|
||||
],
|
||||
}
|
||||
print(json.dumps(output, indent=2))
|
||||
|
||||
def _output_human(self, errors_only: bool = False):
|
||||
"""Output results in human-readable format"""
|
||||
print("\n" + "=" * 80)
|
||||
print(f"📊 {self.VALIDATOR_NAME.upper()} REPORT")
|
||||
print("=" * 80)
|
||||
|
||||
errors = [v for v in self.result.violations if v.severity == Severity.ERROR]
|
||||
warnings = [v for v in self.result.violations if v.severity == Severity.WARNING]
|
||||
info = [v for v in self.result.violations if v.severity == Severity.INFO]
|
||||
|
||||
print(
|
||||
f"\nFiles checked: {self.result.files_checked}"
|
||||
)
|
||||
print(
|
||||
f"Findings: {len(errors)} errors, {len(warnings)} warnings, {len(info)} info"
|
||||
def add_warning(
|
||||
self, rule_id: str, message: str, file: str = "", line: int = 0
|
||||
) -> None:
|
||||
"""Add a warning."""
|
||||
self.warnings.append(
|
||||
{
|
||||
"rule_id": rule_id,
|
||||
"message": message,
|
||||
"file": file,
|
||||
"line": line,
|
||||
"severity": "warning",
|
||||
}
|
||||
)
|
||||
|
||||
if errors:
|
||||
print(f"\n\n❌ ERRORS ({len(errors)}):")
|
||||
print("-" * 80)
|
||||
for v in errors:
|
||||
self._print_violation(v)
|
||||
def add_info(
|
||||
self, rule_id: str, message: str, file: str = "", line: int = 0
|
||||
) -> None:
|
||||
"""Add an informational note."""
|
||||
self.warnings.append(
|
||||
{
|
||||
"rule_id": rule_id,
|
||||
"message": message,
|
||||
"file": file,
|
||||
"line": line,
|
||||
"severity": "info",
|
||||
}
|
||||
)
|
||||
|
||||
if warnings and not errors_only:
|
||||
print(f"\n\n⚠️ WARNINGS ({len(warnings)}):")
|
||||
print("-" * 80)
|
||||
for v in warnings:
|
||||
self._print_violation(v)
|
||||
def print_results(self) -> None:
|
||||
"""Print validation results."""
|
||||
if not self.errors and not self.warnings:
|
||||
print(f"✅ All {self.rules_dir} rules passed!")
|
||||
return
|
||||
|
||||
if info and not errors_only:
|
||||
print(f"\nℹ️ INFO ({len(info)}):")
|
||||
print("-" * 80)
|
||||
for v in info:
|
||||
self._print_violation(v)
|
||||
if self.errors:
|
||||
print(f"\n❌ {len(self.errors)} errors found:")
|
||||
for error in self.errors:
|
||||
print(f" [{error['rule_id']}] {error['message']}")
|
||||
if error["file"]:
|
||||
print(f" File: {error['file']}:{error['line']}")
|
||||
|
||||
print("\n" + "=" * 80)
|
||||
if errors:
|
||||
print("❌ VALIDATION FAILED")
|
||||
elif warnings:
|
||||
print(f"⚠️ VALIDATION PASSED WITH {len(warnings)} WARNING(S)")
|
||||
else:
|
||||
print("✅ VALIDATION PASSED")
|
||||
print("=" * 80)
|
||||
if self.warnings:
|
||||
print(f"\n⚠️ {len(self.warnings)} warnings:")
|
||||
for warning in self.warnings:
|
||||
print(f" [{warning['rule_id']}] {warning['message']}")
|
||||
if warning["file"]:
|
||||
print(f" File: {warning['file']}:{warning['line']}")
|
||||
|
||||
def _print_violation(self, v: Violation):
|
||||
"""Print a single violation"""
|
||||
try:
|
||||
rel_path = v.file_path.relative_to(self.project_root)
|
||||
except ValueError:
|
||||
rel_path = v.file_path
|
||||
|
||||
print(f"\n [{v.rule_id}] {v.rule_name}")
|
||||
print(f" File: {rel_path}:{v.line_number}")
|
||||
print(f" Issue: {v.message}")
|
||||
if v.context and self.verbose:
|
||||
print(f" Context: {v.context}")
|
||||
if v.suggestion:
|
||||
print(f" 💡 Suggestion: {v.suggestion}")
|
||||
|
||||
def get_exit_code(self) -> int:
|
||||
"""Get appropriate exit code based on results"""
|
||||
if self.result.has_errors():
|
||||
return 1
|
||||
return 0
|
||||
def run(self) -> int:
|
||||
"""Run validation and return exit code."""
|
||||
self.load_rules()
|
||||
passed = self.validate()
|
||||
self.print_results()
|
||||
return 0 if passed else 1
|
||||
|
||||
532
scripts/validate_audit.py
Normal file
532
scripts/validate_audit.py
Normal file
@@ -0,0 +1,532 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
IT Internal Audit Validator
|
||||
|
||||
Validates code against internal audit rules defined in .audit-rules/
|
||||
Focuses on governance, compliance, and control requirements.
|
||||
"""
|
||||
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
# Add project root to path
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
from scripts.base_validator import BaseValidator
|
||||
|
||||
|
||||
class AuditValidator(BaseValidator):
|
||||
"""Validates IT internal audit rules."""
|
||||
|
||||
def __init__(self, project_root: Path | None = None):
|
||||
super().__init__(".audit-rules", project_root)
|
||||
|
||||
def validate(self) -> bool:
|
||||
"""Run all audit validations."""
|
||||
self._validate_audit_trail()
|
||||
self._validate_access_control()
|
||||
self._validate_data_governance()
|
||||
self._validate_compliance()
|
||||
self._validate_change_management()
|
||||
self._validate_third_party()
|
||||
self._validate_documentation()
|
||||
return len(self.errors) == 0
|
||||
|
||||
# ==================
|
||||
# AUDIT TRAIL
|
||||
# ==================
|
||||
|
||||
def _validate_audit_trail(self) -> None:
|
||||
"""Validate audit trail requirements."""
|
||||
# Check authentication logging
|
||||
auth_files = [
|
||||
self.project_root / "app" / "api" / "v1" / "auth.py",
|
||||
self.project_root / "app" / "routes" / "admin.py",
|
||||
]
|
||||
|
||||
for file in auth_files:
|
||||
if file.exists():
|
||||
content = file.read_text()
|
||||
if "logger" not in content:
|
||||
self.add_error(
|
||||
"AUDIT-LOG-001",
|
||||
"Authentication operations must include logging",
|
||||
str(file),
|
||||
)
|
||||
|
||||
# Check service layer logging
|
||||
services_path = self.project_root / "app" / "services"
|
||||
if services_path.exists():
|
||||
for file in services_path.glob("*.py"):
|
||||
if file.name == "__init__.py":
|
||||
continue
|
||||
content = file.read_text()
|
||||
# Services that modify data should have logging
|
||||
if re.search(r"def (create|update|delete)", content):
|
||||
if "logger" not in content:
|
||||
self.add_warning(
|
||||
"AUDIT-LOG-002",
|
||||
"Service with data modifications should include logging",
|
||||
str(file),
|
||||
)
|
||||
|
||||
# Check for audit timestamp fields in models
|
||||
# Models can have timestamps directly or inherit from BaseModel/TimestampMixin
|
||||
models_path = self.project_root / "models" / "database"
|
||||
if models_path.exists():
|
||||
for file in models_path.glob("*.py"):
|
||||
# audit_log.py uses timestamp field instead of created_at/updated_at
|
||||
if file.name in ("__init__.py", "base.py", "audit_log.py"):
|
||||
continue
|
||||
content = file.read_text()
|
||||
if "class " in content: # Has model definition
|
||||
# Check if timestamps are present directly or via inheritance
|
||||
has_timestamps = (
|
||||
"created_at" in content
|
||||
or "updated_at" in content
|
||||
or "BaseModel" in content # Inherits from BaseModel
|
||||
or "TimestampMixin" in content # Uses TimestampMixin
|
||||
)
|
||||
if not has_timestamps:
|
||||
self.add_warning(
|
||||
"AUDIT-FIELD-001",
|
||||
"Database model should include audit timestamp fields",
|
||||
str(file),
|
||||
)
|
||||
|
||||
# Check for forbidden log modification patterns
|
||||
self._check_forbidden_patterns(
|
||||
paths=["app/**/*.py"],
|
||||
patterns=[
|
||||
r"os\.remove.*\.log",
|
||||
r"truncate.*log",
|
||||
r"open.*\.log.*['\"]w['\"]",
|
||||
],
|
||||
rule_id="AUDIT-INT-001",
|
||||
message="Application must not modify or delete log files",
|
||||
)
|
||||
|
||||
# ==================
|
||||
# ACCESS CONTROL
|
||||
# ==================
|
||||
|
||||
def _validate_access_control(self) -> None:
|
||||
"""Validate access control requirements."""
|
||||
# Check API endpoints have authentication
|
||||
api_path = self.project_root / "app" / "api" / "v1"
|
||||
if api_path.exists():
|
||||
for file in api_path.glob("*.py"):
|
||||
# Skip endpoints that are intentionally unauthenticated
|
||||
if file.name in ("__init__.py", "health.py", "metrics.py"):
|
||||
continue
|
||||
content = file.read_text()
|
||||
# Check for authentication dependency
|
||||
if "@router" in content:
|
||||
if not re.search(
|
||||
r"CurrentUser|Depends.*get_current_user|AdminUser", content
|
||||
):
|
||||
# auth.py handles its own auth
|
||||
if file.name != "auth.py":
|
||||
self.add_warning(
|
||||
"ACCESS-AUTH-001",
|
||||
"API endpoint should require authentication",
|
||||
str(file),
|
||||
)
|
||||
|
||||
# Check admin routes verify admin role
|
||||
admin_route = self.project_root / "app" / "routes" / "admin.py"
|
||||
if admin_route.exists():
|
||||
content = admin_route.read_text()
|
||||
if "is_admin" not in content and "admin_required" not in content:
|
||||
self.add_warning(
|
||||
"ACCESS-AUTH-002",
|
||||
"Admin routes should verify admin privileges",
|
||||
str(admin_route),
|
||||
)
|
||||
|
||||
# Check password hashing
|
||||
security_file = self.project_root / "app" / "core" / "security.py"
|
||||
if security_file.exists():
|
||||
content = security_file.read_text()
|
||||
if not re.search(r"bcrypt|argon2|scrypt|pbkdf2", content, re.IGNORECASE):
|
||||
self.add_error(
|
||||
"ACCESS-ACCT-003",
|
||||
"Passwords must use approved hashing algorithms",
|
||||
str(security_file),
|
||||
)
|
||||
|
||||
# Check password not in API responses
|
||||
# Note: Only flag if a class with "Response" in name directly defines password_hash
|
||||
# Internal schemas (like UserInDB) are not flagged as they're not API responses
|
||||
schema_path = self.project_root / "models" / "schema"
|
||||
if schema_path.exists():
|
||||
for file in schema_path.glob("*.py"):
|
||||
content = file.read_text()
|
||||
# Check for Response classes that directly define password_hash
|
||||
# Split by class definitions and check each
|
||||
class_blocks = re.split(r"(?=^class\s)", content, flags=re.MULTILINE)
|
||||
for block in class_blocks:
|
||||
# Check if this class is a Response class
|
||||
class_match = re.match(r"class\s+(\w*Response\w*)", block)
|
||||
if class_match:
|
||||
# Check if password_hash is defined in this class (not inherited)
|
||||
if "password_hash:" in block or "password_hash =" in block:
|
||||
if "exclude" not in block.lower():
|
||||
self.add_error(
|
||||
"ACCESS-PRIV-002",
|
||||
f"Password hash must be excluded from {class_match.group(1)}",
|
||||
str(file),
|
||||
)
|
||||
|
||||
# ==================
|
||||
# DATA GOVERNANCE
|
||||
# ==================
|
||||
|
||||
def _validate_data_governance(self) -> None:
|
||||
"""Validate data governance requirements."""
|
||||
# Check PII not logged
|
||||
# Note: Patterns detect actual password values, not descriptive usage like "Password reset"
|
||||
# We look for patterns that suggest password values are being logged:
|
||||
# - password= or password: followed by a variable
|
||||
# - %s or {} after password indicating interpolation of password value
|
||||
self._check_forbidden_patterns(
|
||||
paths=["app/**/*.py", "middleware/**/*.py"],
|
||||
patterns=[
|
||||
r"logger\.\w+\(.*password\s*[=:]\s*['\"]?%", # password=%s
|
||||
r"logger\.\w+\(.*password\s*[=:]\s*\{", # password={var}
|
||||
r"logging\.\w+\(.*password\s*[=:]\s*['\"]?%", # password=%s
|
||||
r"print\(.*password\s*=", # print(password=xxx)
|
||||
r"logger.*credit.*card.*\d", # credit card with numbers
|
||||
r"logger.*\bssn\b.*\d", # SSN with numbers
|
||||
],
|
||||
rule_id="DATA-PII-003",
|
||||
message="PII/sensitive data must not be logged",
|
||||
)
|
||||
|
||||
# Check input validation (Pydantic)
|
||||
schema_path = self.project_root / "models" / "schema"
|
||||
if schema_path.exists():
|
||||
has_validation = False
|
||||
for file in schema_path.glob("*.py"):
|
||||
content = file.read_text()
|
||||
if re.search(r"Field|validator|field_validator", content):
|
||||
has_validation = True
|
||||
break
|
||||
if not has_validation:
|
||||
self.add_error(
|
||||
"DATA-INT-001",
|
||||
"Pydantic validation required for data integrity",
|
||||
str(schema_path),
|
||||
)
|
||||
|
||||
# Check user data access endpoint exists (GDPR)
|
||||
users_api = self.project_root / "app" / "api" / "v1" / "users.py"
|
||||
if users_api.exists():
|
||||
content = users_api.read_text()
|
||||
if "/me" not in content and "current" not in content.lower():
|
||||
self.add_warning(
|
||||
"DATA-PRIV-001",
|
||||
"Endpoint for users to access their own data required (GDPR Art. 15)",
|
||||
str(users_api),
|
||||
)
|
||||
|
||||
# ==================
|
||||
# COMPLIANCE
|
||||
# ==================
|
||||
|
||||
def _validate_compliance(self) -> None:
|
||||
"""Validate compliance requirements."""
|
||||
# Check HTTPS configuration
|
||||
config_files = [
|
||||
self.project_root / "app" / "core" / "config.py",
|
||||
self.project_root / "main.py",
|
||||
]
|
||||
https_configured = False
|
||||
for file in config_files:
|
||||
if file.exists():
|
||||
content = file.read_text()
|
||||
if re.search(r"https|SSL|TLS|SECURE", content, re.IGNORECASE):
|
||||
https_configured = True
|
||||
break
|
||||
if not https_configured:
|
||||
self.add_warning(
|
||||
"COMP-REG-002",
|
||||
"HTTPS configuration should be documented",
|
||||
"app/core/config.py",
|
||||
)
|
||||
|
||||
# Check version control
|
||||
if not (self.project_root / ".git").exists():
|
||||
self.add_error(
|
||||
"COMP-EVID-003",
|
||||
"Version control (Git) is required",
|
||||
str(self.project_root),
|
||||
)
|
||||
|
||||
# Check CI/CD exists
|
||||
ci_workflow = self.project_root / ".github" / "workflows" / "ci.yml"
|
||||
if not ci_workflow.exists():
|
||||
self.add_warning(
|
||||
"COMP-EVID-001",
|
||||
"CI workflow for automated testing recommended",
|
||||
".github/workflows/ci.yml",
|
||||
)
|
||||
|
||||
# Check code review process
|
||||
pr_template = self.project_root / ".github" / "PULL_REQUEST_TEMPLATE.md"
|
||||
if not pr_template.exists():
|
||||
self.add_warning(
|
||||
"COMP-POL-001",
|
||||
"Pull request template recommended for code review",
|
||||
".github/PULL_REQUEST_TEMPLATE.md",
|
||||
)
|
||||
|
||||
# ==================
|
||||
# CHANGE MANAGEMENT
|
||||
# ==================
|
||||
|
||||
def _validate_change_management(self) -> None:
|
||||
"""Validate change management requirements."""
|
||||
# Check .gitignore exists and excludes secrets
|
||||
gitignore = self.project_root / ".gitignore"
|
||||
if gitignore.exists():
|
||||
content = gitignore.read_text()
|
||||
required_exclusions = [".env", "*.pem", "*.key"]
|
||||
for pattern in required_exclusions:
|
||||
# Simplified check - just look for the pattern
|
||||
if pattern.replace("*", "") not in content:
|
||||
self.add_warning(
|
||||
"CHANGE-VC-003",
|
||||
f"Secret pattern '{pattern}' should be in .gitignore",
|
||||
str(gitignore),
|
||||
)
|
||||
else:
|
||||
self.add_error(
|
||||
"CHANGE-VC-002",
|
||||
".gitignore file required",
|
||||
str(self.project_root),
|
||||
)
|
||||
|
||||
# Check database migrations
|
||||
alembic_dir = self.project_root / "alembic"
|
||||
if not alembic_dir.exists():
|
||||
self.add_warning(
|
||||
"CHANGE-ROLL-001",
|
||||
"Database migration tool (Alembic) recommended",
|
||||
"alembic/",
|
||||
)
|
||||
else:
|
||||
# Check for downgrade functions
|
||||
versions_dir = alembic_dir / "versions"
|
||||
if versions_dir.exists():
|
||||
for file in versions_dir.glob("*.py"):
|
||||
content = file.read_text()
|
||||
if "def upgrade" in content and "def downgrade" not in content:
|
||||
self.add_warning(
|
||||
"CHANGE-ROLL-002",
|
||||
"Migration should include downgrade function",
|
||||
str(file),
|
||||
)
|
||||
|
||||
# Check environment separation
|
||||
config_file = self.project_root / "app" / "core" / "config.py"
|
||||
if config_file.exists():
|
||||
content = config_file.read_text()
|
||||
if not re.search(r"ENVIRONMENT|development|staging|production", content):
|
||||
self.add_warning(
|
||||
"CHANGE-DEP-001",
|
||||
"Environment separation configuration recommended",
|
||||
str(config_file),
|
||||
)
|
||||
|
||||
# ==================
|
||||
# THIRD PARTY
|
||||
# ==================
|
||||
|
||||
def _validate_third_party(self) -> None:
|
||||
"""Validate third-party dependency management."""
|
||||
# Check dependency lock file exists
|
||||
lock_files = ["uv.lock", "poetry.lock", "Pipfile.lock", "requirements.lock"]
|
||||
has_lock = any((self.project_root / f).exists() for f in lock_files)
|
||||
if not has_lock:
|
||||
self.add_warning(
|
||||
"THIRD-DEP-001",
|
||||
"Dependency lock file recommended for reproducible builds",
|
||||
"uv.lock or similar",
|
||||
)
|
||||
|
||||
# Check dependency manifest exists
|
||||
manifest_files = ["pyproject.toml", "requirements.txt", "Pipfile"]
|
||||
has_manifest = any((self.project_root / f).exists() for f in manifest_files)
|
||||
if not has_manifest:
|
||||
self.add_error(
|
||||
"THIRD-DEP-002",
|
||||
"Dependency manifest file required",
|
||||
"pyproject.toml",
|
||||
)
|
||||
|
||||
# Check for Dependabot
|
||||
dependabot = self.project_root / ".github" / "dependabot.yml"
|
||||
if not dependabot.exists():
|
||||
self.add_info(
|
||||
"THIRD-VULN-002",
|
||||
"Consider enabling Dependabot for security updates",
|
||||
".github/dependabot.yml",
|
||||
)
|
||||
|
||||
# Check for insecure package sources
|
||||
pyproject = self.project_root / "pyproject.toml"
|
||||
if pyproject.exists():
|
||||
content = pyproject.read_text()
|
||||
if "http://" in content and "https://" not in content:
|
||||
self.add_error(
|
||||
"THIRD-VEND-001",
|
||||
"Only HTTPS sources allowed for packages",
|
||||
str(pyproject),
|
||||
)
|
||||
|
||||
# ==================
|
||||
# DOCUMENTATION
|
||||
# ==================
|
||||
|
||||
def _validate_documentation(self) -> None:
|
||||
"""Validate documentation requirements."""
|
||||
# Check README exists
|
||||
readme_files = ["README.md", "README.rst", "README.txt"]
|
||||
has_readme = any((self.project_root / f).exists() for f in readme_files)
|
||||
if not has_readme:
|
||||
self.add_error(
|
||||
"DOC-PROJ-001",
|
||||
"Project README required",
|
||||
"README.md",
|
||||
)
|
||||
else:
|
||||
# Check README has setup instructions
|
||||
for readme in readme_files:
|
||||
readme_path = self.project_root / readme
|
||||
if readme_path.exists():
|
||||
content = readme_path.read_text().lower()
|
||||
has_setup = any(
|
||||
term in content
|
||||
for term in [
|
||||
"install",
|
||||
"setup",
|
||||
"quick start",
|
||||
"getting started",
|
||||
]
|
||||
)
|
||||
if not has_setup:
|
||||
self.add_warning(
|
||||
"DOC-PROJ-002",
|
||||
"README should include setup instructions",
|
||||
str(readme_path),
|
||||
)
|
||||
break
|
||||
|
||||
# Check security policy exists
|
||||
security_files = ["SECURITY.md", ".github/SECURITY.md"]
|
||||
has_security = any((self.project_root / f).exists() for f in security_files)
|
||||
if not has_security:
|
||||
self.add_warning(
|
||||
"DOC-SEC-001",
|
||||
"Security policy (SECURITY.md) recommended",
|
||||
"SECURITY.md",
|
||||
)
|
||||
|
||||
# Check API documentation
|
||||
docs_api = self.project_root / "docs" / "api"
|
||||
if not docs_api.exists() or not list(docs_api.glob("*.md")):
|
||||
self.add_warning(
|
||||
"DOC-API-003",
|
||||
"API documentation recommended",
|
||||
"docs/api/",
|
||||
)
|
||||
|
||||
# Check authentication documentation
|
||||
auth_doc = self.project_root / "docs" / "api" / "authentication.md"
|
||||
if not auth_doc.exists():
|
||||
self.add_warning(
|
||||
"DOC-SEC-002",
|
||||
"Authentication documentation recommended",
|
||||
"docs/api/authentication.md",
|
||||
)
|
||||
|
||||
# Check architecture documentation
|
||||
arch_docs = self.project_root / "docs" / "architecture"
|
||||
if not arch_docs.exists() or not list(arch_docs.glob("*.md")):
|
||||
self.add_warning(
|
||||
"DOC-ARCH-001",
|
||||
"Architecture documentation recommended",
|
||||
"docs/architecture/",
|
||||
)
|
||||
|
||||
# Check deployment documentation
|
||||
deploy_doc = self.project_root / "docs" / "guides" / "deployment.md"
|
||||
if not deploy_doc.exists():
|
||||
self.add_warning(
|
||||
"DOC-OPS-001",
|
||||
"Deployment documentation recommended",
|
||||
"docs/guides/deployment.md",
|
||||
)
|
||||
|
||||
# ==================
|
||||
# HELPERS
|
||||
# ==================
|
||||
|
||||
def _check_forbidden_patterns(
|
||||
self,
|
||||
paths: list[str],
|
||||
patterns: list[str],
|
||||
rule_id: str,
|
||||
message: str,
|
||||
) -> None:
|
||||
"""Check for forbidden patterns in files."""
|
||||
for path_pattern in paths:
|
||||
if "**" in path_pattern:
|
||||
base, pattern = path_pattern.split("**", 1)
|
||||
base_path = self.project_root / base.rstrip("/")
|
||||
if base_path.exists():
|
||||
files = base_path.rglob(pattern.lstrip("/"))
|
||||
else:
|
||||
continue
|
||||
else:
|
||||
files = [self.project_root / path_pattern]
|
||||
|
||||
for file in files:
|
||||
if not file.exists() or not file.is_file():
|
||||
continue
|
||||
try:
|
||||
content = file.read_text()
|
||||
for pattern in patterns:
|
||||
if re.search(pattern, content, re.IGNORECASE):
|
||||
self.add_error(rule_id, message, str(file))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
|
||||
def main() -> int:
|
||||
"""Run audit validation."""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Validate IT internal audit rules")
|
||||
parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
|
||||
parser.add_argument(
|
||||
"--format",
|
||||
choices=["text", "json"],
|
||||
default="text",
|
||||
help="Output format",
|
||||
)
|
||||
args = parser.parse_args()
|
||||
|
||||
validator = AuditValidator()
|
||||
validator.load_rules()
|
||||
success = validator.validate()
|
||||
validator.print_results()
|
||||
|
||||
return 0 if success else 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
Reference in New Issue
Block a user