Files
orion/scripts/validate/base_validator.py
Samir Boulahtit 6d6eba75bf
Some checks failed
CI / pytest (push) Failing after 48m31s
CI / docs (push) Has been skipped
CI / deploy (push) Has been skipped
CI / ruff (push) Successful in 11s
CI / validate (push) Successful in 23s
CI / dependency-scanning (push) Successful in 28s
feat(prospecting): add complete prospecting module for lead discovery and scoring
Migrates scanning pipeline from marketing-.lu-domains app into Orion module.
Supports digital (domain scan) and offline (manual capture) lead channels
with enrichment, scoring, campaign management, and interaction tracking.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-28 00:59:47 +01:00

334 lines
11 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Base Validator Class
Shared functionality for all validators.
"""
import re
from abc import ABC
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Any
import yaml
class Severity(str, Enum):
    """How serious a validation finding is.

    The ``str`` mix-in makes every member a real string, so a member
    compares equal to its lowercase value (``Severity.ERROR == "error"``)
    and can be serialized through ``.value`` without extra conversion.
    """

    # Members are declared from most to least severe; iteration follows
    # this declaration order.
    ERROR = "error"
    WARNING = "warning"
    INFO = "info"
@dataclass
class Violation:
    """One finding produced by a validation rule.

    ``rule_id``, ``message`` and ``severity`` are required; the remaining
    fields are optional and default to empty values.
    """

    # --- required -----------------------------------------------------
    rule_id: str          # identifier of the rule that produced the finding
    message: str          # description of the finding
    severity: Severity    # ERROR / WARNING / INFO

    # --- optional (empty defaults) ------------------------------------
    file_path: str = ""   # path of the offending file, as a string
    line: int = 0         # line number associated with the finding
    suggestion: str = ""  # optional hint on how to resolve the finding
@dataclass
class ValidationResult:
    """Aggregated outcome of a validation run.

    Holds every recorded ``Violation`` plus the number of files that were
    examined, and offers per-severity helpers on top of that list.
    """

    # Each instance gets its own list (default_factory), never a shared one.
    violations: list[Violation] = field(default_factory=list)
    files_checked: int = 0

    def _count(self, severity: Severity) -> int:
        """Return how many recorded violations carry ``severity``."""
        return len([item for item in self.violations if item.severity == severity])

    def has_errors(self) -> bool:
        """Return True as soon as one error-level violation is found."""
        for item in self.violations:
            if item.severity == Severity.ERROR:
                return True
        return False

    def error_count(self) -> int:
        """Return the number of error-level violations."""
        return self._count(Severity.ERROR)

    def warning_count(self) -> int:
        """Return the number of warning-level violations."""
        return self._count(Severity.WARNING)

    def info_count(self) -> int:
        """Return the number of info-level violations."""
        return self._count(Severity.INFO)
class BaseValidator(ABC):
    """Base class for architecture, security, and performance validators.

    Two reporting channels coexist on every instance:

    * dict-based findings in ``self.errors`` / ``self.warnings``, filled by
      ``add_error`` / ``add_warning`` / ``add_info`` and rendered by
      ``print_results`` (used by ``run``);
    * ``Violation`` objects in ``self.result``, filled by ``_add_violation``
      and rendered by ``output_results`` / ``_print_violations``.

    Subclasses are expected to override ``validate_all`` and/or
    ``_validate_file_content``.
    """

    # Directories/patterns to ignore by default.
    # Each entry is matched against whole path components (see
    # ``_should_ignore_file``); entries may use fnmatch wildcards and may
    # span several components separated by "/".
    IGNORE_PATTERNS = [
        ".venv", "venv", "node_modules", "__pycache__", ".git",
        ".pytest_cache", ".mypy_cache", "dist", "build", "*.egg-info",
        "migrations", "alembic/versions", ".tox", "htmlcov",
        "site",  # mkdocs build output
        "scripts/security-audit",  # needs revamping
    ]

    # Regex for noqa comments. Supports both ruff-compatible (SEC001) and
    # human-readable (SEC-001) formats: # noqa: SEC001, # noqa: SEC-001
    # Group 1 is the comma-separated rule list, or None for a bare "noqa".
    _NOQA_PATTERN = re.compile(
        r"#\s*noqa(?::\s*([A-Z]+-?\d+(?:\s*,\s*[A-Z]+-?\d+)*))?",
    )
    # Same for HTML comments: <!-- noqa: SEC015 -->
    _NOQA_HTML_PATTERN = re.compile(
        r"<!--\s*noqa(?::\s*([A-Z]+-?\d+(?:\s*,\s*[A-Z]+-?\d+)*))?\s*-->",
    )
    # Same for JS comments: // noqa: PERF062
    _NOQA_JS_PATTERN = re.compile(
        r"//\s*noqa(?::\s*([A-Z]+-?\d+(?:\s*,\s*[A-Z]+-?\d+)*))?",
    )

    def __init__(
        self,
        rules_dir: str = "",
        project_root: Path | None = None,
        verbose: bool = False,
    ):
        """Initialise an empty validator.

        Args:
            rules_dir: Directory holding the YAML rule files, relative to
                ``project_root``.
            project_root: Root of the project being validated; defaults to
                the current working directory.
            verbose: When True, ``_add_violation`` echoes each violation
                that comes with a context string.
        """
        self.rules_dir = rules_dir
        self.project_root = project_root or Path.cwd()
        self.verbose = verbose
        self.rules: list[dict[str, Any]] = []
        self.errors: list[dict[str, Any]] = []
        self.warnings: list[dict[str, Any]] = []
        self.result = ValidationResult()

    def load_rules(self) -> None:
        """Load rules from the ``*.yaml`` files in the rules directory.

        Files whose name starts with ``_`` are skipped. The ``rules`` list
        of every remaining document is appended to ``self.rules``.
        Documents that are not mappings, or whose ``rules`` entry is not a
        list (for example an empty ``rules:`` key), are ignored instead of
        raising. A missing directory is reported on stdout and leaves
        ``self.rules`` untouched.
        """
        rules_path = self.project_root / self.rules_dir
        if not rules_path.exists():
            print(f"Rules directory not found: {rules_path}")
            return
        # sorted(): glob order depends on the filesystem; a stable order
        # keeps the rule sequence reproducible across machines.
        for rule_file in sorted(rules_path.glob("*.yaml")):
            if rule_file.name.startswith("_"):
                continue  # Skip main config
            # Explicit encoding so parsing does not depend on the locale.
            with open(rule_file, encoding="utf-8") as f:
                data = yaml.safe_load(f)
            if not isinstance(data, dict):
                continue  # empty file or non-mapping document
            rules = data.get("rules")
            if isinstance(rules, list):
                self.rules.extend(rules)

    def validate(self) -> bool:
        """Run validation. Returns True if passed.

        Subclasses should implement validate_all() instead.

        The run fails when the result returned by ``validate_all`` holds an
        error-level violation *or* when errors were recorded through
        ``add_error``, so that ``run`` never prints errors and still
        reports success.
        """
        result = self.validate_all()
        result_ok = not result.has_errors() if hasattr(result, "has_errors") else True
        return result_ok and not self.errors

    def validate_all(self, target_path: Path | None = None) -> ValidationResult:
        """Run all validations. Override in subclasses.

        The base implementation ignores ``target_path`` and returns a
        fresh, empty ``ValidationResult``.
        """
        return ValidationResult()

    @staticmethod
    def _finding(
        rule_id: str, message: str, file: str, line: int, severity: str
    ) -> dict[str, Any]:
        """Build the dict representation shared by add_error/add_warning/add_info."""
        return {
            "rule_id": rule_id,
            "message": message,
            "file": file,
            "line": line,
            "severity": severity,
        }

    def add_error(
        self, rule_id: str, message: str, file: str = "", line: int = 0
    ) -> None:
        """Add an error to ``self.errors``."""
        self.errors.append(self._finding(rule_id, message, file, line, "error"))

    def add_warning(
        self, rule_id: str, message: str, file: str = "", line: int = 0
    ) -> None:
        """Add a warning to ``self.warnings``."""
        self.warnings.append(self._finding(rule_id, message, file, line, "warning"))

    def add_info(
        self, rule_id: str, message: str, file: str = "", line: int = 0
    ) -> None:
        """Add an informational note.

        Notes are stored in ``self.warnings`` (tagged with severity
        ``"info"``), so ``print_results`` lists and counts them together
        with the warnings.
        """
        self.warnings.append(self._finding(rule_id, message, file, line, "info"))

    def print_results(self) -> None:
        """Print the dict-based findings (``self.errors`` / ``self.warnings``)."""
        if not self.errors and not self.warnings:
            print(f"✅ All {self.rules_dir} rules passed!")
            return
        if self.errors:
            print(f"\n{len(self.errors)} errors found:")
            for error in self.errors:
                print(f" [{error['rule_id']}] {error['message']}")
                if error["file"]:
                    print(f" File: {error['file']}:{error['line']}")
        if self.warnings:
            print(f"\n⚠️ {len(self.warnings)} warnings:")
            for warning in self.warnings:
                print(f" [{warning['rule_id']}] {warning['message']}")
                if warning["file"]:
                    print(f" File: {warning['file']}:{warning['line']}")

    def run(self) -> int:
        """Load rules, validate, print results and return an exit code.

        Returns:
            0 when ``validate`` passed, 1 otherwise.
        """
        self.load_rules()
        passed = self.validate()
        self.print_results()
        return 0 if passed else 1

    def _should_ignore_file(self, file_path: Path) -> bool:
        """Check if a file should be ignored based on ``IGNORE_PATTERNS``.

        Patterns are compared against whole path components rather than as
        raw substrings: ``"site"`` ignores ``site/index.html`` but not
        ``website.py``, and the glob ``"*.egg-info"`` matches
        ``pkg.egg-info/``. A pattern containing ``/`` (for example
        ``"alembic/versions"``) must match that many consecutive
        components. Matching is case-sensitive.
        """
        from fnmatch import fnmatchcase

        parts = Path(file_path).parts
        for pattern in self.IGNORE_PATTERNS:
            segments = [segment for segment in pattern.split("/") if segment]
            width = len(segments)
            if not width:
                continue
            # Slide a window of `width` components over the path.
            for start in range(len(parts) - width + 1):
                window = parts[start:start + width]
                if all(fnmatchcase(part, seg) for part, seg in zip(window, segments)):
                    return True
        return False

    @staticmethod
    def _normalize_rule_id(code: str) -> str:
        """Normalize rule ID by removing dashes: SEC-001 → SEC001."""
        return code.replace("-", "")

    def _is_noqa_suppressed(self, line: str, rule_id: str) -> bool:
        """Check if a line has a noqa comment suppressing the given rule.

        Supports both ruff-compatible and human-readable formats:

        - ``# noqa`` — suppresses all rules
        - ``# noqa: SEC001`` — ruff-compatible (preferred)
        - ``# noqa: SEC-001`` — human-readable (also accepted)
        - ``<!-- noqa: SEC015 -->`` — HTML comment variant
        - ``// noqa: PERF062`` — JS comment variant

        Rule IDs are compared after dash removal, so ``SEC-001`` and
        ``SEC001`` are interchangeable on both sides.
        """
        normalized_id = self._normalize_rule_id(rule_id)
        for pattern in (self._NOQA_PATTERN, self._NOQA_HTML_PATTERN, self._NOQA_JS_PATTERN):
            match = pattern.search(line)
            if not match:
                continue
            rule_list = match.group(1)
            if not rule_list:
                return True  # bare # noqa → suppress everything
            suppressed = [
                self._normalize_rule_id(r.strip())
                for r in rule_list.split(",")
            ]
            if normalized_id in suppressed:
                return True
        return False

    def _add_violation(
        self,
        rule_id: str,
        rule_name: str,
        severity: Severity,
        file_path: Path,
        line_number: int,
        message: str,
        context: str = "",
        suggestion: str = "",
    ) -> None:
        """Add a violation to ``self.result``.

        The stored message is ``"<rule_name>: <message>"``. ``context`` is
        not stored; it is only echoed to stdout when the validator is
        verbose.
        """
        violation = Violation(
            rule_id=rule_id,
            message=f"{rule_name}: {message}",
            severity=severity,
            file_path=str(file_path),
            line=line_number,
            suggestion=suggestion,
        )
        self.result.violations.append(violation)
        if self.verbose and context:
            print(f" [{rule_id}] {file_path}:{line_number}")
            print(f" {message}")
            print(f" Context: {context}")

    def validate_file(self, file_path: Path) -> ValidationResult:
        """Validate a single file.

        A missing file is reported on stdout and the current result is
        returned unchanged. Otherwise ``files_checked`` is set to 1 (it is
        not accumulated), the file is read as UTF-8 and handed to
        ``_validate_file_content``.

        Returns:
            ``self.result``, including violations recorded by earlier calls.
        """
        if not file_path.exists():
            print(f"File not found: {file_path}")
            return self.result
        self.result.files_checked = 1
        # Explicit encoding so results do not depend on the platform locale.
        content = file_path.read_text(encoding="utf-8")
        lines = content.split("\n")
        self._validate_file_content(file_path, content, lines)
        return self.result

    def _validate_file_content(
        self, file_path: Path, content: str, lines: list[str]
    ) -> None:
        """Validate file content. Override in subclasses."""

    def output_results(self, json_output: bool = False, errors_only: bool = False) -> None:
        """Output the violations held in ``self.result``.

        Args:
            json_output: Print a JSON document instead of the
                human-readable report.
            errors_only: Restrict the output to error-level violations.
        """
        if json_output:
            import json

            output = {
                "files_checked": self.result.files_checked,
                "violations": [
                    {
                        "rule_id": v.rule_id,
                        "message": v.message,
                        "severity": v.severity.value,
                        "file": v.file_path,
                        "line": v.line,
                        "suggestion": v.suggestion,
                    }
                    for v in self.result.violations
                    if not errors_only or v.severity == Severity.ERROR
                ],
            }
            print(json.dumps(output, indent=2))
        else:
            self._print_violations(errors_only)

    def _print_violations(self, errors_only: bool = False) -> None:
        """Print violations in human-readable format.

        Violations are grouped by severity and followed by a summary line.
        Suggestions are printed for error-level violations only. With
        ``errors_only`` the warning and info groups are dropped before
        counting, so the summary reports zero for both.
        """
        violations = self.result.violations
        if errors_only:
            violations = [v for v in violations if v.severity == Severity.ERROR]
        if not violations:
            print(f"\n✅ No issues found! ({self.result.files_checked} files checked)")
            return
        errors = [v for v in violations if v.severity == Severity.ERROR]
        warnings = [v for v in violations if v.severity == Severity.WARNING]
        info = [v for v in violations if v.severity == Severity.INFO]
        if errors:
            print(f"\n{len(errors)} errors:")
            for v in errors:
                print(f" [{v.rule_id}] {v.file_path}:{v.line}")
                print(f" {v.message}")
                if v.suggestion:
                    print(f" 💡 {v.suggestion}")
        if warnings and not errors_only:
            print(f"\n⚠️ {len(warnings)} warnings:")
            for v in warnings:
                print(f" [{v.rule_id}] {v.file_path}:{v.line}")
                print(f" {v.message}")
        if info and not errors_only:
            print(f"\n {len(info)} info:")
            for v in info:
                print(f" [{v.rule_id}] {v.file_path}:{v.line}")
                print(f" {v.message}")
        print(f"\n📊 Summary: {len(errors)} errors, {len(warnings)} warnings, {len(info)} info")

    def get_exit_code(self) -> int:
        """Return 1 when ``self.result`` holds an error-level violation, else 0."""
        return 1 if self.result.has_errors() else 0