# app/modules/prospecting/services/enrichment_service.py
"""
Enrichment service for prospect scanning pipeline.
Migrated from marketing-.lu-domains/app/services/enrichment_service.py.
Performs passive HTTP checks, technology detection, performance audits,
and contact scraping for digital prospects.
Uses `requests` (sync) to match Orion's tech stack.
"""
import logging
import re
import socket
import ssl
from datetime import UTC, datetime
import requests
from sqlalchemy.orm import Session
from app.modules.prospecting.config import config
from app.modules.prospecting.models import (
Prospect,
ProspectContact,
ProspectPerformanceProfile,
ProspectTechProfile,
)
# Module-level logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)
# CMS detection patterns
# Each value is a list of regex fragments searched against the *lowercased*
# page HTML (see EnrichmentService._detect_cms); the first key with any
# matching pattern is reported as the site's CMS.
CMS_PATTERNS = {
"wordpress": [r"wp-content", r"wp-includes", r"wordpress"],
"drupal": [r"drupal", r"sites/default", r"sites/all"],
"joomla": [r"/media/jui/", r"joomla", r"/components/com_"],
"shopify": [r"cdn\.shopify\.com", r"shopify"],
"wix": [r"wix\.com", r"wixstatic\.com", r"parastorage\.com"],
"squarespace": [r"squarespace\.com", r"sqsp\.com"],
"webflow": [r"webflow\.com", r"webflow\.io"],
"typo3": [r"typo3", r"/typo3conf/"],
"prestashop": [r"prestashop", r"/modules/ps_"],
"magento": [r"magento", r"mage/", r"/static/version"],
}
# JS framework signatures, same matching rules as CMS_PATTERNS; only the
# first matching framework is reported (_detect_js_framework).
JS_FRAMEWORK_PATTERNS = {
"react": [r"react", r"__NEXT_DATA__", r"_next/"],
"vue": [r"vue\.js", r"vue\.min\.js", r"__vue__"],
"angular": [r"angular", r"ng-version"],
"jquery": [r"jquery"],
"alpine": [r"alpine\.js", r"alpinejs"],
}
# Analytics tool signatures; unlike the two dicts above, *all* matching
# tools are reported as a comma-joined string (_detect_analytics).
ANALYTICS_PATTERNS = {
"google_analytics": [r"google-analytics\.com", r"gtag/js", r"ga\.js"],
"google_tag_manager": [r"googletagmanager\.com", r"gtm\.js"],
"matomo": [r"matomo", r"piwik"],
"facebook_pixel": [r"facebook\.net/en_US/fbevents"],
}
class EnrichmentService:
    """Service for prospect enrichment via passive scanning.

    Each public method takes an open SQLAlchemy ``Session`` plus a
    ``Prospect`` row, performs outbound HTTP(S) work, persists the results
    onto the prospect (and its related profile/contact rows) and flushes
    the session. Network or parsing failures are recorded on the row and
    logged, never raised to the caller.
    """

    def check_http(self, db: Session, prospect: Prospect) -> dict:
        """Check HTTP connectivity for a prospect's domain.

        Tries HTTPS first, then falls back to plain HTTP. Updates the
        prospect's ``has_website``/``uses_https``/status fields and the
        ``last_http_check_at`` timestamp.

        Returns a summary dict with keys ``has_website``, ``uses_https``,
        ``http_status_code``, ``redirect_url`` and ``error``.
        """
        result = {
            "has_website": False,
            "uses_https": False,
            "http_status_code": None,
            "redirect_url": None,
            "error": None,
        }
        domain = prospect.domain_name
        if not domain:
            result["error"] = "No domain name"
            return result
        # Try HTTPS first, then HTTP
        for scheme in ("https", "http"):
            url = f"{scheme}://{domain}"
            try:
                response = requests.get(
                    url,
                    timeout=config.http_timeout,
                    allow_redirects=True,
                    verify=False,  # noqa: SEC047 passive scan, not sending sensitive data
                )
            except requests.exceptions.Timeout:
                result["error"] = f"Timeout on {scheme}"
            except requests.exceptions.RequestException as e:
                result["error"] = str(e)
            else:
                result["has_website"] = True
                result["uses_https"] = scheme == "https"
                result["http_status_code"] = response.status_code
                if response.url != url:
                    result["redirect_url"] = str(response.url)
                # A failed HTTPS attempt may have recorded an error; the
                # fallback succeeded, so clear it.
                result["error"] = None
                break
        # Persist findings on the prospect row.
        prospect.has_website = result["has_website"]
        prospect.uses_https = result["uses_https"]
        prospect.http_status_code = result["http_status_code"]
        prospect.redirect_url = result["redirect_url"]
        prospect.last_http_check_at = datetime.now(UTC)
        if result["has_website"]:
            prospect.status = "active"
        db.flush()
        return result

    def scan_tech_stack(self, db: Session, prospect: Prospect) -> ProspectTechProfile | None:
        """Scan technology stack from prospect's website HTML.

        Detects CMS, JS framework, analytics tooling and the ``Server``
        header, and (for HTTPS sites) validates the TLS certificate.
        Upserts the prospect's ``ProspectTechProfile``.

        Returns the profile on success, ``None`` when the prospect has no
        website or the scan failed (the error is stored on any existing
        profile).
        """
        domain = prospect.domain_name
        if not domain or not prospect.has_website:
            return None
        scheme = "https" if prospect.uses_https else "http"
        url = f"{scheme}://{domain}"
        try:
            response = requests.get(
                url,
                timeout=config.http_timeout,
                allow_redirects=True,
                verify=False,  # noqa: SEC047 passive scan, not sending sensitive data
            )
            html = response.text.lower()
            cms = self._detect_cms(html)
            js_framework = self._detect_js_framework(html)
            analytics = self._detect_analytics(html)
            # response.headers is case-insensitive; query it directly so a
            # lowercase "server" header is still found.
            server_header = response.headers.get("Server", "")
            server = server_header.split("/")[0] or None
            server_version = (
                server_header.split("/", 1)[1].strip() if "/" in server_header else None
            )
            # SSL certificate check (best effort; any failure marks the
            # certificate invalid rather than failing the whole scan).
            has_valid_cert = None
            cert_issuer = None
            cert_expires_at = None
            if prospect.uses_https:
                try:
                    ctx = ssl.create_default_context()
                    with ctx.wrap_socket(
                        socket.create_connection((domain, 443), timeout=5),
                        server_hostname=domain,
                    ) as sock:
                        cert = sock.getpeercert()
                    has_valid_cert = True
                    # "issuer" is a tuple of RDN tuples of (key, value)
                    # pairs; skip empty RDNs to avoid IndexError.
                    issuer = dict(rdn[0] for rdn in cert.get("issuer", ()) if rdn)
                    cert_issuer = issuer.get("organizationName")
                    not_after = cert.get("notAfter")
                    if not_after:
                        # OpenSSL format, e.g. "Jun  1 12:00:00 2026 GMT"
                        cert_expires_at = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
                except Exception:  # noqa: EXC003
                    has_valid_cert = False
            # Upsert tech profile
            profile = prospect.tech_profile
            if not profile:
                profile = ProspectTechProfile(prospect_id=prospect.id)
                db.add(profile)
            profile.cms = cms
            profile.server = server
            profile.server_version = server_version
            profile.js_framework = js_framework
            profile.analytics = analytics
            profile.has_valid_cert = has_valid_cert
            profile.cert_issuer = cert_issuer
            profile.cert_expires_at = cert_expires_at
            profile.scan_source = "basic_http"
            profile.scan_error = None  # clear any error from a previous failed scan
            prospect.last_tech_scan_at = datetime.now(UTC)
            db.flush()
            return profile
        except Exception as e:  # noqa: EXC003
            logger.error("Tech scan failed for %s: %s", domain, e)
            if prospect.tech_profile:
                prospect.tech_profile.scan_error = str(e)
            prospect.last_tech_scan_at = datetime.now(UTC)
            db.flush()
            return None

    def scan_performance(self, db: Session, prospect: Prospect) -> ProspectPerformanceProfile | None:
        """Run PageSpeed Insights audit for a prospect's website.

        Calls the PageSpeed v5 API (mobile strategy), stores category
        scores and Core Web Vitals on the prospect's
        ``ProspectPerformanceProfile`` (upserted).

        Returns the profile on success or API-level error (error stored in
        ``scan_error``); ``None`` when the prospect has no website or the
        request itself failed.
        """
        domain = prospect.domain_name
        if not domain or not prospect.has_website:
            return None
        scheme = "https" if prospect.uses_https else "http"
        url = f"{scheme}://{domain}"
        api_url = "https://www.googleapis.com/pagespeedonline/v5/runPagespeed"
        params = {
            "url": url,
            "strategy": "mobile",
            "category": ["performance", "accessibility", "best-practices", "seo"],
        }
        if config.pagespeed_api_key:
            params["key"] = config.pagespeed_api_key
        try:
            response = requests.get(api_url, params=params, timeout=60)
            data = response.json()
            # Check for API-level errors (quota exceeded, invalid URL, etc.)
            if "error" in data:
                error_msg = data["error"].get("message", str(data["error"]))
                logger.warning("PageSpeed API error for %s: %s", domain, error_msg)
                profile = prospect.performance_profile
                if not profile:
                    profile = ProspectPerformanceProfile(prospect_id=prospect.id)
                    db.add(profile)
                profile.scan_error = error_msg
                profile.scan_strategy = "mobile"
                prospect.last_perf_scan_at = datetime.now(UTC)
                db.flush()
                return profile
            lighthouse = data.get("lighthouseResult", {})
            categories = lighthouse.get("categories", {})
            audits = lighthouse.get("audits", {})

            def _score(name: str) -> int:
                # Lighthouse scores are 0..1 floats (or missing); scale to 0..100.
                return int((categories.get(name, {}).get("score") or 0) * 100)

            def _audit_ms(key: str) -> int | None:
                # 0 is a valid metric value, so test against None, not truthiness.
                val = audits.get(key, {}).get("numericValue")
                return int(val) if val is not None else None

            # Upsert performance profile
            profile = prospect.performance_profile
            if not profile:
                profile = ProspectPerformanceProfile(prospect_id=prospect.id)
                db.add(profile)
            profile.performance_score = _score("performance")
            profile.accessibility_score = _score("accessibility")
            profile.best_practices_score = _score("best-practices")
            profile.seo_score = _score("seo")
            # Core Web Vitals
            profile.first_contentful_paint_ms = _audit_ms("first-contentful-paint")
            profile.largest_contentful_paint_ms = _audit_ms("largest-contentful-paint")
            profile.total_blocking_time_ms = _audit_ms("total-blocking-time")
            profile.cumulative_layout_shift = audits.get(
                "cumulative-layout-shift", {}
            ).get("numericValue")
            profile.speed_index = _audit_ms("speed-index")
            profile.time_to_interactive_ms = _audit_ms("interactive")
            # Mobile-friendly check (viewport audit score is 1 when configured)
            viewport = audits.get("viewport", {}).get("score")
            profile.viewport_configured = viewport == 1 if viewport is not None else None
            profile.is_mobile_friendly = profile.viewport_configured
            profile.scan_strategy = "mobile"
            profile.scan_error = None  # clear any error from a previous failed scan
            prospect.last_perf_scan_at = datetime.now(UTC)
            db.flush()
            return profile
        except Exception as e:  # noqa: EXC003
            logger.error("Performance scan failed for %s: %s", domain, e)
            prospect.last_perf_scan_at = datetime.now(UTC)
            db.flush()
            return None

    def scrape_contacts(self, db: Session, prospect: Prospect) -> list[ProspectContact]:
        """Scrape email, phone and postal-address contacts from the website.

        Fetches the home page plus common contact/legal pages and extracts
        contacts in three phases:

        1. Structured extraction from ``tel:`` / ``mailto:`` href
           attributes (high confidence).
        2. Regex fallback for emails and international (``+``-prefixed)
           phone numbers, with SVG/script/style content stripped first.
        3. Address extraction from schema.org JSON-LD, ``<address>`` tags
           and a European street-address pattern.

        Previously auto-scraped contacts are deleted and replaced. The
        first email and first phone found are flagged as primary. Returns
        the new ``ProspectContact`` rows (already added to the session).
        """
        from urllib.parse import unquote

        domain = prospect.domain_name
        if not domain or not prospect.has_website:
            return []
        scheme = "https" if prospect.uses_https else "http"
        base_url = f"{scheme}://{domain}"
        # Home page plus common contact/legal pages (EN/FR/DE).
        paths = ["", "/contact", "/kontakt", "/impressum", "/about", "/mentions-legales"]
        # Structured patterns (from href="tel:..." / href="mailto:..." attributes)
        tel_pattern = re.compile(r'href=["\']tel:([^"\'>\s]+)', re.IGNORECASE)
        mailto_pattern = re.compile(r'href=["\']mailto:([^"\'>\s?]+)', re.IGNORECASE)
        # Regex fallback patterns
        email_regex = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
        # International phone: requires + prefix to avoid matching random digit sequences
        phone_regex = re.compile(
            r"\+\d{1,3}[\s.-]?\(?\d{1,4}\)?[\s.-]?\d{2,4}[\s.-]?\d{2,4}(?:[\s.-]?\d{2,4})?"
        )
        # Strip <svg>/<script>/<style> elements including their content
        # (closing tag must be "</tag>"; the old pattern lacked the "</").
        strip_tags_regex = re.compile(
            r"<(svg|script|style)[^>]*>.*?</\1\s*>", re.DOTALL | re.IGNORECASE
        )
        # <address> element extractor (the previous pattern was corrupted
        # and never matched anything).
        address_tag_regex = re.compile(r"<address[^>]*>(.*?)</address>", re.DOTALL | re.IGNORECASE)
        street_keywords = (
            r"(?:rue|avenue|boulevard|allée|impasse|chemin|place|route|passage|quai|"
            r"straße|strasse|stra[ßs]e|weg|platz|gasse|"  # German
            r"street|road|lane|drive|way)"  # English
        )
        # European street address: number + street keyword + postal code + city.
        # Compiled once here instead of once per fetched page.
        addr_pattern = re.compile(
            rf"\d{{1,4}}[\s,]+{street_keywords}\s[^<]{{3,60}}?\d{{4,5}}\s+[A-ZÀ-Ü][a-zà-ü]{{2,}}",
            re.IGNORECASE,
        )
        false_positive_domains = {
            "example.com", "email.com", "domain.com", "wordpress.org",
            "w3.org", "schema.org", "sentry.io", "googleapis.com",
        }
        found_emails: set[str] = set()
        found_phones: set[str] = set()
        found_addresses: set[str] = set()
        contacts: list[ProspectContact] = []

        def _add_email(email: str, url: str, source: str) -> None:
            # Normalize, drop known placeholder domains and duplicates.
            email = unquote(email).strip().lower()
            email_domain = email.split("@")[1] if "@" in email else ""
            if email_domain in false_positive_domains or email in found_emails:
                return
            found_emails.add(email)
            contacts.append(ProspectContact(
                prospect_id=prospect.id,
                contact_type="email",
                value=email,
                source_url=url,
                source_element=source,
            ))

        def _add_phone(phone: str, url: str, source: str) -> None:
            # Collapse separators; require at least 10 chars (incl. "+").
            phone_clean = re.sub(r"[\s.()\-]", "", phone)
            if len(phone_clean) < 10 or phone_clean in found_phones:
                return
            found_phones.add(phone_clean)
            contacts.append(ProspectContact(
                prospect_id=prospect.id,
                contact_type="phone",
                value=phone_clean,
                source_url=url,
                source_element=source,
            ))

        def _add_address(address: str, url: str, source: str) -> None:
            # Collapse whitespace; skip implausibly short strings and dupes.
            address = re.sub(r"\s+", " ", address).strip()
            if len(address) < 10 or address in found_addresses:
                return
            found_addresses.add(address)
            contacts.append(ProspectContact(
                prospect_id=prospect.id,
                contact_type="address",
                value=address,
                source_url=url,
                source_element=source,
            ))

        with requests.Session() as session:
            session.verify = False  # noqa: SEC047 passive scan, not sending sensitive data
            session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; OrionBot/1.0)"})
            for path in paths:
                try:
                    url = base_url + path
                    response = session.get(url, timeout=config.http_timeout, allow_redirects=True)
                    if response.status_code != 200:
                        continue
                    html = response.text
                    # Phase 1: structured extraction from href attributes
                    for phone in tel_pattern.findall(html):
                        _add_phone(unquote(phone), url, "tel_href")
                    for email in mailto_pattern.findall(html):
                        _add_email(email, url, "mailto_href")
                    # Phase 2: regex fallback — strip SVG/script/style content first
                    text_html = strip_tags_regex.sub("", html)
                    for email in email_regex.findall(text_html):
                        _add_email(email, url, "regex")
                    for phone in phone_regex.findall(text_html):
                        _add_phone(phone, url, "regex")
                    # Phase 3a: schema.org JSON-LD addresses
                    for m in re.finditer(r'"streetAddress"\s*:\s*"([^"]+)"', html):
                        parts = [m.group(1)]
                        # Look for locality/postal code near the same JSON block
                        block_end = html[m.end():m.end() + 200]
                        locality = re.search(r'"addressLocality"\s*:\s*"([^"]+)"', block_end)
                        postal = re.search(r'"postalCode"\s*:\s*"([^"]+)"', block_end)
                        if postal:
                            parts.append(postal.group(1))
                        if locality:
                            parts.append(locality.group(1))
                        _add_address(", ".join(parts), url, "schema_org")
                    # Phase 3b: <address> HTML elements
                    for addr_match in address_tag_regex.finditer(html):
                        clean = re.sub(r"<[^>]+>", " ", addr_match.group(1))
                        clean = re.sub(r"\s+", " ", clean).strip()
                        if clean:
                            _add_address(clean, url, "address_tag")
                    # Phase 3c: street-address regex over tag-stripped text
                    # (tags become spaces so matches can span elements).
                    plain = re.sub(r"<[^>]+>", " ", text_html)
                    plain = re.sub(r"\s+", " ", plain)
                    for m in addr_pattern.finditer(plain):
                        _add_address(m.group(), url, "regex")
                except Exception as e:  # noqa: EXC003
                    logger.debug("Contact scrape failed for %s%s: %s", domain, path, e)
        # Save contacts (replace existing auto-scraped ones)
        db.query(ProspectContact).filter(
            ProspectContact.prospect_id == prospect.id,
            ProspectContact.source_element.in_(["regex", "tel_href", "mailto_href", "schema_org", "address_tag"]),
        ).delete()
        db.add_all(contacts)
        # Mark the first email and the first phone found as primary.
        primary_email = next((c for c in contacts if c.contact_type == "email"), None)
        if primary_email is not None:
            primary_email.is_primary = True
        primary_phone = next((c for c in contacts if c.contact_type == "phone"), None)
        if primary_phone is not None:
            primary_phone.is_primary = True
        prospect.last_contact_scrape_at = datetime.now(UTC)
        db.flush()
        return contacts

    def _detect_cms(self, html: str) -> str | None:
        """Return the first CMS whose signature matches *html* (lowercased), else None."""
        for cms, patterns in CMS_PATTERNS.items():
            if any(re.search(pattern, html) for pattern in patterns):
                return cms
        return None

    def _detect_js_framework(self, html: str) -> str | None:
        """Return the first JS framework whose signature matches *html*, else None."""
        for framework, patterns in JS_FRAMEWORK_PATTERNS.items():
            if any(re.search(pattern, html) for pattern in patterns):
                return framework
        return None

    def _detect_analytics(self, html: str) -> str | None:
        """Return a comma-joined list of all analytics tools detected in *html*, else None."""
        found = [
            tool
            for tool, patterns in ANALYTICS_PATTERNS.items()
            if any(re.search(pattern, html) for pattern in patterns)
        ]
        return ",".join(found) if found else None

    def scrape_content(self, db: Session, prospect: Prospect) -> dict | None:
        """Scrape page content (headings, paragraphs, images, services) for POC builder.

        Uses BeautifulSoup to extract structured content (meta description,
        H1/H2 headings, substantial paragraphs, hero-sized images, social
        links, service lists, detected languages) from the prospect's main
        pages. Stores the result as JSON in
        ``prospect.scraped_content_json`` and returns the dict, or ``None``
        when the prospect has no website.
        """
        import json
        from bs4 import BeautifulSoup

        domain = prospect.domain_name
        if not domain or not prospect.has_website:
            return None
        scheme = "https" if prospect.uses_https else "http"
        base_url = f"{scheme}://{domain}"
        paths = ["", "/about", "/a-propos", "/services", "/nos-services", "/contact"]
        content = {
            "meta_description": None,
            "headings": [],
            "paragraphs": [],
            "services": [],
            "images": [],
            "social_links": {},
            "business_hours": None,
            "languages_detected": [],
        }
        seen_headings = set()
        seen_paragraphs = set()
        with requests.Session() as session:
            session.verify = False  # noqa: SEC047 passive scan
            session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; OrionBot/1.0)"})
            for path in paths:
                try:
                    url = base_url + path
                    resp = session.get(url, timeout=config.http_timeout, allow_redirects=True)
                    if resp.status_code != 200:
                        continue
                    soup = BeautifulSoup(resp.text, "html.parser")
                    # Meta description (first one found across all pages)
                    if not content["meta_description"]:
                        meta = soup.find("meta", attrs={"name": "description"})
                        if meta and meta.get("content"):
                            content["meta_description"] = meta["content"].strip()
                    # Language detection from <html lang="..."> (2-letter code)
                    html_tag = soup.find("html")
                    if html_tag and html_tag.get("lang"):
                        lang = html_tag["lang"][:2].lower()
                        if lang not in content["languages_detected"]:
                            content["languages_detected"].append(lang)
                    # Headings (H1, H2), deduplicated across pages
                    for tag in soup.find_all(["h1", "h2"]):
                        text = tag.get_text(strip=True)
                        if text and len(text) > 3 and text not in seen_headings:
                            seen_headings.add(text)
                            content["headings"].append(text)
                    # Paragraphs (substantial ones, skip tiny/boilerplate; cap at 20)
                    for tag in soup.find_all("p"):
                        text = tag.get_text(strip=True)
                        if text and len(text) > 50 and text not in seen_paragraphs:
                            seen_paragraphs.add(text)
                            content["paragraphs"].append(text)
                            if len(content["paragraphs"]) >= 20:
                                break
                    # Images (hero/banner sized, skip tiny icons; cap at 15)
                    for img in soup.find_all("img"):
                        src = img.get("src") or img.get("data-src")
                        if not src:
                            continue
                        # Resolve protocol-relative and root-relative URLs
                        if src.startswith("//"):
                            src = "https:" + src
                        elif src.startswith("/"):
                            src = base_url + src
                        elif not src.startswith("http"):
                            continue
                        # Skip tiny images, data URIs, tracking pixels
                        if "1x1" in src or "pixel" in src or src.startswith("data:"):
                            continue
                        width = img.get("width", "")
                        height = img.get("height", "")
                        if width and width.isdigit() and int(width) < 100:
                            continue
                        if height and height.isdigit() and int(height) < 100:
                            continue
                        if src not in content["images"]:
                            content["images"].append(src)
                            if len(content["images"]) >= 15:
                                break
                    # Social links (first link per platform)
                    for a in soup.find_all("a", href=True):
                        href = a["href"]
                        for platform, pattern in [
                            ("facebook", "facebook.com"),
                            ("instagram", "instagram.com"),
                            ("linkedin", "linkedin.com"),
                            ("twitter", "twitter.com"),
                            ("youtube", "youtube.com"),
                            ("tiktok", "tiktok.com"),
                        ]:
                            if pattern in href and platform not in content["social_links"]:
                                content["social_links"][platform] = href
                    # Service items (from list items or cards near "service" headings)
                    for heading in soup.find_all(["h2", "h3"]):
                        heading_text = heading.get_text(strip=True).lower()
                        if any(kw in heading_text for kw in ["service", "prestation", "leistung", "angebot", "nos activit"]):
                            # Walk siblings until the next heading of equal/higher level
                            sibling = heading.find_next_sibling()
                            while sibling and sibling.name not in ["h1", "h2", "h3"]:
                                if sibling.name in ["ul", "ol"]:
                                    for li in sibling.find_all("li"):
                                        text = li.get_text(strip=True)
                                        if text and len(text) > 3 and text not in content["services"]:
                                            content["services"].append(text)
                                elif sibling.name == "div":
                                    # Cards pattern: divs with h3/h4/h5 titles
                                    card_title = sibling.find(["h3", "h4", "h5"])
                                    if card_title:
                                        text = card_title.get_text(strip=True)
                                        if text and text not in content["services"]:
                                            content["services"].append(text)
                                sibling = sibling.find_next_sibling()
                        if len(content["services"]) >= 10:
                            break
                except Exception as e:  # noqa: EXC003
                    logger.debug("Content scrape failed for %s%s: %s", domain, path, e)
        # Store results
        prospect.scraped_content_json = json.dumps(content, ensure_ascii=False)
        prospect.last_content_scrape_at = datetime.now(UTC)
        db.flush()
        logger.info(
            "Content scrape for %s: %d headings, %d paragraphs, %d images, %d services",
            domain, len(content["headings"]), len(content["paragraphs"]),
            len(content["images"]), len(content["services"]),
        )
        return content
# Module-level singleton shared by the prospecting pipeline.
enrichment_service = EnrichmentService()