Complete security audit integration into the enrichment pipeline:
Backend:
- SecurityAuditService with 7 passive checks: HTTPS, SSL cert, security
headers, exposed files, cookies, server info, technology detection
- Constants file with SECURITY_HEADERS, EXPOSED_PATHS, SEVERITY_SCORES
- SecurityAuditResponse schema with JSON field validators + aliases
- Endpoints: POST /security-audit/{id}, POST /security-audit/batch
- Added to full_enrichment pipeline (Step 5, before scoring)
- get_pending_security_audit() query in prospect_service
Frontend:
- Security tab on prospect detail page with grade badge (A+ to F),
score/100, severity counts, HTTPS/SSL status, missing headers,
exposed files, technologies, and full findings list
- "Run Security Audit" button with loading state
- "Security Audit" batch button on scan-jobs page
Tested on batirenovation-strasbourg.fr: Grade D (50/100), 11 issues
found (missing headers, exposed wp-login, server version disclosure).
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
444 lines · 18 KiB · Python
# app/modules/prospecting/services/security_audit_service.py
|
|
"""
|
|
Security audit service for prospect websites.
|
|
|
|
Performs passive security checks (HTTPS, SSL, headers, exposed files,
|
|
cookies, server info, technology detection) and stores results as
|
|
ProspectSecurityAudit. All checks are read-only — no active exploitation.
|
|
|
|
Migrated from scripts/security-audit/audit.py into the enrichment pipeline.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import socket
|
|
import ssl
|
|
from datetime import UTC, datetime
|
|
|
|
import requests
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.modules.prospecting.models import Prospect, ProspectSecurityAudit
|
|
from app.modules.prospecting.services.security_audit_constants import (
|
|
ADMIN_PATHS,
|
|
EXPOSED_PATHS,
|
|
ROBOTS_SENSITIVE_PATTERNS,
|
|
SECURITY_HEADERS,
|
|
SEVERITY_SCORES,
|
|
)
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Per-request timeout (seconds) applied to every outbound HTTP request and
# the TLS handshake in the SSL check.
REQUEST_TIMEOUT = 10

# Browser-like User-Agent so audited sites serve their normal public pages
# rather than blocking an obvious bot client.
USER_AGENT = (
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 "
    "(KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
)
|
|
|
|
|
|
class SecurityAuditService:
|
|
"""Runs passive security checks against a prospect's website."""
|
|
|
|
def run_audit(self, db: Session, prospect: Prospect) -> ProspectSecurityAudit | None:
|
|
"""Run all security checks and store results."""
|
|
domain = prospect.domain_name
|
|
if not domain or not prospect.has_website:
|
|
return None
|
|
|
|
scheme = "https" if prospect.uses_https else "http"
|
|
url = f"{scheme}://{domain}"
|
|
findings = []
|
|
technologies = []
|
|
score = 100
|
|
has_https = None
|
|
has_valid_ssl = None
|
|
ssl_expires_at = None
|
|
missing_headers = []
|
|
exposed_files = []
|
|
|
|
session = requests.Session()
|
|
session.headers["User-Agent"] = USER_AGENT
|
|
session.verify = True
|
|
session.max_redirects = 5
|
|
|
|
# Fetch the page
|
|
response = None
|
|
html_content = ""
|
|
try:
|
|
response = session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
|
|
html_content = response.text
|
|
if response.url != url:
|
|
url = response.url
|
|
except requests.exceptions.SSLError:
|
|
findings.append(self._finding("Weak SSL/TLS configuration", "critical", "transport",
|
|
"Server supports outdated encryption protocols"))
|
|
try:
|
|
session.verify = False # noqa: SEC047 fallback for broken SSL
|
|
response = session.get(url, timeout=REQUEST_TIMEOUT, allow_redirects=True)
|
|
html_content = response.text
|
|
except Exception:
|
|
pass
|
|
except requests.exceptions.RequestException as e:
|
|
logger.warning("Security audit: cannot reach %s: %s", domain, e)
|
|
return self._save_audit(db, prospect, score=0, grade="F", findings=findings,
|
|
scan_error=f"Cannot reach website: {e}",
|
|
technologies=technologies)
|
|
|
|
# Run checks
|
|
https_findings, has_https = self._check_https(url, html_content)
|
|
findings.extend(https_findings)
|
|
|
|
ssl_findings, has_valid_ssl, ssl_expires_at = self._check_ssl(domain)
|
|
findings.extend(ssl_findings)
|
|
|
|
header_findings, missing_headers = self._check_headers(response)
|
|
findings.extend(header_findings)
|
|
|
|
server_findings, server_techs = self._check_server_info(response)
|
|
findings.extend(server_findings)
|
|
technologies.extend(server_techs)
|
|
|
|
tech_findings, detected_techs = self._check_technology(html_content, response)
|
|
findings.extend(tech_findings)
|
|
technologies.extend(detected_techs)
|
|
|
|
cookie_findings = self._check_cookies(response)
|
|
findings.extend(cookie_findings)
|
|
|
|
exposed_findings, exposed_files = self._check_exposed_files(domain, scheme, session)
|
|
findings.extend(exposed_findings)
|
|
|
|
session.close()
|
|
|
|
# Calculate score
|
|
for f in findings:
|
|
if not f.get("is_positive", False):
|
|
score = max(0, score - SEVERITY_SCORES.get(f["severity"], 0))
|
|
|
|
grade = self._calculate_grade(score)
|
|
|
|
return self._save_audit(
|
|
db, prospect,
|
|
score=score, grade=grade, findings=findings,
|
|
has_https=has_https, has_valid_ssl=has_valid_ssl,
|
|
ssl_expires_at=ssl_expires_at,
|
|
missing_headers=missing_headers, exposed_files=exposed_files,
|
|
technologies=technologies,
|
|
)
|
|
|
|
# ── Check methods ───────────────────────────────────────────────────────
|
|
|
|
def _check_https(self, url: str, html_content: str) -> tuple[list[dict], bool | None]:
|
|
"""Check HTTPS configuration."""
|
|
findings = []
|
|
from urllib.parse import urlparse
|
|
|
|
parsed = urlparse(url)
|
|
has_https = parsed.scheme == "https"
|
|
|
|
if has_https:
|
|
findings.append(self._finding("HTTPS enabled", "info", "transport",
|
|
"Website uses encrypted connections", is_positive=True))
|
|
# Check mixed content
|
|
http_resources = re.findall(r'(src|href|action)=["\']http://[^"\']+["\']', html_content, re.IGNORECASE)
|
|
if http_resources:
|
|
findings.append(self._finding("Mixed content detected", "medium", "transport",
|
|
"HTTPS site loads resources over insecure HTTP"))
|
|
else:
|
|
findings.append(self._finding("No HTTPS", "critical", "transport",
|
|
"Website transmits all data in plain text"))
|
|
|
|
return findings, has_https
|
|
|
|
def _check_ssl(self, domain: str) -> tuple[list[dict], bool | None, datetime | None]:
|
|
"""Check SSL certificate validity."""
|
|
findings = []
|
|
has_valid_ssl = None
|
|
ssl_expires_at = None
|
|
|
|
try:
|
|
context = ssl.create_default_context()
|
|
with socket.create_connection((domain, 443), timeout=REQUEST_TIMEOUT) as sock:
|
|
with context.wrap_socket(sock, server_hostname=domain) as ssock:
|
|
cert = ssock.getpeercert()
|
|
not_after = datetime.strptime(cert["notAfter"], "%b %d %H:%M:%S %Y %Z").replace(tzinfo=UTC)
|
|
days_remaining = (not_after - datetime.now(UTC)).days
|
|
ssl_expires_at = not_after
|
|
|
|
if days_remaining < 0:
|
|
has_valid_ssl = False
|
|
findings.append(self._finding("SSL certificate expired", "critical", "transport",
|
|
f"Certificate expired on {not_after.strftime('%Y-%m-%d')}"))
|
|
elif days_remaining < 30:
|
|
has_valid_ssl = True
|
|
findings.append(self._finding(f"SSL expires in {days_remaining} days", "high", "transport",
|
|
f"Certificate expires on {not_after.strftime('%Y-%m-%d')}"))
|
|
else:
|
|
has_valid_ssl = True
|
|
findings.append(self._finding("SSL certificate valid", "info", "transport",
|
|
f"Valid until {not_after.strftime('%Y-%m-%d')} ({days_remaining} days)",
|
|
is_positive=True))
|
|
|
|
# Check TLS version
|
|
protocol = ssock.version()
|
|
if protocol in ("TLSv1", "TLSv1.1", "SSLv3", "SSLv2"):
|
|
findings.append(self._finding("Weak TLS version", "high", "transport",
|
|
f"Server supports outdated protocol: {protocol}"))
|
|
|
|
except ssl.SSLCertVerificationError:
|
|
has_valid_ssl = False
|
|
findings.append(self._finding("SSL certificate invalid", "critical", "transport",
|
|
"Certificate verification failed"))
|
|
except (TimeoutError, ConnectionRefusedError, OSError):
|
|
pass # No SSL, already caught by HTTPS check
|
|
|
|
return findings, has_valid_ssl, ssl_expires_at
|
|
|
|
def _check_headers(self, response) -> tuple[list[dict], list[str]]:
|
|
"""Check for missing security headers."""
|
|
findings = []
|
|
missing = []
|
|
|
|
if not response:
|
|
return findings, missing
|
|
|
|
for header_name, config in SECURITY_HEADERS.items():
|
|
if header_name in response.headers:
|
|
findings.append(self._finding(f"Header present: {header_name}", "info", "headers",
|
|
header_name, is_positive=True))
|
|
else:
|
|
missing.append(header_name)
|
|
findings.append(self._finding(f"Missing: {header_name}", config["severity"], "headers",
|
|
config["impact"]))
|
|
|
|
return findings, missing
|
|
|
|
def _check_server_info(self, response) -> tuple[list[dict], list[str]]:
|
|
"""Check for server version disclosure."""
|
|
findings = []
|
|
technologies = []
|
|
|
|
if not response:
|
|
return findings, technologies
|
|
|
|
server = response.headers.get("Server", "")
|
|
x_powered = response.headers.get("X-Powered-By", "")
|
|
|
|
info_parts = []
|
|
if server:
|
|
info_parts.append(server)
|
|
technologies.append(server)
|
|
if x_powered:
|
|
info_parts.append(f"X-Powered-By: {x_powered}")
|
|
technologies.append(x_powered)
|
|
|
|
if info_parts:
|
|
has_version = bool(re.search(r"\d+\.\d+", " ".join(info_parts)))
|
|
severity = "medium" if has_version else "low"
|
|
findings.append(self._finding("Server version exposed", severity, "config",
|
|
" | ".join(info_parts)))
|
|
|
|
return findings, technologies
|
|
|
|
def _check_technology(self, html_content: str, response) -> tuple[list[dict], list[str]]:
|
|
"""Detect CMS and technology stack."""
|
|
findings = []
|
|
technologies = []
|
|
|
|
if not html_content:
|
|
return findings, technologies
|
|
|
|
# WordPress
|
|
wp_indicators = ["wp-content/", "wp-includes/", 'name="generator" content="WordPress']
|
|
if any(ind in html_content for ind in wp_indicators):
|
|
version = "unknown"
|
|
ver_match = re.search(r'content="WordPress\s+([\d.]+)"', html_content)
|
|
if ver_match:
|
|
version = ver_match.group(1)
|
|
severity = "medium" if version != "unknown" else "low"
|
|
findings.append(self._finding(f"WordPress detected (v{version})", severity, "technology",
|
|
"Version publicly visible" if version != "unknown" else "CMS detected"))
|
|
technologies.append(f"WordPress {version}")
|
|
|
|
# Joomla
|
|
if "/media/jui/" in html_content or "Joomla" in html_content:
|
|
findings.append(self._finding("Joomla detected", "low", "technology", "CMS detected"))
|
|
technologies.append("Joomla")
|
|
|
|
# Drupal
|
|
if "Drupal" in html_content or "/sites/default/" in html_content:
|
|
findings.append(self._finding("Drupal detected", "low", "technology", "CMS detected"))
|
|
technologies.append("Drupal")
|
|
|
|
# Hosted platforms (not vulnerable in the same way)
|
|
if "wix.com" in html_content:
|
|
technologies.append("Wix")
|
|
if "squarespace.com" in html_content:
|
|
technologies.append("Squarespace")
|
|
if "cdn.shopify.com" in html_content:
|
|
technologies.append("Shopify")
|
|
|
|
return findings, technologies
|
|
|
|
def _check_cookies(self, response) -> list[dict]:
|
|
"""Check cookie security flags."""
|
|
findings = []
|
|
|
|
if not response:
|
|
return findings
|
|
|
|
set_cookie_headers = response.headers.get("Set-Cookie", "")
|
|
if not set_cookie_headers:
|
|
return findings
|
|
|
|
has_insecure = False
|
|
has_no_httponly = False
|
|
has_no_samesite = False
|
|
|
|
for cookie in set_cookie_headers.split(","):
|
|
cookie_lower = cookie.lower()
|
|
if "secure" not in cookie_lower:
|
|
has_insecure = True
|
|
if "httponly" not in cookie_lower:
|
|
has_no_httponly = True
|
|
if "samesite" not in cookie_lower:
|
|
has_no_samesite = True
|
|
|
|
if has_insecure:
|
|
findings.append(self._finding("Cookies lack Secure flag", "medium", "cookies",
|
|
"Session cookies can be intercepted over HTTP"))
|
|
if has_no_httponly:
|
|
findings.append(self._finding("Cookies lack HttpOnly flag", "medium", "cookies",
|
|
"Cookies accessible to JavaScript (XSS risk)"))
|
|
if has_no_samesite:
|
|
findings.append(self._finding("Cookies lack SameSite attribute", "low", "cookies",
|
|
"Vulnerable to cross-site request attacks"))
|
|
|
|
return findings
|
|
|
|
def _check_exposed_files(self, domain: str, scheme: str, session) -> tuple[list[dict], list[str]]:
|
|
"""Check for exposed sensitive files and directories."""
|
|
findings = []
|
|
exposed = []
|
|
base = f"{scheme}://{domain}"
|
|
security_txt_found = False
|
|
robots_content = None
|
|
|
|
for path, description, default_severity in EXPOSED_PATHS:
|
|
try:
|
|
resp = session.get(f"{base}{path}", timeout=REQUEST_TIMEOUT, allow_redirects=False)
|
|
|
|
if path == "/.well-known/security.txt" and resp.status_code == 200:
|
|
security_txt_found = True
|
|
continue
|
|
if path == "/robots.txt" and resp.status_code == 200:
|
|
robots_content = resp.text
|
|
continue
|
|
if path == "/sitemap.xml" or path == "/api/":
|
|
continue
|
|
|
|
if resp.status_code == 200:
|
|
if path in ADMIN_PATHS:
|
|
findings.append(self._finding(f"Admin panel exposed: {path}", "high", "exposure",
|
|
f"Admin login at {base}{path} is publicly accessible"))
|
|
else:
|
|
findings.append(self._finding(f"Exposed: {path}", default_severity, "exposure",
|
|
f"{description} is publicly accessible"))
|
|
exposed.append(path)
|
|
|
|
except Exception:
|
|
continue
|
|
|
|
# Security.txt check
|
|
if not security_txt_found:
|
|
findings.append(self._finding("No security.txt", "info", "exposure",
|
|
"No /.well-known/security.txt for responsible disclosure"))
|
|
|
|
# Robots.txt analysis
|
|
if robots_content:
|
|
disallowed = re.findall(r"Disallow:\s*(.+)", robots_content, re.IGNORECASE)
|
|
sensitive_found = []
|
|
for path in disallowed:
|
|
path = path.strip()
|
|
if any(pattern in path.lower() for pattern in ROBOTS_SENSITIVE_PATTERNS):
|
|
sensitive_found.append(path)
|
|
|
|
if sensitive_found:
|
|
findings.append(self._finding("Robots.txt reveals sensitive paths", "low", "exposure",
|
|
f"Disallowed paths: {', '.join(sensitive_found[:5])}"))
|
|
|
|
return findings, exposed
|
|
|
|
# ── Helpers ──────────────────────────────────────────────────────────────
|
|
|
|
@staticmethod
|
|
def _finding(title: str, severity: str, category: str, detail: str, is_positive: bool = False) -> dict:
|
|
"""Create a finding dict."""
|
|
return {
|
|
"title": title,
|
|
"severity": severity,
|
|
"category": category,
|
|
"detail": detail,
|
|
"is_positive": is_positive,
|
|
}
|
|
|
|
@staticmethod
|
|
def _calculate_grade(score: int) -> str:
|
|
if score >= 95:
|
|
return "A+"
|
|
if score >= 85:
|
|
return "A"
|
|
if score >= 70:
|
|
return "B"
|
|
if score >= 55:
|
|
return "C"
|
|
if score >= 40:
|
|
return "D"
|
|
return "F"
|
|
|
|
def _save_audit(
|
|
self, db: Session, prospect: Prospect, *,
|
|
score: int, grade: str, findings: list[dict],
|
|
has_https: bool | None = None, has_valid_ssl: bool | None = None,
|
|
ssl_expires_at: datetime | None = None,
|
|
missing_headers: list[str] | None = None,
|
|
exposed_files: list[str] | None = None,
|
|
technologies: list[str] | None = None,
|
|
scan_error: str | None = None,
|
|
) -> ProspectSecurityAudit:
|
|
"""Upsert security audit results."""
|
|
audit = prospect.security_audit
|
|
if not audit:
|
|
audit = ProspectSecurityAudit(prospect_id=prospect.id)
|
|
db.add(audit)
|
|
|
|
audit.score = score
|
|
audit.grade = grade
|
|
audit.findings_json = json.dumps(findings)
|
|
audit.has_https = has_https
|
|
audit.has_valid_ssl = has_valid_ssl
|
|
audit.ssl_expires_at = ssl_expires_at
|
|
audit.missing_headers_json = json.dumps(missing_headers or [])
|
|
audit.exposed_files_json = json.dumps(exposed_files or [])
|
|
audit.technologies_json = json.dumps(technologies or [])
|
|
audit.scan_error = scan_error
|
|
|
|
# Denormalized counts
|
|
audit.findings_count_critical = sum(1 for f in findings if f["severity"] == "critical" and not f.get("is_positive"))
|
|
audit.findings_count_high = sum(1 for f in findings if f["severity"] == "high" and not f.get("is_positive"))
|
|
audit.findings_count_medium = sum(1 for f in findings if f["severity"] == "medium" and not f.get("is_positive"))
|
|
audit.findings_count_low = sum(1 for f in findings if f["severity"] == "low" and not f.get("is_positive"))
|
|
audit.findings_count_info = sum(1 for f in findings if f["severity"] == "info" and not f.get("is_positive"))
|
|
|
|
prospect.last_security_audit_at = datetime.now(UTC)
|
|
db.flush()
|
|
|
|
logger.info("Security audit for %s: score=%d grade=%s (%d findings)",
|
|
prospect.domain_name, score, grade,
|
|
len([f for f in findings if not f.get("is_positive")]))
|
|
return audit
|
|
|
|
|
|
# Module-level singleton: the service holds no per-call state, so one shared
# instance is imported by endpoints and the enrichment pipeline.
security_audit_service = SecurityAuditService()
|