Files
orion/app/modules/prospecting/services/enrichment_service.py
Samir Boulahtit 9a5b7dd061 fix: register hosting public preview route + suppress SSL warnings
- Register hosting public page router in main.py (POC preview at
  /hosting/sites/{id}/preview was returning 404 because the
  public_page_router was set on module definition but never mounted)
- Suppress urllib3 InsecureRequestWarning in enrichment service
  (intentional verify=False for prospect site scanning)

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-04-02 20:01:55 +02:00

631 lines
26 KiB
Python

# app/modules/prospecting/services/enrichment_service.py
"""
Enrichment service for prospect scanning pipeline.
Migrated from marketing-.lu-domains/app/services/enrichment_service.py.
Performs passive HTTP checks, technology detection, performance audits,
and contact scraping for digital prospects.
Uses `requests` (sync) to match Orion's tech stack.
"""
import logging
import re
import socket
import ssl
from datetime import UTC, datetime
import requests
import urllib3
# Suppress SSL warnings for intentional verify=False on prospect sites # noqa: SEC047
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) # noqa: SEC047
from sqlalchemy.orm import Session
from app.modules.prospecting.config import config
from app.modules.prospecting.models import (
Prospect,
ProspectContact,
ProspectPerformanceProfile,
ProspectTechProfile,
)
logger = logging.getLogger(__name__)

# Signature patterns for technology detection.
# NOTE: scan_tech_stack lowercases the page HTML before matching, so every
# pattern below must be lowercase too — a mixed-case literal can never match.
CMS_PATTERNS = {
    "wordpress": [r"wp-content", r"wp-includes", r"wordpress"],
    "drupal": [r"drupal", r"sites/default", r"sites/all"],
    "joomla": [r"/media/jui/", r"joomla", r"/components/com_"],
    "shopify": [r"cdn\.shopify\.com", r"shopify"],
    "wix": [r"wix\.com", r"wixstatic\.com", r"parastorage\.com"],
    "squarespace": [r"squarespace\.com", r"sqsp\.com"],
    "webflow": [r"webflow\.com", r"webflow\.io"],
    "typo3": [r"typo3", r"/typo3conf/"],
    "prestashop": [r"prestashop", r"/modules/ps_"],
    "magento": [r"magento", r"mage/", r"/static/version"],
}
JS_FRAMEWORK_PATTERNS = {
    # "__next_data__" (not "__NEXT_DATA__"): the scanned HTML is lowercased.
    "react": [r"react", r"__next_data__", r"_next/"],
    "vue": [r"vue\.js", r"vue\.min\.js", r"__vue__"],
    "angular": [r"angular", r"ng-version"],
    "jquery": [r"jquery"],
    "alpine": [r"alpine\.js", r"alpinejs"],
}
ANALYTICS_PATTERNS = {
    "google_analytics": [r"google-analytics\.com", r"gtag/js", r"ga\.js"],
    "google_tag_manager": [r"googletagmanager\.com", r"gtm\.js"],
    "matomo": [r"matomo", r"piwik"],
    # "en_us" lowercased for the same reason as "__next_data__" above.
    "facebook_pixel": [r"facebook\.net/en_us/fbevents"],
}
class EnrichmentService:
"""Service for prospect enrichment via passive scanning."""
def check_http(self, db: Session, prospect: Prospect) -> dict:
    """Check HTTP connectivity for a prospect's domain.

    Tries HTTPS first and falls back to plain HTTP. Updates the prospect
    row (has_website, uses_https, http_status_code, redirect_url, status,
    last_http_check_at) and flushes the session (no commit).

    Args:
        db: Active SQLAlchemy session.
        prospect: Prospect whose ``domain_name`` is probed.

    Returns:
        dict with keys has_website, uses_https, http_status_code,
        redirect_url and error. ``error`` is None when a request
        succeeded (even if an earlier scheme attempt failed).
    """
    result = {
        "has_website": False,
        "uses_https": False,
        "http_status_code": None,
        "redirect_url": None,
        "error": None,
    }
    domain = prospect.domain_name
    if not domain:
        result["error"] = "No domain name"
        return result
    # Try HTTPS first, then HTTP.
    for scheme in ["https", "http"]:
        try:
            url = f"{scheme}://{domain}"
            response = requests.get(
                url,
                timeout=config.http_timeout,
                allow_redirects=True,
                verify=False,  # noqa: SEC047 passive scan, not sending sensitive data
            )
            result["has_website"] = True
            result["uses_https"] = scheme == "https"
            result["http_status_code"] = response.status_code
            if response.url != url:
                result["redirect_url"] = str(response.url)
            # A successful fallback (e.g. HTTP after an HTTPS timeout) must
            # not keep reporting the earlier attempt's error.
            result["error"] = None
            break
        except requests.exceptions.Timeout:
            # Timeout is a RequestException subclass; it must be caught first.
            result["error"] = f"Timeout on {scheme}"
        except requests.exceptions.RequestException as e:
            result["error"] = str(e)
            if scheme == "https":
                continue
            break
    # Persist probe results on the prospect row.
    prospect.has_website = result["has_website"]
    prospect.uses_https = result["uses_https"]
    prospect.http_status_code = result["http_status_code"]
    prospect.redirect_url = result["redirect_url"]
    prospect.last_http_check_at = datetime.now(UTC)
    if result["has_website"]:
        prospect.status = "active"
    db.flush()
    return result
def scan_tech_stack(self, db: Session, prospect: Prospect) -> ProspectTechProfile | None:
    """Scan technology stack from prospect's website HTML.

    Fetches the home page and detects CMS, JS framework, analytics tooling
    and server software from the (lowercased) HTML and response headers.
    For HTTPS sites the SSL certificate is checked with a verifying
    handshake. Upserts the prospect's ProspectTechProfile and flushes the
    session (no commit).

    Args:
        db: Active SQLAlchemy session.
        prospect: Prospect to scan; skipped unless it has a website.

    Returns:
        The upserted profile, or None when the prospect has no website or
        the scan failed (the failure is recorded in ``scan_error``).
    """
    domain = prospect.domain_name
    if not domain or not prospect.has_website:
        return None
    scheme = "https" if prospect.uses_https else "http"
    url = f"{scheme}://{domain}"
    try:
        response = requests.get(
            url,
            timeout=config.http_timeout,
            allow_redirects=True,
            verify=False,  # noqa: SEC047 passive scan, not sending sensitive data
        )
        html = response.text.lower()
        # Keep requests' CaseInsensitiveDict: HTTP/2 servers send lowercase
        # header names, so a plain dict() lookup of "Server" would miss them.
        headers = response.headers
        cms = self._detect_cms(html)
        js_framework = self._detect_js_framework(html)
        analytics = self._detect_analytics(html)
        server_header = headers.get("Server", "")
        server = server_header.split("/")[0] if server_header else None
        server_version = None
        if server and "/" in server_header:
            server_version = server_header.split("/", 1)[1].strip()
        # SSL certificate check (verifying handshake via the default context).
        has_valid_cert = None
        cert_issuer = None
        cert_expires_at = None
        if prospect.uses_https:
            try:
                ctx = ssl.create_default_context()
                with ctx.wrap_socket(
                    socket.create_connection((domain, 443), timeout=5),
                    server_hostname=domain,
                ) as sock:
                    cert = sock.getpeercert()
                # The verified handshake succeeded, so the certificate is
                # valid even if the detail parsing below fails.
                has_valid_cert = True
                # issuer is a tuple of RDNs, each a tuple of (key, value)
                # pairs; flatten all pairs before looking up the org name.
                cert_issuer = dict(
                    pair for rdn in cert.get("issuer", ()) for pair in rdn
                ).get("organizationName")
                not_after = cert.get("notAfter")
                if not_after:
                    try:
                        # getpeercert dates are e.g. "Jun  1 12:00:00 2026 GMT".
                        cert_expires_at = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
                    except ValueError:
                        cert_expires_at = None
            except Exception:  # noqa: EXC003
                has_valid_cert = False
        # Upsert tech profile.
        profile = prospect.tech_profile
        if not profile:
            profile = ProspectTechProfile(prospect_id=prospect.id)
            db.add(profile)
        profile.cms = cms
        profile.server = server
        profile.server_version = server_version
        profile.js_framework = js_framework
        profile.analytics = analytics
        profile.has_valid_cert = has_valid_cert
        profile.cert_issuer = cert_issuer
        profile.cert_expires_at = cert_expires_at
        profile.scan_source = "basic_http"
        # A successful scan supersedes any previously recorded failure.
        profile.scan_error = None
        prospect.last_tech_scan_at = datetime.now(UTC)
        db.flush()
        return profile
    except Exception as e:  # noqa: EXC003
        logger.error("Tech scan failed for %s: %s", domain, e)
        # Record the failure on a profile (creating one if needed) so the
        # error is always visible, mirroring scan_performance's error path.
        profile = prospect.tech_profile
        if not profile:
            profile = ProspectTechProfile(prospect_id=prospect.id)
            db.add(profile)
        profile.scan_error = str(e)
        prospect.last_tech_scan_at = datetime.now(UTC)
        db.flush()
        return None
def scan_performance(self, db: Session, prospect: Prospect) -> ProspectPerformanceProfile | None:
    """Run PageSpeed Insights audit for a prospect's website.

    Calls the PageSpeed Insights v5 API (mobile strategy), extracts the
    Lighthouse category scores and Core Web Vitals, and upserts the
    prospect's ProspectPerformanceProfile. Flushes the session (no commit).

    Args:
        db: Active SQLAlchemy session.
        prospect: Prospect to audit; skipped unless it has a website.

    Returns:
        The upserted profile (with only scan_error populated when the API
        reported an error), or None when the prospect has no website or
        the request itself raised.
    """
    domain = prospect.domain_name
    if not domain or not prospect.has_website:
        return None
    scheme = "https" if prospect.uses_https else "http"
    url = f"{scheme}://{domain}"
    api_url = "https://www.googleapis.com/pagespeedonline/v5/runPagespeed"
    params = {
        "url": url,
        "strategy": "mobile",
        "category": ["performance", "accessibility", "best-practices", "seo"],
    }
    if config.pagespeed_api_key:
        params["key"] = config.pagespeed_api_key
    try:
        # Lighthouse runs can be slow; allow a generous timeout.
        response = requests.get(api_url, params=params, timeout=60)
        data = response.json()
        # Check for API-level errors (quota exceeded, invalid URL, etc.)
        if "error" in data:
            error_msg = data["error"].get("message", str(data["error"]))
            logger.warning("PageSpeed API error for %s: %s", domain, error_msg)
            profile = prospect.performance_profile
            if not profile:
                profile = ProspectPerformanceProfile(prospect_id=prospect.id)
                db.add(profile)
            profile.scan_error = error_msg
            profile.scan_strategy = "mobile"
            prospect.last_perf_scan_at = datetime.now(UTC)
            db.flush()
            return profile
        lighthouse = data.get("lighthouseResult", {})
        categories = lighthouse.get("categories", {})
        audits = lighthouse.get("audits", {})
        # Lighthouse category scores are 0..1 floats; store as 0..100 ints.
        perf_score = int((categories.get("performance", {}).get("score") or 0) * 100)
        accessibility = int((categories.get("accessibility", {}).get("score") or 0) * 100)
        best_practices = int((categories.get("best-practices", {}).get("score") or 0) * 100)
        seo = int((categories.get("seo", {}).get("score") or 0) * 100)
        # Upsert performance profile.
        profile = prospect.performance_profile
        if not profile:
            profile = ProspectPerformanceProfile(prospect_id=prospect.id)
            db.add(profile)
        profile.performance_score = perf_score
        profile.accessibility_score = accessibility
        profile.best_practices_score = best_practices
        profile.seo_score = seo
        # Core Web Vitals. Use explicit "is not None" checks: a legitimate
        # 0 metric (e.g. total-blocking-time of 0 ms) must be stored as 0,
        # not dropped as missing.
        fcp = audits.get("first-contentful-paint", {}).get("numericValue")
        profile.first_contentful_paint_ms = int(fcp) if fcp is not None else None
        lcp = audits.get("largest-contentful-paint", {}).get("numericValue")
        profile.largest_contentful_paint_ms = int(lcp) if lcp is not None else None
        tbt = audits.get("total-blocking-time", {}).get("numericValue")
        profile.total_blocking_time_ms = int(tbt) if tbt is not None else None
        cls_val = audits.get("cumulative-layout-shift", {}).get("numericValue")
        profile.cumulative_layout_shift = cls_val
        si = audits.get("speed-index", {}).get("numericValue")
        profile.speed_index = int(si) if si is not None else None
        tti = audits.get("interactive", {}).get("numericValue")
        profile.time_to_interactive_ms = int(tti) if tti is not None else None
        # Mobile-friendly check (Lighthouse viewport audit score is 0 or 1).
        viewport = audits.get("viewport", {}).get("score")
        profile.viewport_configured = viewport == 1 if viewport is not None else None
        profile.is_mobile_friendly = profile.viewport_configured
        profile.scan_strategy = "mobile"
        # A successful scan supersedes any previously recorded failure.
        profile.scan_error = None
        prospect.last_perf_scan_at = datetime.now(UTC)
        db.flush()
        return profile
    except Exception as e:  # noqa: EXC003
        logger.error("Performance scan failed for %s: %s", domain, e)
        prospect.last_perf_scan_at = datetime.now(UTC)
        db.flush()
        return None
def scrape_contacts(self, db: Session, prospect: Prospect) -> list[ProspectContact]:
    """Scrape email, phone and address contacts from prospect's website.

    Uses a multi-phase approach per page:
    1. Structured extraction from <a href="tel:..."> and
       <a href="mailto:..."> (high confidence)
    2. Regex fallback for emails and international (+ prefixed) phone
       numbers, with stricter filtering
    3. Address extraction: schema.org JSON-LD, <address> tags, and a
       European street-address regex

    Previously auto-scraped contacts for the prospect are deleted and
    replaced. Flushes the session (no commit).

    Returns:
        The list of newly created ProspectContact rows.
    """
    from urllib.parse import unquote
    domain = prospect.domain_name
    if not domain or not prospect.has_website:
        return []
    scheme = "https" if prospect.uses_https else "http"
    base_url = f"{scheme}://{domain}"
    # Contact details typically live on the home page or a legal/contact page.
    paths = ["", "/contact", "/kontakt", "/impressum", "/about", "/mentions-legales"]
    # Structured patterns (from <a href> tags)
    tel_pattern = re.compile(r'href=["\']tel:([^"\'>\s]+)', re.IGNORECASE)
    mailto_pattern = re.compile(r'href=["\']mailto:([^"\'>\s?]+)', re.IGNORECASE)
    # Regex fallback patterns
    email_regex = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
    # International phone: requires + prefix to avoid matching random digit sequences
    phone_regex = re.compile(
        r"\+\d{1,3}[\s.-]?\(?\d{1,4}\)?[\s.-]?\d{2,4}[\s.-]?\d{2,4}(?:[\s.-]?\d{2,4})?"
    )
    false_positive_domains = {
        "example.com", "email.com", "domain.com", "wordpress.org",
        "w3.org", "schema.org", "sentry.io", "googleapis.com",
    }
    # Asset references like "logo@2x.png" match the email regex; reject them.
    asset_suffixes = (".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp", ".css", ".js")
    found_emails: set[str] = set()
    found_phones: set[str] = set()
    found_addresses: set[str] = set()
    contacts: list[ProspectContact] = []

    def _add_email(email: str, url: str, source: str) -> None:
        # Normalize, then drop duplicates and obvious false positives.
        email = unquote(email).strip().lower()
        if email.endswith(asset_suffixes):
            return
        email_domain = email.split("@")[1] if "@" in email else ""
        if email_domain in false_positive_domains or email in found_emails:
            return
        found_emails.add(email)
        contacts.append(ProspectContact(
            prospect_id=prospect.id,
            contact_type="email",
            value=email,
            source_url=url,
            source_element=source,
        ))

    def _add_phone(phone: str, url: str, source: str) -> None:
        # Strip separators (keeps the + prefix); short strings are noise.
        phone_clean = re.sub(r"[\s.()\-]", "", phone)
        if len(phone_clean) < 10 or phone_clean in found_phones:
            return
        found_phones.add(phone_clean)
        contacts.append(ProspectContact(
            prospect_id=prospect.id,
            contact_type="phone",
            value=phone_clean,
            source_url=url,
            source_element=source,
        ))

    def _add_address(address: str, url: str, source: str) -> None:
        # Collapse whitespace; very short strings are unlikely to be addresses.
        address = re.sub(r"\s+", " ", address).strip()
        if len(address) < 10 or address in found_addresses:
            return
        found_addresses.add(address)
        contacts.append(ProspectContact(
            prospect_id=prospect.id,
            contact_type="address",
            value=address,
            source_url=url,
            source_element=source,
        ))

    session = requests.Session()
    session.verify = False  # noqa: SEC047 passive scan, not sending sensitive data
    session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; OrionBot/1.0)"})
    try:
        for path in paths:
            try:
                url = base_url + path
                response = session.get(url, timeout=config.http_timeout, allow_redirects=True)
                if response.status_code != 200:
                    continue
                html = response.text
                # Phase 1: structured extraction from href attributes
                for phone in tel_pattern.findall(html):
                    _add_phone(unquote(phone), url, "tel_href")
                for email in mailto_pattern.findall(html):
                    _add_email(email, url, "mailto_href")
                # Phase 2: regex fallback — strip SVG/script content first
                text_html = re.sub(r"<(svg|script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
                for email in email_regex.findall(text_html):
                    _add_email(email, url, "regex")
                for phone in phone_regex.findall(text_html):
                    _add_phone(phone, url, "regex")
                # Phase 3a: Schema.org JSON-LD addresses
                for m in re.finditer(r'"streetAddress"\s*:\s*"([^"]+)"', html):
                    parts = [m.group(1)]
                    # Try to find locality/postal near the same JSON block
                    block_end = html[m.end():m.end() + 200]
                    locality = re.search(r'"addressLocality"\s*:\s*"([^"]+)"', block_end)
                    postal = re.search(r'"postalCode"\s*:\s*"([^"]+)"', block_end)
                    if postal:
                        parts.append(postal.group(1))
                    if locality:
                        parts.append(locality.group(1))
                    _add_address(", ".join(parts), url, "schema_org")
                # Phase 3b: <address> HTML tag
                for addr_match in re.finditer(r"<address[^>]*>(.*?)</address>", html, re.DOTALL | re.IGNORECASE):
                    clean = re.sub(r"<[^>]+>", " ", addr_match.group(1))
                    clean = re.sub(r"\s+", " ", clean).strip()
                    if clean:
                        _add_address(clean, url, "address_tag")
                # Phase 3c: European street address pattern
                # (number + street keyword + postal code + city).
                # Strip tags to plain text (tags become spaces so matches can
                # span element boundaries).
                plain = re.sub(r"<[^>]+>", " ", text_html)
                plain = re.sub(r"\s+", " ", plain)
                street_keywords = (
                    r"(?:rue|avenue|boulevard|allée|impasse|chemin|place|route|passage|quai|"
                    r"straße|strasse|stra[ßs]e|weg|platz|gasse|"  # German
                    r"street|road|lane|drive|way)"  # English
                )
                addr_pattern = re.compile(
                    rf"\d{{1,4}}[\s,]+{street_keywords}\s[^<]{{3,60}}?\d{{4,5}}\s+[A-ZÀ-Ü][a-zà-ü]{{2,}}",
                    re.IGNORECASE,
                )
                for m in addr_pattern.finditer(plain):
                    _add_address(m.group(), url, "regex")
            except Exception as e:  # noqa: EXC003
                logger.debug("Contact scrape failed for %s%s: %s", domain, path, e)
    finally:
        # Always release pooled connections, even on unexpected errors.
        session.close()
    # Save contacts (replace existing auto-scraped ones)
    db.query(ProspectContact).filter(
        ProspectContact.prospect_id == prospect.id,
        ProspectContact.source_element.in_(["regex", "tel_href", "mailto_href", "schema_org", "address_tag"]),
    ).delete()
    db.add_all(contacts)
    # Mark first email and phone as primary
    for c in contacts:
        if c.contact_type == "email":
            c.is_primary = True
            break
    for c in contacts:
        if c.contact_type == "phone":
            c.is_primary = True
            break
    prospect.last_contact_scrape_at = datetime.now(UTC)
    db.flush()
    return contacts
def _detect_cms(self, html: str) -> str | None:
    """Return the first CMS whose signature patterns match *html*, else None."""
    matches = (
        name
        for name, signatures in CMS_PATTERNS.items()
        if any(re.search(sig, html) for sig in signatures)
    )
    return next(matches, None)
def _detect_js_framework(self, html: str) -> str | None:
    """Return the first JS framework detected in *html*, or None."""
    for name, signatures in JS_FRAMEWORK_PATTERNS.items():
        if any(re.search(sig, html) for sig in signatures):
            return name
    return None
def _detect_analytics(self, html: str) -> str | None:
    """Return a comma-joined list of analytics tools found in *html*, or None."""
    hits = [
        name
        for name, signatures in ANALYTICS_PATTERNS.items()
        if any(re.search(sig, html) for sig in signatures)
    ]
    return ",".join(hits) if hits else None
def scrape_content(self, db: Session, prospect: Prospect) -> dict | None:
    """Scrape page content (headings, paragraphs, images, services) for POC builder.

    Crawls the home page plus common about/services/contact paths and uses
    BeautifulSoup to extract structured content: meta description, H1/H2
    headings, substantial paragraphs, hero-sized images, social links and
    service items. Results are stored as JSON in
    prospect.scraped_content_json; the session is flushed (not committed).

    Args:
        db: Active SQLAlchemy session.
        prospect: Prospect whose website is scraped.

    Returns:
        The collected content dict, or None when the prospect has no
        domain or no website.
    """
    # Lazy imports: bs4 is third-party and only needed by this method.
    import json
    from bs4 import BeautifulSoup
    domain = prospect.domain_name
    if not domain or not prospect.has_website:
        return None
    scheme = "https" if prospect.uses_https else "http"
    base_url = f"{scheme}://{domain}"
    # Home page plus common FR/EN content pages.
    paths = ["", "/about", "/a-propos", "/services", "/nos-services", "/contact"]
    session = requests.Session()
    session.verify = False # noqa: SEC047 passive scan
    session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; OrionBot/1.0)"})
    # Accumulated across all crawled pages; dumped to JSON at the end.
    # NOTE(review): "business_hours" is initialized but never populated below
    # — presumably reserved for a later extraction phase; confirm.
    content = {
        "meta_description": None,
        "headings": [],
        "paragraphs": [],
        "services": [],
        "images": [],
        "social_links": {},
        "business_hours": None,
        "languages_detected": [],
    }
    # Cross-page dedup sets (nav/footer text repeats on every page).
    seen_headings = set()
    seen_paragraphs = set()
    for path in paths:
        try:
            url = base_url + path
            resp = session.get(url, timeout=config.http_timeout, allow_redirects=True)
            if resp.status_code != 200:
                continue
            soup = BeautifulSoup(resp.text, "html.parser")
            # Meta description (first one found)
            if not content["meta_description"]:
                meta = soup.find("meta", attrs={"name": "description"})
                if meta and meta.get("content"):
                    content["meta_description"] = meta["content"].strip()
            # Language detection from the <html lang="..."> attribute
            # (truncated to the 2-letter primary subtag).
            html_tag = soup.find("html")
            if html_tag and html_tag.get("lang"):
                lang = html_tag["lang"][:2].lower()
                if lang not in content["languages_detected"]:
                    content["languages_detected"].append(lang)
            # Headings (H1, H2)
            for tag in soup.find_all(["h1", "h2"]):
                text = tag.get_text(strip=True)
                if text and len(text) > 3 and text not in seen_headings:
                    seen_headings.add(text)
                    content["headings"].append(text)
            # Paragraphs (substantial ones, skip tiny/boilerplate);
            # soft cap of 20 overall — the break only stops the current page.
            for tag in soup.find_all("p"):
                text = tag.get_text(strip=True)
                if text and len(text) > 50 and text not in seen_paragraphs:
                    seen_paragraphs.add(text)
                    content["paragraphs"].append(text)
                    if len(content["paragraphs"]) >= 20:
                        break
            # Images (hero/banner sized, skip tiny icons)
            for img in soup.find_all("img"):
                # Fall back to data-src for lazy-loaded images.
                src = img.get("src") or img.get("data-src")
                if not src:
                    continue
                # Make absolute
                if src.startswith("//"):
                    src = "https:" + src
                elif src.startswith("/"):
                    src = base_url + src
                elif not src.startswith("http"):
                    continue
                # Skip tiny images, data URIs, tracking pixels
                if "1x1" in src or "pixel" in src or src.startswith("data:"):
                    continue
                # Width/height filters only apply when the attributes are
                # plain digit strings; CSS-sized images pass through.
                width = img.get("width", "")
                height = img.get("height", "")
                if width and width.isdigit() and int(width) < 100:
                    continue
                if height and height.isdigit() and int(height) < 100:
                    continue
                if src not in content["images"]:
                    content["images"].append(src)
                    if len(content["images"]) >= 15:
                        break
            # Social links (first link found per platform wins).
            for a in soup.find_all("a", href=True):
                href = a["href"]
                for platform, pattern in [
                    ("facebook", "facebook.com"),
                    ("instagram", "instagram.com"),
                    ("linkedin", "linkedin.com"),
                    ("twitter", "twitter.com"),
                    ("youtube", "youtube.com"),
                    ("tiktok", "tiktok.com"),
                ]:
                    if pattern in href and platform not in content["social_links"]:
                        content["social_links"][platform] = href
            # Service items (from list items near "service" headings)
            for heading in soup.find_all(["h2", "h3"]):
                heading_text = heading.get_text(strip=True).lower()
                if any(kw in heading_text for kw in ["service", "prestation", "leistung", "angebot", "nos activit"]):
                    # Look for list items or cards after this heading,
                    # stopping at the next section heading.
                    sibling = heading.find_next_sibling()
                    while sibling and sibling.name not in ["h1", "h2", "h3"]:
                        if sibling.name in ["ul", "ol"]:
                            for li in sibling.find_all("li"):
                                text = li.get_text(strip=True)
                                if text and len(text) > 3 and text not in content["services"]:
                                    content["services"].append(text)
                        elif sibling.name == "div":
                            # Cards pattern: divs with h3/h4 + p
                            card_title = sibling.find(["h3", "h4", "h5"])
                            if card_title:
                                text = card_title.get_text(strip=True)
                                if text and text not in content["services"]:
                                    content["services"].append(text)
                        sibling = sibling.find_next_sibling()
                    if len(content["services"]) >= 10:
                        break
        except Exception as e: # noqa: EXC003
            # Best-effort crawl: one failing page must not abort the others.
            logger.debug("Content scrape failed for %s%s: %s", domain, path, e)
    session.close()
    # Store results
    prospect.scraped_content_json = json.dumps(content, ensure_ascii=False)
    prospect.last_content_scrape_at = datetime.now(UTC)
    db.flush()
    logger.info(
        "Content scrape for %s: %d headings, %d paragraphs, %d images, %d services",
        domain, len(content["headings"]), len(content["paragraphs"]),
        len(content["images"]), len(content["services"]),
    )
    return content
# Shared module-level instance (the service keeps no per-request state).
enrichment_service = EnrichmentService()