# app/modules/prospecting/services/security_audit_service.py
"""Security audit service for prospect websites.

Performs passive security checks (HTTPS, SSL, headers, exposed files,
cookies, server info, technology detection) and stores results as
ProspectSecurityAudit. All checks are read-only — no active exploitation.

Migrated from scripts/security-audit/audit.py into the enrichment pipeline.
"""

import json
import logging
import re
import socket
import ssl
from datetime import UTC, datetime
from urllib.parse import urlparse

import requests
from sqlalchemy.orm import Session

from app.modules.prospecting.models import Prospect, ProspectSecurityAudit
from app.modules.prospecting.services.security_audit_constants import (
    ADMIN_PATHS,
    EXPOSED_PATHS,
    ROBOTS_SENSITIVE_PATTERNS,
    SECURITY_HEADERS,
    SEVERITY_SCORES,
)

logger = logging.getLogger(__name__)

REQUEST_TIMEOUT = 10
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)

# requests collapses repeated Set-Cookie headers into a single string joined
# by ", ". A naive split on "," also shreds "Expires=Wed, 21 Oct ..." dates
# into fake cookie fragments, so split only at commas that begin a new
# `name=value` pair (lookahead: run of non-space/;/, chars followed by "=").
_COOKIE_BOUNDARY_RE = re.compile(r",\s*(?=[^;,\s]+=)")

# http:// resources referenced from within a page (mixed-content detection).
_MIXED_CONTENT_RE = re.compile(
    r'(src|href|action)=["\']http://[^"\']+["\']', re.IGNORECASE
)


class SecurityAuditService:
    """Runs passive security checks against a prospect's website."""

    def run_audit(self, db: Session, prospect: Prospect) -> ProspectSecurityAudit | None:
        """Run all security checks and store results.

        Returns the upserted ProspectSecurityAudit, or None when the prospect
        has no usable domain/website. An unreachable site is stored as a
        score-0 / grade-F audit with ``scan_error`` set.
        """
        domain = prospect.domain_name
        if not domain or not prospect.has_website:
            return None

        scheme = "https" if prospect.uses_https else "http"
        url = f"{scheme}://{domain}"

        findings: list[dict] = []
        technologies: list[str] = []
        score = 100
        has_https: bool | None = None
        has_valid_ssl: bool | None = None
        ssl_expires_at: datetime | None = None
        missing_headers: list[str] = []
        exposed_files: list[str] = []

        session = requests.Session()
        session.headers["User-Agent"] = USER_AGENT
        session.verify = True
        session.max_redirects = 5

        response = None
        html_content = ""
        try:
            # Fetch the landing page; follow redirects and remember the final URL
            # so later checks (mixed content, scheme) inspect what actually loaded.
            try:
                response = session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
                html_content = response.text
                if response.url != url:
                    url = response.url
            except requests.exceptions.SSLError:
                # Broken/weak TLS is itself a critical finding; retry once without
                # verification so the remaining passive checks can still run.
                findings.append(self._finding(
                    "Weak SSL/TLS configuration", "critical", "transport",
                    "Server supports outdated encryption protocols"))
                try:
                    session.verify = False  # noqa: SEC047 fallback for broken SSL
                    response = session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
                    html_content = response.text
                except requests.exceptions.RequestException as retry_err:
                    # Best-effort retry failed; continue with whatever checks
                    # can run without a response (e.g. the raw SSL probe).
                    logger.warning(
                        "Security audit: insecure retry for %s failed: %s",
                        domain, retry_err)
            except requests.exceptions.RequestException as e:
                logger.warning("Security audit: cannot reach %s: %s", domain, e)
                return self._save_audit(
                    db, prospect, score=0, grade="F", findings=findings,
                    scan_error=f"Cannot reach website: {e}",
                    technologies=technologies)

            # Run checks
            https_findings, has_https = self._check_https(url, html_content)
            findings.extend(https_findings)

            ssl_findings, has_valid_ssl, ssl_expires_at = self._check_ssl(domain)
            findings.extend(ssl_findings)

            header_findings, missing_headers = self._check_headers(response)
            findings.extend(header_findings)

            server_findings, server_techs = self._check_server_info(response)
            findings.extend(server_findings)
            technologies.extend(server_techs)

            tech_findings, detected_techs = self._check_technology(html_content, response)
            findings.extend(tech_findings)
            technologies.extend(detected_techs)

            cookie_findings = self._check_cookies(response)
            findings.extend(cookie_findings)

            exposed_findings, exposed_files = self._check_exposed_files(domain, scheme, session)
            findings.extend(exposed_findings)
        finally:
            # Always release the connection pool, even if a check raises.
            session.close()

        # Deduct points per negative finding; positives never lower the score.
        for f in findings:
            if not f.get("is_positive", False):
                score = max(0, score - SEVERITY_SCORES.get(f["severity"], 0))

        grade = self._calculate_grade(score)

        return self._save_audit(
            db,
            prospect,
            score=score,
            grade=grade,
            findings=findings,
            has_https=has_https,
            has_valid_ssl=has_valid_ssl,
            ssl_expires_at=ssl_expires_at,
            missing_headers=missing_headers,
            exposed_files=exposed_files,
            technologies=technologies,
        )

    # ── Check methods ───────────────────────────────────────────────────────

    def _check_https(self, url: str, html_content: str) -> tuple[list[dict], bool | None]:
        """Check HTTPS usage and, on HTTPS pages, flag mixed content."""
        findings: list[dict] = []
        has_https = urlparse(url).scheme == "https"

        if has_https:
            findings.append(self._finding(
                "HTTPS enabled", "info", "transport",
                "Website uses encrypted connections", is_positive=True))
            # Any http:// src/href/action on an HTTPS page is mixed content.
            if _MIXED_CONTENT_RE.search(html_content):
                findings.append(self._finding(
                    "Mixed content detected", "medium", "transport",
                    "HTTPS site loads resources over insecure HTTP"))
        else:
            findings.append(self._finding(
                "No HTTPS", "critical", "transport",
                "Website transmits all data in plain text"))

        return findings, has_https

    def _check_ssl(self, domain: str) -> tuple[list[dict], bool | None, datetime | None]:
        """Check SSL certificate validity, expiry window, and TLS version.

        Returns (findings, has_valid_ssl, ssl_expires_at); the flags stay None
        when port 443 is unreachable (already reported by the HTTPS check).
        """
        findings: list[dict] = []
        has_valid_ssl: bool | None = None
        ssl_expires_at: datetime | None = None

        try:
            context = ssl.create_default_context()
            with socket.create_connection((domain, 443), timeout=REQUEST_TIMEOUT) as sock:
                with context.wrap_socket(sock, server_hostname=domain) as ssock:
                    cert = ssock.getpeercert()
                    # ssl.cert_time_to_seconds parses the certificate's GMT
                    # timestamp reliably (strptime with %Z is locale-fragile).
                    not_after = datetime.fromtimestamp(
                        ssl.cert_time_to_seconds(cert["notAfter"]), tz=UTC)
                    days_remaining = (not_after - datetime.now(UTC)).days
                    ssl_expires_at = not_after

                    if days_remaining < 0:
                        has_valid_ssl = False
                        findings.append(self._finding(
                            "SSL certificate expired", "critical", "transport",
                            f"Certificate expired on {not_after.strftime('%Y-%m-%d')}"))
                    elif days_remaining < 30:
                        has_valid_ssl = True
                        findings.append(self._finding(
                            f"SSL expires in {days_remaining} days", "high", "transport",
                            f"Certificate expires on {not_after.strftime('%Y-%m-%d')}"))
                    else:
                        has_valid_ssl = True
                        findings.append(self._finding(
                            "SSL certificate valid", "info", "transport",
                            f"Valid until {not_after.strftime('%Y-%m-%d')} ({days_remaining} days)",
                            is_positive=True))

                    # Check TLS version negotiated for this connection.
                    protocol = ssock.version()
                    if protocol in ("TLSv1", "TLSv1.1", "SSLv3", "SSLv2"):
                        findings.append(self._finding(
                            "Weak TLS version", "high", "transport",
                            f"Server supports outdated protocol: {protocol}"))
        except ssl.SSLCertVerificationError:
            has_valid_ssl = False
            findings.append(self._finding(
                "SSL certificate invalid", "critical", "transport",
                "Certificate verification failed"))
        except (TimeoutError, ConnectionRefusedError, OSError):
            pass  # No SSL, already caught by HTTPS check

        return findings, has_valid_ssl, ssl_expires_at

    def _check_headers(self, response) -> tuple[list[dict], list[str]]:
        """Check for missing security headers (per SECURITY_HEADERS config)."""
        findings: list[dict] = []
        missing: list[str] = []
        if not response:
            return findings, missing

        for header_name, config in SECURITY_HEADERS.items():
            if header_name in response.headers:
                findings.append(self._finding(
                    f"Header present: {header_name}", "info", "headers",
                    header_name, is_positive=True))
            else:
                missing.append(header_name)
                findings.append(self._finding(
                    f"Missing: {header_name}", config["severity"], "headers",
                    config["impact"]))

        return findings, missing

    def _check_server_info(self, response) -> tuple[list[dict], list[str]]:
        """Check for server/stack version disclosure via response headers."""
        findings: list[dict] = []
        technologies: list[str] = []
        if not response:
            return findings, technologies

        server = response.headers.get("Server", "")
        x_powered = response.headers.get("X-Powered-By", "")

        info_parts: list[str] = []
        if server:
            info_parts.append(server)
            technologies.append(server)
        if x_powered:
            info_parts.append(f"X-Powered-By: {x_powered}")
            technologies.append(x_powered)

        if info_parts:
            # A concrete version number (e.g. "nginx/1.18.0") is worse than a
            # bare product name, so bump the severity accordingly.
            has_version = bool(re.search(r"\d+\.\d+", " ".join(info_parts)))
            severity = "medium" if has_version else "low"
            findings.append(self._finding(
                "Server version exposed", severity, "config",
                " | ".join(info_parts)))

        return findings, technologies

    def _check_technology(self, html_content: str, response) -> tuple[list[dict], list[str]]:
        """Detect CMS and technology stack from page markup."""
        findings: list[dict] = []
        technologies: list[str] = []
        if not html_content:
            return findings, technologies

        # WordPress
        wp_indicators = ["wp-content/", "wp-includes/", 'name="generator" content="WordPress']
        if any(ind in html_content for ind in wp_indicators):
            version = "unknown"
            ver_match = re.search(r'content="WordPress\s+([\d.]+)"', html_content)
            if ver_match:
                version = ver_match.group(1)
            # A disclosed version is actionable for an attacker → higher severity.
            severity = "medium" if version != "unknown" else "low"
            findings.append(self._finding(
                f"WordPress detected (v{version})", severity, "technology",
                "Version publicly visible" if version != "unknown" else "CMS detected"))
            technologies.append(f"WordPress {version}")

        # Joomla
        if "/media/jui/" in html_content or "Joomla" in html_content:
            findings.append(self._finding(
                "Joomla detected", "low", "technology", "CMS detected"))
            technologies.append("Joomla")

        # Drupal
        if "Drupal" in html_content or "/sites/default/" in html_content:
            findings.append(self._finding(
                "Drupal detected", "low", "technology", "CMS detected"))
            technologies.append("Drupal")

        # Hosted platforms (not vulnerable in the same way) — recorded only.
        if "wix.com" in html_content:
            technologies.append("Wix")
        if "squarespace.com" in html_content:
            technologies.append("Squarespace")
        if "cdn.shopify.com" in html_content:
            technologies.append("Shopify")

        return findings, technologies

    def _check_cookies(self, response) -> list[dict]:
        """Check cookie security flags (Secure, HttpOnly, SameSite)."""
        findings: list[dict] = []
        if not response:
            return findings

        set_cookie_headers = response.headers.get("Set-Cookie", "")
        if not set_cookie_headers:
            return findings

        has_insecure = False
        has_no_httponly = False
        has_no_samesite = False

        # Split only at real cookie boundaries — see _COOKIE_BOUNDARY_RE; a
        # plain split(",") breaks Expires dates and yields false positives.
        for cookie in _COOKIE_BOUNDARY_RE.split(set_cookie_headers):
            cookie_lower = cookie.lower()
            if "secure" not in cookie_lower:
                has_insecure = True
            if "httponly" not in cookie_lower:
                has_no_httponly = True
            if "samesite" not in cookie_lower:
                has_no_samesite = True

        if has_insecure:
            findings.append(self._finding(
                "Cookies lack Secure flag", "medium", "cookies",
                "Session cookies can be intercepted over HTTP"))
        if has_no_httponly:
            findings.append(self._finding(
                "Cookies lack HttpOnly flag", "medium", "cookies",
                "Cookies accessible to JavaScript (XSS risk)"))
        if has_no_samesite:
            findings.append(self._finding(
                "Cookies lack SameSite attribute", "low", "cookies",
                "Vulnerable to cross-site request attacks"))

        return findings

    def _check_exposed_files(self, domain: str, scheme: str, session) -> tuple[list[dict], list[str]]:
        """Check for exposed sensitive files and directories.

        security.txt and robots.txt get special handling (informational /
        content analysis); sitemap.xml and /api/ are probed but never flagged.
        """
        findings: list[dict] = []
        exposed: list[str] = []
        base = f"{scheme}://{domain}"

        security_txt_found = False
        robots_content: str | None = None

        for path, description, default_severity in EXPOSED_PATHS:
            try:
                resp = session.get(f"{base}{path}", timeout=REQUEST_TIMEOUT,
                                   allow_redirects=False)
                if path == "/.well-known/security.txt" and resp.status_code == 200:
                    security_txt_found = True
                    continue
                if path == "/robots.txt" and resp.status_code == 200:
                    robots_content = resp.text
                    continue
                if path in ("/sitemap.xml", "/api/"):
                    continue
                if resp.status_code == 200:
                    if path in ADMIN_PATHS:
                        findings.append(self._finding(
                            f"Admin panel exposed: {path}", "high", "exposure",
                            f"Admin login at {base}{path} is publicly accessible"))
                    else:
                        findings.append(self._finding(
                            f"Exposed: {path}", default_severity, "exposure",
                            f"{description} is publicly accessible"))
                    exposed.append(path)
            except requests.exceptions.RequestException:
                # Unreachable path — best-effort probe, move on.
                continue

        # Security.txt check
        if not security_txt_found:
            findings.append(self._finding(
                "No security.txt", "info", "exposure",
                "No /.well-known/security.txt for responsible disclosure"))

        # Robots.txt analysis: Disallow lines often reveal sensitive paths.
        if robots_content:
            disallowed = re.findall(r"Disallow:\s*(.+)", robots_content, re.IGNORECASE)
            sensitive_found = []
            for path in disallowed:
                path = path.strip()
                if any(pattern in path.lower() for pattern in ROBOTS_SENSITIVE_PATTERNS):
                    sensitive_found.append(path)
            if sensitive_found:
                findings.append(self._finding(
                    "Robots.txt reveals sensitive paths", "low", "exposure",
                    f"Disallowed paths: {', '.join(sensitive_found[:5])}"))

        return findings, exposed

    # ── Helpers ──────────────────────────────────────────────────────────────

    @staticmethod
    def _finding(title: str, severity: str, category: str, detail: str,
                 is_positive: bool = False) -> dict:
        """Create a finding dict."""
        return {
            "title": title,
            "severity": severity,
            "category": category,
            "detail": detail,
            "is_positive": is_positive,
        }

    @staticmethod
    def _calculate_grade(score: int) -> str:
        """Map a 0-100 score onto a letter grade."""
        if score >= 95:
            return "A+"
        if score >= 85:
            return "A"
        if score >= 70:
            return "B"
        if score >= 55:
            return "C"
        if score >= 40:
            return "D"
        return "F"

    def _save_audit(
        self,
        db: Session,
        prospect: Prospect,
        *,
        score: int,
        grade: str,
        findings: list[dict],
        has_https: bool | None = None,
        has_valid_ssl: bool | None = None,
        ssl_expires_at: datetime | None = None,
        missing_headers: list[str] | None = None,
        exposed_files: list[str] | None = None,
        technologies: list[str] | None = None,
        scan_error: str | None = None,
    ) -> ProspectSecurityAudit:
        """Upsert security audit results onto the prospect."""
        audit = prospect.security_audit
        if not audit:
            audit = ProspectSecurityAudit(prospect_id=prospect.id)
            db.add(audit)

        audit.score = score
        audit.grade = grade
        audit.findings_json = json.dumps(findings)
        audit.has_https = has_https
        audit.has_valid_ssl = has_valid_ssl
        audit.ssl_expires_at = ssl_expires_at
        audit.missing_headers_json = json.dumps(missing_headers or [])
        audit.exposed_files_json = json.dumps(exposed_files or [])
        audit.technologies_json = json.dumps(technologies or [])
        audit.scan_error = scan_error

        # Denormalized counts — negative findings only, positives excluded.
        def _count(severity: str) -> int:
            return sum(
                1 for f in findings
                if f["severity"] == severity and not f.get("is_positive")
            )

        audit.findings_count_critical = _count("critical")
        audit.findings_count_high = _count("high")
        audit.findings_count_medium = _count("medium")
        audit.findings_count_low = _count("low")
        audit.findings_count_info = _count("info")

        prospect.last_security_audit_at = datetime.now(UTC)
        db.flush()

        logger.info(
            "Security audit for %s: score=%d grade=%s (%d findings)",
            prospect.domain_name, score, grade,
            len([f for f in findings if not f.get("is_positive")]))
        return audit


security_audit_service = SecurityAuditService()