Files
orion/app/modules/prospecting/services/enrichment_service.py
Samir Boulahtit d685341b04
Some checks failed
CI / ruff (push) Successful in 15s
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has been cancelled
refactor(tenancy): simplify team table + move actions to edit modal
Reverts the expandable sub-row design back to a clean one-row-per-member
table. All per-store management now happens inside the edit modal.

Table: simple 4-column layout (Member | Stores & Roles | Status | Actions)
with view + edit buttons. Store badges show orange for pending stores.

Edit modal enhanced with per-store cards showing:
- Store name, code, and status badge (Active/Pending)
- Role dropdown + Update button (for active stores)
- Resend invitation button (for pending stores)
- Remove from store button
- "Remove from all stores" link at bottom

Removed: expandedMembers, flattenedRows, toggleMemberExpand,
resendStoreInvitation, resendInvitation (member-level).
Added: resendForStore, removeFromStore (work inside edit modal).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
2026-03-30 21:08:36 +02:00

406 lines
16 KiB
Python

# app/modules/prospecting/services/enrichment_service.py
"""
Enrichment service for prospect scanning pipeline.
Migrated from marketing-.lu-domains/app/services/enrichment_service.py.
Performs passive HTTP checks, technology detection, performance audits,
and contact scraping for digital prospects.
Uses `requests` (sync) to match Orion's tech stack.
"""
import logging
import re
import socket
import ssl
from datetime import UTC, datetime
import requests
from sqlalchemy.orm import Session
from app.modules.prospecting.config import config
from app.modules.prospecting.models import (
Prospect,
ProspectContact,
ProspectPerformanceProfile,
ProspectTechProfile,
)
logger = logging.getLogger(__name__)

# Detection tables: technology name -> list of regex fragments.
#
# The tech scan lowercases the fetched HTML before searching it, so every
# pattern here MUST be written in lowercase; a pattern containing an uppercase
# letter can never match. Dict order matters for CMS and JS framework
# detection, because the first technology with a matching pattern wins.

# CMS detection patterns
CMS_PATTERNS = {
    "wordpress": [r"wp-content", r"wp-includes", r"wordpress"],
    "drupal": [r"drupal", r"sites/default", r"sites/all"],
    "joomla": [r"/media/jui/", r"joomla", r"/components/com_"],
    "shopify": [r"cdn\.shopify\.com", r"shopify"],
    "wix": [r"wix\.com", r"wixstatic\.com", r"parastorage\.com"],
    "squarespace": [r"squarespace\.com", r"sqsp\.com"],
    "webflow": [r"webflow\.com", r"webflow\.io"],
    "typo3": [r"typo3", r"/typo3conf/"],
    "prestashop": [r"prestashop", r"/modules/ps_"],
    "magento": [r"magento", r"mage/", r"/static/version"],
}

# JavaScript framework detection patterns
JS_FRAMEWORK_PATTERNS = {
    # "__next_data__" is the lowercased form of the Next.js "__NEXT_DATA__" marker.
    "react": [r"react", r"__next_data__", r"_next/"],
    "vue": [r"vue\.js", r"vue\.min\.js", r"__vue__"],
    "angular": [r"angular", r"ng-version"],
    "jquery": [r"jquery"],
    "alpine": [r"alpine\.js", r"alpinejs"],
}

# Analytics / tracking detection patterns
ANALYTICS_PATTERNS = {
    "google_analytics": [r"google-analytics\.com", r"gtag/js", r"ga\.js"],
    "google_tag_manager": [r"googletagmanager\.com", r"gtm\.js"],
    "matomo": [r"matomo", r"piwik"],
    # Lowercased form of ".../en_US/fbevents" (see the note above the tables).
    "facebook_pixel": [r"facebook\.net/en_us/fbevents"],
}
class EnrichmentService:
"""Service for prospect enrichment via passive scanning."""
def check_http(self, db: Session, prospect: Prospect) -> dict:
"""Check HTTP connectivity for a prospect's domain."""
result = {
"has_website": False,
"uses_https": False,
"http_status_code": None,
"redirect_url": None,
"error": None,
}
domain = prospect.domain_name
if not domain:
result["error"] = "No domain name"
return result
# Try HTTPS first, then HTTP
for scheme in ["https", "http"]:
try:
url = f"{scheme}://{domain}"
response = requests.get(
url,
timeout=config.http_timeout,
allow_redirects=True,
verify=False, # noqa: SEC047 passive scan, not sending sensitive data
)
result["has_website"] = True
result["uses_https"] = scheme == "https"
result["http_status_code"] = response.status_code
if response.url != url:
result["redirect_url"] = str(response.url)
break
except requests.exceptions.Timeout:
result["error"] = f"Timeout on {scheme}"
except requests.exceptions.RequestException as e:
result["error"] = str(e)
if scheme == "https":
continue
break
# Update prospect
prospect.has_website = result["has_website"]
prospect.uses_https = result["uses_https"]
prospect.http_status_code = result["http_status_code"]
prospect.redirect_url = result["redirect_url"]
prospect.last_http_check_at = datetime.now(UTC)
if result["has_website"]:
prospect.status = "active"
db.flush()
return result
def scan_tech_stack(self, db: Session, prospect: Prospect) -> ProspectTechProfile | None:
"""Scan technology stack from prospect's website HTML."""
domain = prospect.domain_name
if not domain or not prospect.has_website:
return None
scheme = "https" if prospect.uses_https else "http"
url = f"{scheme}://{domain}"
try:
response = requests.get(
url,
timeout=config.http_timeout,
allow_redirects=True,
verify=False, # noqa: SEC047 passive scan, not sending sensitive data
)
html = response.text.lower()
headers = dict(response.headers)
cms = self._detect_cms(html)
js_framework = self._detect_js_framework(html)
analytics = self._detect_analytics(html)
server = headers.get("Server", "").split("/")[0] if "Server" in headers else None
server_version = None
if server and "/" in headers.get("Server", ""):
server_version = headers["Server"].split("/", 1)[1].strip()
# SSL certificate check
has_valid_cert = None
cert_issuer = None
cert_expires_at = None
if prospect.uses_https:
try:
ctx = ssl.create_default_context()
with ctx.wrap_socket(
socket.create_connection((domain, 443), timeout=5),
server_hostname=domain,
) as sock:
cert = sock.getpeercert()
has_valid_cert = True
cert_issuer = dict(x[0] for x in cert.get("issuer", [()])).get("organizationName")
not_after = cert.get("notAfter")
if not_after:
cert_expires_at = datetime.strptime(not_after, "%b %d %H:%M:%S %Y %Z")
except Exception: # noqa: EXC003
has_valid_cert = False
# Upsert tech profile
profile = prospect.tech_profile
if not profile:
profile = ProspectTechProfile(prospect_id=prospect.id)
db.add(profile)
profile.cms = cms
profile.server = server
profile.server_version = server_version
profile.js_framework = js_framework
profile.analytics = analytics
profile.has_valid_cert = has_valid_cert
profile.cert_issuer = cert_issuer
profile.cert_expires_at = cert_expires_at
profile.scan_source = "basic_http"
prospect.last_tech_scan_at = datetime.now(UTC)
db.flush()
return profile
except Exception as e: # noqa: EXC003
logger.error("Tech scan failed for %s: %s", domain, e)
if prospect.tech_profile:
prospect.tech_profile.scan_error = str(e)
prospect.last_tech_scan_at = datetime.now(UTC)
db.flush()
return None
def scan_performance(self, db: Session, prospect: Prospect) -> ProspectPerformanceProfile | None:
"""Run PageSpeed Insights audit for a prospect's website."""
domain = prospect.domain_name
if not domain or not prospect.has_website:
return None
scheme = "https" if prospect.uses_https else "http"
url = f"{scheme}://{domain}"
api_url = "https://www.googleapis.com/pagespeedonline/v5/runPagespeed"
params = {
"url": url,
"strategy": "mobile",
"category": ["performance", "accessibility", "best-practices", "seo"],
}
if config.pagespeed_api_key:
params["key"] = config.pagespeed_api_key
try:
response = requests.get(api_url, params=params, timeout=60)
data = response.json()
lighthouse = data.get("lighthouseResult", {})
categories = lighthouse.get("categories", {})
audits = lighthouse.get("audits", {})
perf_score = int((categories.get("performance", {}).get("score") or 0) * 100)
accessibility = int((categories.get("accessibility", {}).get("score") or 0) * 100)
best_practices = int((categories.get("best-practices", {}).get("score") or 0) * 100)
seo = int((categories.get("seo", {}).get("score") or 0) * 100)
# Upsert performance profile
profile = prospect.performance_profile
if not profile:
profile = ProspectPerformanceProfile(prospect_id=prospect.id)
db.add(profile)
profile.performance_score = perf_score
profile.accessibility_score = accessibility
profile.best_practices_score = best_practices
profile.seo_score = seo
# Core Web Vitals
fcp = audits.get("first-contentful-paint", {}).get("numericValue")
profile.first_contentful_paint_ms = int(fcp) if fcp else None
lcp = audits.get("largest-contentful-paint", {}).get("numericValue")
profile.largest_contentful_paint_ms = int(lcp) if lcp else None
tbt = audits.get("total-blocking-time", {}).get("numericValue")
profile.total_blocking_time_ms = int(tbt) if tbt else None
cls_val = audits.get("cumulative-layout-shift", {}).get("numericValue")
profile.cumulative_layout_shift = cls_val
si = audits.get("speed-index", {}).get("numericValue")
profile.speed_index = int(si) if si else None
tti = audits.get("interactive", {}).get("numericValue")
profile.time_to_interactive_ms = int(tti) if tti else None
# Mobile-friendly check
viewport = audits.get("viewport", {}).get("score")
profile.viewport_configured = viewport == 1 if viewport is not None else None
profile.is_mobile_friendly = profile.viewport_configured
profile.scan_strategy = "mobile"
prospect.last_perf_scan_at = datetime.now(UTC)
db.flush()
return profile
except Exception as e: # noqa: EXC003
logger.error("Performance scan failed for %s: %s", domain, e)
prospect.last_perf_scan_at = datetime.now(UTC)
db.flush()
return None
def scrape_contacts(self, db: Session, prospect: Prospect) -> list[ProspectContact]:
"""Scrape email and phone contacts from prospect's website.
Uses a two-phase approach:
1. Structured extraction from <a href="tel:..."> and <a href="mailto:..."> (high confidence)
2. Regex fallback for emails and international phone numbers (stricter filtering)
"""
from urllib.parse import unquote
domain = prospect.domain_name
if not domain or not prospect.has_website:
return []
scheme = "https" if prospect.uses_https else "http"
base_url = f"{scheme}://{domain}"
paths = ["", "/contact", "/kontakt", "/impressum", "/about", "/mentions-legales"]
# Structured patterns (from <a href> tags)
tel_pattern = re.compile(r'href=["\']tel:([^"\'>\s]+)', re.IGNORECASE)
mailto_pattern = re.compile(r'href=["\']mailto:([^"\'>\s?]+)', re.IGNORECASE)
# Regex fallback patterns
email_regex = re.compile(r"[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}")
# International phone: requires + prefix to avoid matching random digit sequences
phone_regex = re.compile(
r"\+\d{1,3}[\s.-]?\(?\d{1,4}\)?[\s.-]?\d{2,4}[\s.-]?\d{2,4}(?:[\s.-]?\d{2,4})?"
)
false_positive_domains = {
"example.com", "email.com", "domain.com", "wordpress.org",
"w3.org", "schema.org", "sentry.io", "googleapis.com",
}
found_emails: set[str] = set()
found_phones: set[str] = set()
contacts: list[ProspectContact] = []
def _add_email(email: str, url: str, source: str) -> None:
email = unquote(email).strip().lower()
email_domain = email.split("@")[1] if "@" in email else ""
if email_domain in false_positive_domains or email in found_emails:
return
found_emails.add(email)
contacts.append(ProspectContact(
prospect_id=prospect.id,
contact_type="email",
value=email,
source_url=url,
source_element=source,
))
def _add_phone(phone: str, url: str, source: str) -> None:
phone_clean = re.sub(r"[\s.()\-]", "", phone)
if len(phone_clean) < 10 or phone_clean in found_phones:
return
found_phones.add(phone_clean)
contacts.append(ProspectContact(
prospect_id=prospect.id,
contact_type="phone",
value=phone_clean,
source_url=url,
source_element=source,
))
session = requests.Session()
session.verify = False # noqa: SEC047 passive scan, not sending sensitive data
session.headers.update({"User-Agent": "Mozilla/5.0 (compatible; OrionBot/1.0)"})
for path in paths:
try:
url = base_url + path
response = session.get(url, timeout=config.http_timeout, allow_redirects=True)
if response.status_code != 200:
continue
html = response.text
# Phase 1: structured extraction from href attributes
for phone in tel_pattern.findall(html):
_add_phone(unquote(phone), url, "tel_href")
for email in mailto_pattern.findall(html):
_add_email(email, url, "mailto_href")
# Phase 2: regex fallback — strip SVG/script content first
text_html = re.sub(r"<(svg|script|style)[^>]*>.*?</\1>", "", html, flags=re.DOTALL | re.IGNORECASE)
for email in email_regex.findall(text_html):
_add_email(email, url, "regex")
for phone in phone_regex.findall(text_html):
_add_phone(phone, url, "regex")
except Exception as e: # noqa: EXC003
logger.debug("Contact scrape failed for %s%s: %s", domain, path, e)
session.close()
# Save contacts (replace existing auto-scraped ones)
db.query(ProspectContact).filter(
ProspectContact.prospect_id == prospect.id,
ProspectContact.source_element.in_(["regex", "tel_href", "mailto_href"]),
).delete()
db.add_all(contacts)
# Mark first email and phone as primary
for c in contacts:
if c.contact_type == "email":
c.is_primary = True
break
for c in contacts:
if c.contact_type == "phone":
c.is_primary = True
break
prospect.last_contact_scrape_at = datetime.now(UTC)
db.flush()
return contacts
def _detect_cms(self, html: str) -> str | None:
for cms, patterns in CMS_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, html):
return cms
return None
def _detect_js_framework(self, html: str) -> str | None:
for framework, patterns in JS_FRAMEWORK_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, html):
return framework
return None
def _detect_analytics(self, html: str) -> str | None:
found = []
for tool, patterns in ANALYTICS_PATTERNS.items():
for pattern in patterns:
if re.search(pattern, html):
found.append(tool)
break
return ",".join(found) if found else None
# Module-level EnrichmentService instance, created once at import time.
enrichment_service = EnrichmentService()