From 4c750f02687ed63e7abf05d31ef11ed0d82afd7e Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Wed, 1 Apr 2026 20:58:11 +0200 Subject: [PATCH] feat(prospecting): implement security audit pipeline (Workstream 2A) Complete security audit integration into the enrichment pipeline: Backend: - SecurityAuditService with 7 passive checks: HTTPS, SSL cert, security headers, exposed files, cookies, server info, technology detection - Constants file with SECURITY_HEADERS, EXPOSED_PATHS, SEVERITY_SCORES - SecurityAuditResponse schema with JSON field validators + aliases - Endpoints: POST /security-audit/{id}, POST /security-audit/batch - Added to full_enrichment pipeline (Step 5, before scoring) - get_pending_security_audit() query in prospect_service Frontend: - Security tab on prospect detail page with grade badge (A+ to F), score/100, severity counts, HTTPS/SSL status, missing headers, exposed files, technologies, and full findings list - "Run Security Audit" button with loading state - "Security Audit" batch button on scan-jobs page Tested on batirenovation-strasbourg.fr: Grade D (50/100), 11 issues found (missing headers, exposed wp-login, server version disclosure). Co-Authored-By: Claude Opus 4.6 (1M context) --- .../routes/api/admin_enrichment.py | 52 +- app/modules/prospecting/schemas/prospect.py | 4 + .../prospecting/schemas/security_audit.py | 82 ++++ .../prospecting/services/prospect_service.py | 11 + .../services/security_audit_constants.py | 75 +++ .../services/security_audit_service.py | 443 ++++++++++++++++++ .../static/admin/js/prospect-detail.js | 35 ++ .../prospecting/static/admin/js/scan-jobs.js | 1 + .../prospecting/admin/prospect-detail.html | 105 +++++ .../prospecting/admin/scan-jobs.html | 5 + 10 files changed, 812 insertions(+), 1 deletion(-) create mode 100644 app/modules/prospecting/schemas/security_audit.py create mode 100644 app/modules/prospecting/services/security_audit_constants.py create mode 100644 app/modules/prospecting/services/security_audit_service.py diff --git a/app/modules/prospecting/routes/api/admin_enrichment.py b/app/modules/prospecting/routes/api/admin_enrichment.py index ae03f034..d39421a1 100644 --- a/app/modules/prospecting/routes/api/admin_enrichment.py +++ b/app/modules/prospecting/routes/api/admin_enrichment.py @@ -25,9 +25,15 @@ from app.modules.prospecting.schemas.enrichment import ( ScanSingleResponse, ScoreComputeBatchResponse, ) +from app.modules.prospecting.schemas.security_audit import ( + SecurityAuditSingleResponse, +) from app.modules.prospecting.services.enrichment_service import enrichment_service from app.modules.prospecting.services.prospect_service import prospect_service from app.modules.prospecting.services.scoring_service import scoring_service +from app.modules.prospecting.services.security_audit_service import ( + security_audit_service, +) from app.modules.prospecting.services.stats_service import stats_service from app.modules.tenancy.schemas.auth import UserContext @@ -113,6 +119,25 @@ def contact_scrape_batch( return ScanBatchResponse(processed=len(prospects), successful=count) +@router.post("/security-audit/batch", response_model=ScanBatchResponse) +def security_audit_batch( + limit: int = Query(50, ge=1, le=200), + db: Session = Depends(get_db), + current_admin: UserContext = Depends(get_current_admin_api), +): + """Run security audit for pending prospects.""" + job = stats_service.create_job(db, JobType.SECURITY_AUDIT) + prospects = prospect_service.get_pending_security_audit(db, limit=limit) + count = 0 + for prospect in prospects: + result = security_audit_service.run_audit(db, prospect) + if result: + count += 1 + stats_service.complete_job(job, processed=len(prospects)) + db.commit() + return ScanBatchResponse(processed=len(prospects), successful=count) + + @router.post("/score-compute/batch", response_model=ScoreComputeBatchResponse) def compute_scores_batch( limit: int = Query(500, ge=1, le=5000), @@ -182,6 +207,27 @@ def scrape_contacts_single( return ContactScrapeResponse(domain=prospect.domain_name, contacts_found=len(contacts)) +@router.post("/security-audit/{prospect_id}", response_model=SecurityAuditSingleResponse) +def security_audit_single( + prospect_id: int = Path(...), + db: Session = Depends(get_db), + current_admin: UserContext = Depends(get_current_admin_api), +): + """Run security audit for a single prospect.""" + prospect = prospect_service.get_by_id(db, prospect_id) + audit = security_audit_service.run_audit(db, prospect) + db.commit() + findings_count = 0 + if audit: + findings_count = audit.findings_count_critical + audit.findings_count_high + audit.findings_count_medium + audit.findings_count_low + return SecurityAuditSingleResponse( + domain=prospect.domain_name, + score=audit.score if audit else 0, + grade=audit.grade if audit else "F", + findings_count=findings_count, + ) + + @router.post("/full/{prospect_id}", response_model=FullEnrichmentResponse) def full_enrichment( prospect_id: int = Path(...), @@ -209,7 +255,11 @@ def full_enrichment( if prospect.has_website: contacts = enrichment_service.scrape_contacts(db, prospect) - # Step 5: Compute score + # Step 5: Security audit (if has website) + if prospect.has_website: + security_audit_service.run_audit(db, prospect) + + # Step 6: Compute score db.refresh(prospect) score = scoring_service.compute_score(db, prospect) db.commit() diff --git a/app/modules/prospecting/schemas/prospect.py b/app/modules/prospecting/schemas/prospect.py index 3036dd8e..5fe42a77 100644 --- a/app/modules/prospecting/schemas/prospect.py +++ b/app/modules/prospecting/schemas/prospect.py @@ -75,6 +75,7 @@ class ProspectDetailResponse(ProspectResponse): tech_profile: "TechProfileResponse | None" = None performance_profile: "PerformanceProfileResponse | None" = None + security_audit: "SecurityAuditResponse | None" = None contacts: list["ProspectContactResponse"] = [] class Config: @@ -114,6 +115,9 @@ from app.modules.prospecting.schemas.performance_profile import ( PerformanceProfileResponse, # noqa: E402 ) from app.modules.prospecting.schemas.score import ProspectScoreResponse # noqa: E402 +from app.modules.prospecting.schemas.security_audit import ( + SecurityAuditResponse, # noqa: E402 +) from app.modules.prospecting.schemas.tech_profile import ( TechProfileResponse, # noqa: E402 ) diff --git a/app/modules/prospecting/schemas/security_audit.py b/app/modules/prospecting/schemas/security_audit.py new file mode 100644 index 00000000..637fdf32 --- /dev/null +++ b/app/modules/prospecting/schemas/security_audit.py @@ -0,0 +1,82 @@ +# app/modules/prospecting/schemas/security_audit.py +"""Pydantic schemas for security audit responses.""" + +import json +from datetime import datetime + +from pydantic import BaseModel, Field, field_validator + + +class SecurityAuditFinding(BaseModel): + """A single security finding.""" + + title: str + severity: str + category: str + detail: str + is_positive: bool = False + + +class SecurityAuditResponse(BaseModel): + """Schema for security audit detail response.""" + + id: int + prospect_id: int + score: int + grade: str + detected_language: str | None = None + findings: list[SecurityAuditFinding] = Field(default=[], validation_alias="findings_json") + findings_count_critical: int = 0 + findings_count_high: int = 0 + findings_count_medium: int = 0 + findings_count_low: int = 0 + findings_count_info: int = 0 + has_https: bool | None = None + has_valid_ssl: bool | None = None + ssl_expires_at: datetime | None = None + missing_headers: list[str] = Field(default=[], validation_alias="missing_headers_json") + exposed_files: list[str] = Field(default=[], validation_alias="exposed_files_json") + technologies: list[str] = Field(default=[], validation_alias="technologies_json") + scan_error: str | None = None + created_at: datetime + updated_at: datetime + + @field_validator("findings", mode="before") + @classmethod + def parse_findings(cls, v): + if isinstance(v, str): + return json.loads(v) + return v + + @field_validator("missing_headers", mode="before") + @classmethod + def parse_missing_headers(cls, v): + if isinstance(v, str): + return json.loads(v) + return v or [] + + @field_validator("exposed_files", mode="before") + @classmethod + def parse_exposed_files(cls, v): + if isinstance(v, str): + return json.loads(v) + return v or [] + + @field_validator("technologies", mode="before") + @classmethod + def parse_technologies(cls, v): + if isinstance(v, str): + return json.loads(v) + return v or [] + + class Config: + from_attributes = True + + +class SecurityAuditSingleResponse(BaseModel): + """Response for single-prospect security audit.""" + + domain: str + score: int + grade: str + findings_count: int diff --git a/app/modules/prospecting/services/prospect_service.py b/app/modules/prospecting/services/prospect_service.py index fdb96173..3183a60b 100644 --- a/app/modules/prospecting/services/prospect_service.py +++ b/app/modules/prospecting/services/prospect_service.py @@ -251,6 +251,17 @@ class ProspectService: .all() ) + def get_pending_security_audit(self, db: Session, limit: int = 50) -> list[Prospect]: + return ( + db.query(Prospect) + .filter( + Prospect.has_website.is_(True), + Prospect.last_security_audit_at.is_(None), + ) + .limit(limit) + .all() + ) + def count_by_status(self, db: Session) -> dict[str, int]: results = db.query(Prospect.status, func.count(Prospect.id)).group_by(Prospect.status).all() # noqa: SVC-005 - prospecting is platform-scoped, not store-scoped return {status.value if hasattr(status, "value") else str(status): count for status, count in results} diff --git a/app/modules/prospecting/services/security_audit_constants.py b/app/modules/prospecting/services/security_audit_constants.py new file mode 100644 index 00000000..538bac3a --- /dev/null +++ b/app/modules/prospecting/services/security_audit_constants.py @@ -0,0 +1,75 @@ +# app/modules/prospecting/services/security_audit_constants.py +""" +Constants for security audit checks. + +Structural data used by SecurityAuditService. Translations for report +generation are kept in the standalone script (scripts/security-audit/audit.py) +until Phase 2B (report service) migrates them. +""" + +# Severity scores — deducted from a starting score of 100 +SEVERITY_SCORES = { + "critical": 15, + "high": 10, + "medium": 5, + "low": 2, + "info": 0, +} + +# Security headers to check and their severity if missing +SECURITY_HEADERS = { + "Strict-Transport-Security": {"severity": "high", "impact": "MITM attacks, session hijacking via HTTP downgrade"}, + "Content-Security-Policy": {"severity": "high", "impact": "XSS attacks, script injection, data theft"}, + "X-Frame-Options": {"severity": "medium", "impact": "Clickjacking attacks via invisible iframes"}, + "X-Content-Type-Options": {"severity": "medium", "impact": "MIME type confusion, content injection"}, + "Referrer-Policy": {"severity": "low", "impact": "URL parameter leakage to third parties"}, + "Permissions-Policy": {"severity": "low", "impact": "Unrestricted browser API access (camera, mic, location)"}, + "X-XSS-Protection": {"severity": "info", "impact": "Legacy XSS filter not configured"}, +} + +# Paths to check for exposed sensitive files/directories +EXPOSED_PATHS = [ + ("/.env", "Environment file (database passwords, API keys)", "critical"), + ("/.git/config", "Git repository (full source code)", "critical"), + ("/.git/HEAD", "Git repository HEAD", "critical"), + ("/.htpasswd", "Password file", "critical"), + ("/wp-admin/", "WordPress admin panel", "high"), + ("/wp-login.php", "WordPress login page", "high"), + ("/administrator/", "Joomla admin panel", "high"), + ("/admin/", "Admin panel", "high"), + ("/admin/login", "Admin login page", "high"), + ("/phpmyadmin/", "phpMyAdmin (database manager)", "high"), + ("/backup/", "Backup directory", "high"), + ("/backup.zip", "Backup archive", "high"), + ("/backup.sql", "Database backup", "high"), + ("/db.sql", "Database dump", "high"), + ("/dump.sql", "Database dump", "high"), + ("/.htaccess", "Server configuration", "medium"), + ("/web.config", "IIS configuration", "medium"), + ("/server-status", "Apache server status", "medium"), + ("/server-info", "Apache server info", "medium"), + ("/info.php", "PHP info page", "medium"), + ("/phpinfo.php", "PHP info page", "medium"), + ("/graphql", "GraphQL endpoint", "medium"), + ("/debug/", "Debug endpoint", "medium"), + ("/elmah.axd", ".NET error log", "medium"), + ("/trace.axd", ".NET trace log", "medium"), + ("/readme.html", "CMS readme (reveals version)", "low"), + ("/license.txt", "CMS license (reveals version)", "low"), + ("/CHANGELOG.md", "Changelog (reveals version)", "low"), + ("/robots.txt", "Robots file", "info"), + ("/.well-known/security.txt", "Security contact file", "info"), + ("/sitemap.xml", "Sitemap", "info"), + ("/crossdomain.xml", "Flash cross-domain policy", "low"), + ("/api/", "API endpoint", "info"), +] + +# Paths that are admin panels (separate severity logic) +ADMIN_PATHS = {"/wp-admin/", "/wp-login.php", "/administrator/", "/admin/", "/admin/login"} + +# Robots.txt disallow patterns that may reveal sensitive areas +ROBOTS_SENSITIVE_PATTERNS = [ + "admin", "backup", "private", "secret", "staging", + "test", "dev", "internal", "api", "config", + "database", "panel", "dashboard", "login", "cgi-bin", +] diff --git a/app/modules/prospecting/services/security_audit_service.py b/app/modules/prospecting/services/security_audit_service.py new file mode 100644 index 00000000..4c7e36db --- /dev/null +++ b/app/modules/prospecting/services/security_audit_service.py @@ -0,0 +1,443 @@ +# app/modules/prospecting/services/security_audit_service.py +""" +Security audit service for prospect websites. + +Performs passive security checks (HTTPS, SSL, headers, exposed files, +cookies, server info, technology detection) and stores results as +ProspectSecurityAudit. All checks are read-only — no active exploitation. + +Migrated from scripts/security-audit/audit.py into the enrichment pipeline. +""" + +import json +import logging +import re +import socket +import ssl +from datetime import UTC, datetime + +import requests +from sqlalchemy.orm import Session + +from app.modules.prospecting.models import Prospect, ProspectSecurityAudit +from app.modules.prospecting.services.security_audit_constants import ( + ADMIN_PATHS, + EXPOSED_PATHS, + ROBOTS_SENSITIVE_PATTERNS, + SECURITY_HEADERS, + SEVERITY_SCORES, +) + +logger = logging.getLogger(__name__) + +REQUEST_TIMEOUT = 10 +USER_AGENT = ( + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 " + "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" +) + + +class SecurityAuditService: + """Runs passive security checks against a prospect's website.""" + + def run_audit(self, db: Session, prospect: Prospect) -> ProspectSecurityAudit | None: + """Run all security checks and store results.""" + domain = prospect.domain_name + if not domain or not prospect.has_website: + return None + + scheme = "https" if prospect.uses_https else "http" + url = f"{scheme}://{domain}" + findings = [] + technologies = [] + score = 100 + has_https = None + has_valid_ssl = None + ssl_expires_at = None + missing_headers = [] + exposed_files = [] + + session = requests.Session() + session.headers["User-Agent"] = USER_AGENT + session.verify = True + session.max_redirects = 5 + + # Fetch the page + response = None + html_content = "" + try: + response = session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True) + html_content = response.text + if response.url != url: + url = response.url + except requests.exceptions.SSLError: + findings.append(self._finding("Weak SSL/TLS configuration", "critical", "transport", + "Server supports outdated encryption protocols")) + try: + session.verify = False # noqa: SEC047 fallback for broken SSL + response = session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True) + html_content = response.text + except Exception: + pass + except requests.exceptions.RequestException as e: + logger.warning("Security audit: cannot reach %s: %s", domain, e) + return self._save_audit(db, prospect, score=0, grade="F", findings=findings, + scan_error=f"Cannot reach website: {e}", + technologies=technologies) + + # Run checks + https_findings, has_https = self._check_https(url, html_content) + findings.extend(https_findings) + + ssl_findings, has_valid_ssl, ssl_expires_at = self._check_ssl(domain) + findings.extend(ssl_findings) + + header_findings, missing_headers = self._check_headers(response) + findings.extend(header_findings) + + server_findings, server_techs = self._check_server_info(response) + findings.extend(server_findings) + technologies.extend(server_techs) + + tech_findings, detected_techs = self._check_technology(html_content, response) + findings.extend(tech_findings) + technologies.extend(detected_techs) + + cookie_findings = self._check_cookies(response) + findings.extend(cookie_findings) + + exposed_findings, exposed_files = self._check_exposed_files(domain, scheme, session) + findings.extend(exposed_findings) + + session.close() + + # Calculate score + for f in findings: + if not f.get("is_positive", False): + score = max(0, score - SEVERITY_SCORES.get(f["severity"], 0)) + + grade = self._calculate_grade(score) + + return self._save_audit( + db, prospect, + score=score, grade=grade, findings=findings, + has_https=has_https, has_valid_ssl=has_valid_ssl, + ssl_expires_at=ssl_expires_at, + missing_headers=missing_headers, exposed_files=exposed_files, + technologies=technologies, + ) + + # ── Check methods ─────────────────────────────────────────────────────── + + def _check_https(self, url: str, html_content: str) -> tuple[list[dict], bool | None]: + """Check HTTPS configuration.""" + findings = [] + from urllib.parse import urlparse + + parsed = urlparse(url) + has_https = parsed.scheme == "https" + + if has_https: + findings.append(self._finding("HTTPS enabled", "info", "transport", + "Website uses encrypted connections", is_positive=True)) + # Check mixed content + http_resources = re.findall(r'(src|href|action)=["\']http://[^"\']+["\']', html_content, re.IGNORECASE) + if http_resources: + findings.append(self._finding("Mixed content detected", "medium", "transport", + "HTTPS site loads resources over insecure HTTP")) + else: + findings.append(self._finding("No HTTPS", "critical", "transport", + "Website transmits all data in plain text")) + + return findings, has_https + + def _check_ssl(self, domain: str) -> tuple[list[dict], bool | None, datetime | None]: + """Check SSL certificate validity.""" + findings = [] + has_valid_ssl = None + ssl_expires_at = None + + try: + context = ssl.create_default_context() + with socket.create_connection((domain, 443), timeout=REQUEST_TIMEOUT) as sock: + with context.wrap_socket(sock, server_hostname=domain) as ssock: + cert = ssock.getpeercert() + not_after = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z").replace(tzinfo=UTC) + days_remaining = (not_after - datetime.now(UTC)).days + ssl_expires_at = not_after + + if days_remaining < 0: + has_valid_ssl = False + findings.append(self._finding("SSL certificate expired", "critical", "transport", + f"Certificate expired on {not_after.strftime('%Y-%m-%d')}")) + elif days_remaining < 30: + has_valid_ssl = True + findings.append(self._finding(f"SSL expires in {days_remaining} days", "high", "transport", + f"Certificate expires on {not_after.strftime('%Y-%m-%d')}")) + else: + has_valid_ssl = True + findings.append(self._finding("SSL certificate valid", "info", "transport", + f"Valid until {not_after.strftime('%Y-%m-%d')} ({days_remaining} days)", + is_positive=True)) + + # Check TLS version + protocol = ssock.version() + if protocol in ("TLSv1", "TLSv1.1", "SSLv3", "SSLv2"): + findings.append(self._finding("Weak TLS version", "high", "transport", + f"Server supports outdated protocol: {protocol}")) + + except ssl.SSLCertVerificationError: + has_valid_ssl = False + findings.append(self._finding("SSL certificate invalid", "critical", "transport", + "Certificate verification failed")) + except (TimeoutError, ConnectionRefusedError, OSError): + pass # No SSL, already caught by HTTPS check + + return findings, has_valid_ssl, ssl_expires_at + + def _check_headers(self, response) -> tuple[list[dict], list[str]]: + """Check for missing security headers.""" + findings = [] + missing = [] + + if not response: + return findings, missing + + for header_name, config in SECURITY_HEADERS.items(): + if header_name in response.headers: + findings.append(self._finding(f"Header present: {header_name}", "info", "headers", + header_name, is_positive=True)) + else: + missing.append(header_name) + findings.append(self._finding(f"Missing: {header_name}", config["severity"], "headers", + config["impact"])) + + return findings, missing + + def _check_server_info(self, response) -> tuple[list[dict], list[str]]: + """Check for server version disclosure.""" + findings = [] + technologies = [] + + if not response: + return findings, technologies + + server = response.headers.get("Server", "") + x_powered = response.headers.get("X-Powered-By", "") + + info_parts = [] + if server: + info_parts.append(server) + technologies.append(server) + if x_powered: + info_parts.append(f"X-Powered-By: {x_powered}") + technologies.append(x_powered) + + if info_parts: + has_version = bool(re.search(r"\d+\.\d+", " ".join(info_parts))) + severity = "medium" if has_version else "low" + findings.append(self._finding("Server version exposed", severity, "config", + " | ".join(info_parts))) + + return findings, technologies + + def _check_technology(self, html_content: str, response) -> tuple[list[dict], list[str]]: + """Detect CMS and technology stack.""" + findings = [] + technologies = [] + + if not html_content: + return findings, technologies + + # WordPress + wp_indicators = ["wp-content/", "wp-includes/", 'name="generator" content="WordPress'] + if any(ind in html_content for ind in wp_indicators): + version = "unknown" + ver_match = re.search(r'content="WordPress\s+([\d.]+)"', html_content) + if ver_match: + version = ver_match.group(1) + severity = "medium" if version != "unknown" else "low" + findings.append(self._finding(f"WordPress detected (v{version})", severity, "technology", + "Version publicly visible" if version != "unknown" else "CMS detected")) + technologies.append(f"WordPress {version}") + + # Joomla + if "/media/jui/" in html_content or "Joomla" in html_content: + findings.append(self._finding("Joomla detected", "low", "technology", "CMS detected")) + technologies.append("Joomla") + + # Drupal + if "Drupal" in html_content or "/sites/default/" in html_content: + findings.append(self._finding("Drupal detected", "low", "technology", "CMS detected")) + technologies.append("Drupal") + + # Hosted platforms (not vulnerable in the same way) + if "wix.com" in html_content: + technologies.append("Wix") + if "squarespace.com" in html_content: + technologies.append("Squarespace") + if "cdn.shopify.com" in html_content: + technologies.append("Shopify") + + return findings, technologies + + def _check_cookies(self, response) -> list[dict]: + """Check cookie security flags.""" + findings = [] + + if not response: + return findings + + set_cookie_headers = response.headers.get("Set-Cookie", "") + if not set_cookie_headers: + return findings + + has_insecure = False + has_no_httponly = False + has_no_samesite = False + + for cookie in set_cookie_headers.split(","): + cookie_lower = cookie.lower() + if "secure" not in cookie_lower: + has_insecure = True + if "httponly" not in cookie_lower: + has_no_httponly = True + if "samesite" not in cookie_lower: + has_no_samesite = True + + if has_insecure: + findings.append(self._finding("Cookies lack Secure flag", "medium", "cookies", + "Session cookies can be intercepted over HTTP")) + if has_no_httponly: + findings.append(self._finding("Cookies lack HttpOnly flag", "medium", "cookies", + "Cookies accessible to JavaScript (XSS risk)")) + if has_no_samesite: + findings.append(self._finding("Cookies lack SameSite attribute", "low", "cookies", + "Vulnerable to cross-site request attacks")) + + return findings + + def _check_exposed_files(self, domain: str, scheme: str, session) -> tuple[list[dict], list[str]]: + """Check for exposed sensitive files and directories.""" + findings = [] + exposed = [] + base = f"{scheme}://{domain}" + security_txt_found = False + robots_content = None + + for path, description, default_severity in EXPOSED_PATHS: + try: + resp = session.get(f"{base}{path}", timeout=REQUEST_TIMEOUT, allow_redirects=False) + + if path == "/.well-known/security.txt" and resp.status_code == 200: + security_txt_found = True + continue + if path == "/robots.txt" and resp.status_code == 200: + robots_content = resp.text + continue + if path == "/sitemap.xml" or path == "/api/": + continue + + if resp.status_code == 200: + if path in ADMIN_PATHS: + findings.append(self._finding(f"Admin panel exposed: {path}", "high", "exposure", + f"Admin login at {base}{path} is publicly accessible")) + else: + findings.append(self._finding(f"Exposed: {path}", default_severity, "exposure", + f"{description} is publicly accessible")) + exposed.append(path) + + except Exception: + continue + + # Security.txt check + if not security_txt_found: + findings.append(self._finding("No security.txt", "info", "exposure", + "No /.well-known/security.txt for responsible disclosure")) + + # Robots.txt analysis + if robots_content: + disallowed = re.findall(r"Disallow:\s*(.+)", robots_content, re.IGNORECASE) + sensitive_found = [] + for path in disallowed: + path = path.strip() + if any(pattern in path.lower() for pattern in ROBOTS_SENSITIVE_PATTERNS): + sensitive_found.append(path) + + if sensitive_found: + findings.append(self._finding("Robots.txt reveals sensitive paths", "low", "exposure", + f"Disallowed paths: {', '.join(sensitive_found[:5])}")) + + return findings, exposed + + # ── Helpers ────────────────────────────────────────────────────────────── + + @staticmethod + def _finding(title: str, severity: str, category: str, detail: str, is_positive: bool = False) -> dict: + """Create a finding dict.""" + return { + "title": title, + "severity": severity, + "category": category, + "detail": detail, + "is_positive": is_positive, + } + + @staticmethod + def _calculate_grade(score: int) -> str: + if score >= 95: + return "A+" + if score >= 85: + return "A" + if score >= 70: + return "B" + if score >= 55: + return "C" + if score >= 40: + return "D" + return "F" + + def _save_audit( + self, db: Session, prospect: Prospect, *, + score: int, grade: str, findings: list[dict], + has_https: bool | None = None, has_valid_ssl: bool | None = None, + ssl_expires_at: datetime | None = None, + missing_headers: list[str] | None = None, + exposed_files: list[str] | None = None, + technologies: list[str] | None = None, + scan_error: str | None = None, + ) -> ProspectSecurityAudit: + """Upsert security audit results.""" + audit = prospect.security_audit + if not audit: + audit = ProspectSecurityAudit(prospect_id=prospect.id) + db.add(audit) + + audit.score = score + audit.grade = grade + audit.findings_json = json.dumps(findings) + audit.has_https = has_https + audit.has_valid_ssl = has_valid_ssl + audit.ssl_expires_at = ssl_expires_at + audit.missing_headers_json = json.dumps(missing_headers or []) + audit.exposed_files_json = json.dumps(exposed_files or []) + audit.technologies_json = json.dumps(technologies or []) + audit.scan_error = scan_error + + # Denormalized counts + audit.findings_count_critical = sum(1 for f in findings if f["severity"] == "critical" and not f.get("is_positive")) + audit.findings_count_high = sum(1 for f in findings if f["severity"] == "high" and not f.get("is_positive")) + audit.findings_count_medium = sum(1 for f in findings if f["severity"] == "medium" and not f.get("is_positive")) + audit.findings_count_low = sum(1 for f in findings if f["severity"] == "low" and not f.get("is_positive")) + audit.findings_count_info = sum(1 for f in findings if f["severity"] == "info" and not f.get("is_positive")) + + prospect.last_security_audit_at = datetime.now(UTC) + db.flush() + + logger.info("Security audit for %s: score=%d grade=%s (%d findings)", + prospect.domain_name, score, grade, + len([f for f in findings if not f.get("is_positive")])) + return audit + + +security_audit_service = SecurityAuditService() diff --git a/app/modules/prospecting/static/admin/js/prospect-detail.js b/app/modules/prospecting/static/admin/js/prospect-detail.js index 4c25789a..232cab21 100644 --- a/app/modules/prospecting/static/admin/js/prospect-detail.js +++ b/app/modules/prospecting/static/admin/js/prospect-detail.js @@ -13,10 +13,12 @@ function prospectDetail(prospectId) { campaignSends: [], loading: true, error: null, + auditRunning: false, activeTab: 'overview', tabs: [ { id: 'overview', label: 'Overview' }, + { id: 'security', label: 'Security' }, { id: 'interactions', label: 'Interactions' }, { id: 'campaigns', label: 'Campaigns' }, ], @@ -115,6 +117,39 @@ function prospectDetail(prospectId) { } }, + async runSecurityAudit() { + this.auditRunning = true; + try { + await apiClient.post('/admin/prospecting/enrichment/security-audit/' + this.prospectId); + Utils.showToast('Security audit complete', 'success'); + await this.loadProspect(); + } catch (err) { + Utils.showToast('Audit failed: ' + err.message, 'error'); + } finally { + this.auditRunning = false; + } + }, + + gradeColor(grade) { + if (!grade) return 'text-gray-400'; + if (grade === 'A+' || grade === 'A') return 'text-green-600 dark:text-green-400'; + if (grade === 'B') return 'text-blue-600 dark:text-blue-400'; + if (grade === 'C') return 'text-yellow-600 dark:text-yellow-400'; + if (grade === 'D') return 'text-orange-600 dark:text-orange-400'; + return 'text-red-600 dark:text-red-400'; + }, + + severityBadge(severity) { + var classes = { + critical: 'bg-red-100 text-red-700 dark:bg-red-900 dark:text-red-300', + high: 'bg-orange-100 text-orange-700 dark:bg-orange-900 dark:text-orange-300', + medium: 'bg-yellow-100 text-yellow-700 dark:bg-yellow-900 dark:text-yellow-300', + low: 'bg-blue-100 text-blue-700 dark:bg-blue-900 dark:text-blue-300', + info: 'bg-gray-100 text-gray-600 dark:bg-gray-700 dark:text-gray-400', + }; + return classes[severity] || classes.info; + }, + scoreColor(score) { if (score == null) return 'text-gray-400'; if (score >= 70) return 'text-red-600'; diff --git a/app/modules/prospecting/static/admin/js/scan-jobs.js b/app/modules/prospecting/static/admin/js/scan-jobs.js index 2da4fc39..c806e99d 100644 --- a/app/modules/prospecting/static/admin/js/scan-jobs.js +++ b/app/modules/prospecting/static/admin/js/scan-jobs.js @@ -53,6 +53,7 @@ function scanJobs() { 'tech_scan': 'tech-scan', 'performance_scan': 'performance', 'contact_scrape': 'contacts', + 'security_audit': 'security-audit', 'score_compute': 'score-compute', }, diff --git a/app/modules/prospecting/templates/prospecting/admin/prospect-detail.html b/app/modules/prospecting/templates/prospecting/admin/prospect-detail.html index 5c33d50e..155fc329 100644 --- a/app/modules/prospecting/templates/prospecting/admin/prospect-detail.html +++ b/app/modules/prospecting/templates/prospecting/admin/prospect-detail.html @@ -71,6 +71,7 @@ {{ tab_header([ {'id': 'overview', 'label': 'Overview', 'icon': 'eye'}, + {'id': 'security', 'label': 'Security', 'icon': 'shield-check'}, {'id': 'interactions', 'label': 'Interactions', 'icon': 'chat'}, {'id': 'campaigns', 'label': 'Campaigns', 'icon': 'mail'}, ], active_var='activeTab') }} @@ -198,6 +199,110 @@ + +
+ +
+ +
+ + +

No security audit yet. Click "Run Security Audit" to scan.

+
+
diff --git a/app/modules/prospecting/templates/prospecting/admin/scan-jobs.html b/app/modules/prospecting/templates/prospecting/admin/scan-jobs.html index edd97b56..7e2161c1 100644 --- a/app/modules/prospecting/templates/prospecting/admin/scan-jobs.html +++ b/app/modules/prospecting/templates/prospecting/admin/scan-jobs.html @@ -34,6 +34,11 @@ Contact Scrape +