Add comprehensive email template management for both admin and vendors:
Admin Features:
- Email templates management page at /admin/email-templates
- Edit platform templates with language support (en, fr, de, lb)
- Preview templates with sample variables
- Send test emails
- View email logs per template
Vendor Features:
- Email templates customization page at /vendor/{code}/email-templates
- Override platform templates with vendor-specific versions
- Preview and test customized templates
- Revert to platform defaults
Technical Changes:
- Migration for vendor_email_templates table
- VendorEmailTemplate model with override management
- Enhanced EmailService with language resolution chain
(customer preferred -> vendor preferred -> platform default)
- Branding resolution (Wizamart default, removed for whitelabel)
- Platform-only template protection (billing templates)
- Admin and vendor API endpoints with full CRUD
- Updated seed script with billing and team templates
Files: 22 changed, ~3,900 lines added
🤖 Generated with [Claude Code](https://claude.com/claude-code)
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
833 lines
27 KiB
Python
833 lines
27 KiB
Python
# app/services/email_service.py
|
|
"""
|
|
Email service with multi-provider support.
|
|
|
|
Supports:
|
|
- SMTP (default)
|
|
- SendGrid
|
|
- Mailgun
|
|
- Amazon SES
|
|
|
|
Features:
|
|
- Multi-language templates from database
|
|
- Vendor template overrides
|
|
- Jinja2 template rendering
|
|
- Email logging and tracking
|
|
- Queue support via background tasks
|
|
- Branding based on vendor tier (whitelabel)
|
|
|
|
Language Resolution (priority order):
|
|
1. Explicit language parameter
|
|
2. Customer's preferred language (if customer context)
|
|
3. Vendor's storefront language
|
|
4. Platform default (en)
|
|
|
|
Template Resolution (priority order):
|
|
1. Vendor override (if vendor_id and template is not platform-only)
|
|
2. Platform template
|
|
3. English fallback (if requested language not found)
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import smtplib
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass
|
|
from email.mime.multipart import MIMEMultipart
|
|
from email.mime.text import MIMEText
|
|
from typing import Any
|
|
|
|
from jinja2 import Environment, BaseLoader
|
|
from sqlalchemy.orm import Session
|
|
|
|
from app.core.config import settings
|
|
from models.database.email import EmailLog, EmailStatus, EmailTemplate
|
|
from models.database.vendor_email_template import VendorEmailTemplate
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Platform branding and language defaults.
PLATFORM_NAME = "Wizamart"
PLATFORM_SUPPORT_EMAIL = "support@wizamart.com"
PLATFORM_DEFAULT_LANGUAGE = "en"
SUPPORTED_LANGUAGES = ["en", "fr", "de", "lb"]
|
|
|
|
|
|
@dataclass
|
|
class ResolvedTemplate:
|
|
"""Resolved template content after checking vendor overrides."""
|
|
|
|
subject: str
|
|
body_html: str
|
|
body_text: str | None
|
|
is_vendor_override: bool
|
|
template_id: int | None # Platform template ID (None if vendor override)
|
|
template_code: str
|
|
language: str
|
|
|
|
|
|
@dataclass
|
|
class BrandingContext:
|
|
"""Branding variables for email templates."""
|
|
|
|
platform_name: str
|
|
platform_logo_url: str | None
|
|
support_email: str
|
|
vendor_name: str | None
|
|
vendor_logo_url: str | None
|
|
is_whitelabel: bool
|
|
|
|
|
|
# =============================================================================
|
|
# EMAIL PROVIDER ABSTRACTION
|
|
# =============================================================================
|
|
|
|
|
|
class EmailProvider(ABC):
|
|
"""Abstract base class for email providers."""
|
|
|
|
@abstractmethod
|
|
def send(
|
|
self,
|
|
to_email: str,
|
|
to_name: str | None,
|
|
subject: str,
|
|
body_html: str,
|
|
body_text: str | None,
|
|
from_email: str,
|
|
from_name: str | None,
|
|
reply_to: str | None = None,
|
|
) -> tuple[bool, str | None, str | None]:
|
|
"""
|
|
Send an email.
|
|
|
|
Returns:
|
|
tuple: (success, provider_message_id, error_message)
|
|
"""
|
|
pass
|
|
|
|
|
|
class SMTPProvider(EmailProvider):
    """SMTP email provider."""

    # Socket timeout in seconds, so an unresponsive server cannot block the
    # caller indefinitely.
    TIMEOUT_SECONDS = 30

    def send(
        self,
        to_email: str,
        to_name: str | None,
        subject: str,
        body_html: str,
        body_text: str | None,
        from_email: str,
        from_name: str | None,
        reply_to: str | None = None,
    ) -> tuple[bool, str | None, str | None]:
        """
        Send one email over SMTP using the connection settings in ``settings``.

        Uses SMTP_SSL when ``smtp_use_ssl`` is set, otherwise plain SMTP with
        optional STARTTLS (``smtp_use_tls``). Logs in only when both
        ``smtp_user`` and ``smtp_password`` are configured.

        Returns:
            tuple: (success, provider_message_id, error_message). SMTP yields
            no message ID, so the second element is always None.
        """
        from email.utils import formataddr

        delivered = False
        try:
            msg = MIMEMultipart("alternative")
            msg["Subject"] = subject
            # formataddr quotes/encodes only the display name, so names with
            # commas, quotes or non-ASCII characters keep the address intact.
            # A missing or empty name yields the bare address.
            msg["From"] = formataddr((from_name, from_email))
            msg["To"] = formataddr((to_name, to_email))

            if reply_to:
                msg["Reply-To"] = reply_to

            # Plain text first, HTML last: in multipart/alternative the last
            # part is the preferred one.
            if body_text:
                msg.attach(MIMEText(body_text, "plain", "utf-8"))
            msg.attach(MIMEText(body_html, "html", "utf-8"))

            smtp_class = smtplib.SMTP_SSL if settings.smtp_use_ssl else smtplib.SMTP

            # The context manager sends QUIT and closes the socket on exit,
            # tolerating a server that has already disconnected.
            with smtp_class(
                settings.smtp_host,
                settings.smtp_port,
                timeout=self.TIMEOUT_SECONDS,
            ) as server:
                if settings.smtp_use_tls and not settings.smtp_use_ssl:
                    server.starttls()

                if settings.smtp_user and settings.smtp_password:
                    server.login(settings.smtp_user, settings.smtp_password)

                server.sendmail(from_email, [to_email], msg.as_string())
                delivered = True

        except Exception as e:
            if delivered:
                # The server already accepted the message; a failure while
                # closing the session must not be reported as a failed send.
                logger.warning(f"SMTP session close error after successful send: {e}")
                return True, None, None
            logger.error(f"SMTP send error: {e}")
            return False, None, str(e)

        return True, None, None
|
|
|
|
|
|
class SendGridProvider(EmailProvider):
    """Email provider that delivers through the SendGrid API client."""

    def send(
        self,
        to_email: str,
        to_name: str | None,
        subject: str,
        body_html: str,
        body_text: str | None,
        from_email: str,
        from_name: str | None,
        reply_to: str | None = None,
    ) -> tuple[bool, str | None, str | None]:
        """
        Send one email via SendGrid.

        The ``sendgrid`` package is imported lazily so it is only required
        when this provider is actually used.

        Returns:
            tuple: (success, provider_message_id, error_message). The message
            ID is read from the ``X-Message-Id`` response header.
        """
        try:
            from sendgrid import SendGridAPIClient
            from sendgrid.helpers.mail import Mail, Email, To, Content

            mail = Mail(
                from_email=Email(from_email, from_name),
                to_emails=To(to_email, to_name),
                subject=subject,
            )

            mail.add_content(Content("text/html", body_html))
            if body_text:
                mail.add_content(Content("text/plain", body_text))

            if reply_to:
                mail.reply_to = Email(reply_to)

            client = SendGridAPIClient(settings.sendgrid_api_key)
            result = client.send(mail)

            # Anything other than 200/201/202 is reported as a failure
            if result.status_code not in (200, 201, 202):
                return False, None, f"SendGrid error: {result.status_code}"

            return True, result.headers.get("X-Message-Id"), None

        except ImportError:
            return False, None, "SendGrid library not installed. Run: pip install sendgrid"
        except Exception as e:
            logger.error(f"SendGrid send error: {e}")
            return False, None, str(e)
|
|
|
|
|
|
class MailgunProvider(EmailProvider):
    """Email provider that posts messages to the Mailgun HTTP API."""

    def send(
        self,
        to_email: str,
        to_name: str | None,
        subject: str,
        body_html: str,
        body_text: str | None,
        from_email: str,
        from_name: str | None,
        reply_to: str | None = None,
    ) -> tuple[bool, str | None, str | None]:
        """
        Send one email via Mailgun.

        Posts a form to the ``/messages`` endpoint of the configured
        ``mailgun_domain`` with a 30 second timeout.

        Returns:
            tuple: (success, provider_message_id, error_message). The message
            ID is the ``id`` field of the JSON response.
        """
        try:
            import requests

            sender = f"{from_name} <{from_email}>" if from_name else from_email
            recipient = f"{to_name} <{to_email}>" if to_name else to_email

            payload = {
                "from": sender,
                "to": recipient,
                "subject": subject,
                "html": body_html,
            }
            # Optional fields are only sent when provided
            if body_text:
                payload["text"] = body_text
            if reply_to:
                payload["h:Reply-To"] = reply_to

            endpoint = f"https://api.mailgun.net/v3/{settings.mailgun_domain}/messages"
            response = requests.post(
                endpoint,
                auth=("api", settings.mailgun_api_key),
                data=payload,
                timeout=30,
            )

            if response.status_code != 200:
                return False, None, f"Mailgun error: {response.status_code} - {response.text}"

            return True, response.json().get("id"), None

        except Exception as e:
            logger.error(f"Mailgun send error: {e}")
            return False, None, str(e)
|
|
|
|
|
|
class SESProvider(EmailProvider):
    """Email provider that delivers through Amazon SES using boto3."""

    def send(
        self,
        to_email: str,
        to_name: str | None,
        subject: str,
        body_html: str,
        body_text: str | None,
        from_email: str,
        from_name: str | None,
        reply_to: str | None = None,
    ) -> tuple[bool, str | None, str | None]:
        """
        Send one email via the SES ``send_email`` call.

        ``boto3`` is imported lazily so it is only required when this
        provider is actually used. ``to_name`` is not used: only the bare
        address is placed in ``ToAddresses``.

        Returns:
            tuple: (success, provider_message_id, error_message). The message
            ID is the ``MessageId`` field of the response.
        """
        try:
            import boto3

            client = boto3.client(
                "ses",
                region_name=settings.aws_region,
                aws_access_key_id=settings.aws_access_key_id,
                aws_secret_access_key=settings.aws_secret_access_key,
            )

            source = f"{from_name} <{from_email}>" if from_name else from_email

            message_body = {"Html": {"Charset": "UTF-8", "Data": body_html}}
            if body_text:
                message_body["Text"] = {"Charset": "UTF-8", "Data": body_text}

            request = {
                "Source": source,
                "Destination": {"ToAddresses": [to_email]},
                "Message": {
                    "Subject": {"Charset": "UTF-8", "Data": subject},
                    "Body": message_body,
                },
            }
            if reply_to:
                request["ReplyToAddresses"] = [reply_to]

            result = client.send_email(**request)
            return True, result.get("MessageId"), None

        except ImportError:
            return False, None, "boto3 library not installed. Run: pip install boto3"
        except Exception as e:
            logger.error(f"SES send error: {e}")
            return False, None, str(e)
|
|
|
|
|
|
class DebugProvider(EmailProvider):
    """Provider that writes the email to the log instead of delivering it."""

    def send(
        self,
        to_email: str,
        to_name: str | None,
        subject: str,
        body_html: str,
        body_text: str | None,
        from_email: str,
        from_name: str | None,
        reply_to: str | None = None,
    ) -> tuple[bool, str | None, str | None]:
        """
        Log the email at INFO level and report success.

        Only the first 500 characters of the HTML body are logged.

        Returns:
            tuple: (True, "debug-<to_email>", None)
        """
        rule = "=" * 60
        # Leading and trailing empty entries produce the opening and closing
        # newlines of the logged block.
        lines = [
            "",
            rule,
            "DEBUG EMAIL",
            rule,
            f"To: {to_name} <{to_email}>",
            f"From: {from_name} <{from_email}>",
            f"Reply-To: {reply_to}",
            f"Subject: {subject}",
            rule,
            f"Body (text):\n{body_text or '(none)'}",
            rule,
            f"Body (html):\n{body_html[:500]}...",
            rule,
            "",
        ]
        logger.info("\n".join(lines))
        return True, f"debug-{to_email}", None
|
|
|
|
|
|
# =============================================================================
|
|
# EMAIL SERVICE
|
|
# =============================================================================
|
|
|
|
|
|
def get_provider() -> EmailProvider:
    """
    Build the email provider selected by the application settings.

    Debug mode (``settings.email_debug``) takes precedence over the
    configured provider name. An unrecognised provider name logs a warning
    and falls back to SMTP.
    """
    if settings.email_debug:
        return DebugProvider()

    providers = {
        "smtp": SMTPProvider,
        "sendgrid": SendGridProvider,
        "mailgun": MailgunProvider,
        "ses": SESProvider,
    }

    chosen = providers.get(settings.email_provider.lower())
    if chosen is None:
        logger.warning(f"Unknown email provider: {settings.email_provider}, using SMTP")
        chosen = SMTPProvider

    return chosen()
|
|
|
|
|
|
class EmailService:
    """
    Email service for sending templated emails.

    Templates are rendered inside a Jinja2 sandbox because vendor overrides
    are authored outside the platform.

    Usage:
        email_service = EmailService(db)

        # Send using database template with vendor override support
        email_service.send_template(
            template_code="signup_welcome",
            to_email="user@example.com",
            to_name="John Doe",
            variables={"first_name": "John", "login_url": "https://..."},
            vendor_id=1,
            # Language is resolved automatically from vendor/customer settings
        )

        # Send raw email
        email_service.send_raw(
            to_email="user@example.com",
            subject="Hello",
            body_html="<h1>Hello</h1>",
        )
    """

    def __init__(self, db: Session):
        """
        Create a service bound to a database session.

        Args:
            db: SQLAlchemy session used for template lookups and email logs
        """
        # Imported here so this class does not depend on a change to the
        # module's import block.
        from jinja2.sandbox import SandboxedEnvironment

        self.db = db
        self.provider = get_provider()
        # Vendor-authored template text is untrusted: the sandbox blocks
        # access to unsafe attributes during rendering.
        # NOTE(review): autoescape is left off (as before) because subjects
        # and plain-text bodies share this environment; variables inserted
        # into HTML bodies are therefore not escaped.
        self.jinja_env = SandboxedEnvironment(loader=BaseLoader())
        # Cache vendor and feature data to avoid repeated queries
        self._vendor_cache: dict[int, Any] = {}
        self._feature_cache: dict[int, set[str]] = {}

    def _get_vendor(self, vendor_id: int):
        """Get vendor with caching (a missing vendor is cached as None)."""
        if vendor_id not in self._vendor_cache:
            from models.database.vendor import Vendor

            self._vendor_cache[vendor_id] = (
                self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
            )
        return self._vendor_cache[vendor_id]

    def _has_feature(self, vendor_id: int, feature_code: str) -> bool:
        """
        Check if vendor has a specific feature enabled.

        The vendor's feature set is fetched once and cached. If the lookup
        raises, the failure is logged and the vendor is treated as having no
        features.
        """
        if vendor_id not in self._feature_cache:
            from app.core.feature_gate import get_vendor_features

            try:
                self._feature_cache[vendor_id] = get_vendor_features(self.db, vendor_id)
            except Exception as e:
                # Best-effort: a feature lookup failure must not block the
                # email, but it should be visible in the logs.
                logger.warning(f"Feature lookup failed for vendor {vendor_id}: {e}")
                self._feature_cache[vendor_id] = set()

        return feature_code in self._feature_cache[vendor_id]

    def resolve_language(
        self,
        explicit_language: str | None = None,
        vendor_id: int | None = None,
        customer_id: int | None = None,
    ) -> str:
        """
        Resolve the language for an email.

        Priority order:
        1. Explicit language parameter
        2. Customer's preferred language (if customer_id provided)
        3. Vendor's storefront language (if vendor_id provided)
        4. Platform default (en)

        A candidate is only accepted when it is in SUPPORTED_LANGUAGES;
        otherwise the next source is tried.

        Args:
            explicit_language: Explicitly requested language
            vendor_id: Vendor ID for storefront language lookup
            customer_id: Customer ID for preferred language lookup

        Returns:
            Resolved language code (one of: en, fr, de, lb)
        """
        # 1. Explicit language takes priority
        if explicit_language and explicit_language in SUPPORTED_LANGUAGES:
            return explicit_language

        # 2. Customer's preferred language
        if customer_id:
            from models.database.customer import Customer

            customer = (
                self.db.query(Customer).filter(Customer.id == customer_id).first()
            )
            if customer and customer.preferred_language in SUPPORTED_LANGUAGES:
                return customer.preferred_language

        # 3. Vendor's storefront language
        if vendor_id:
            vendor = self._get_vendor(vendor_id)
            if vendor and vendor.storefront_language in SUPPORTED_LANGUAGES:
                return vendor.storefront_language

        # 4. Platform default
        return PLATFORM_DEFAULT_LANGUAGE

    def get_branding(self, vendor_id: int | None = None) -> BrandingContext:
        """
        Get branding context for email templates.

        If vendor has white_label feature enabled (Enterprise tier),
        platform branding is replaced with vendor branding.

        Args:
            vendor_id: Optional vendor ID

        Returns:
            BrandingContext with appropriate branding variables
        """
        vendor = None
        is_whitelabel = False

        if vendor_id:
            vendor = self._get_vendor(vendor_id)
            is_whitelabel = self._has_feature(vendor_id, "white_label")

        if is_whitelabel and vendor:
            # Whitelabel: use vendor branding throughout
            return BrandingContext(
                platform_name=vendor.name,
                platform_logo_url=vendor.logo_url,
                support_email=vendor.support_email or PLATFORM_SUPPORT_EMAIL,
                vendor_name=vendor.name,
                vendor_logo_url=vendor.logo_url,
                is_whitelabel=True,
            )

        # Standard: Wizamart branding with vendor details
        return BrandingContext(
            platform_name=PLATFORM_NAME,
            platform_logo_url=None,  # Use default platform logo
            support_email=PLATFORM_SUPPORT_EMAIL,
            vendor_name=vendor.name if vendor else None,
            vendor_logo_url=vendor.logo_url if vendor else None,
            is_whitelabel=False,
        )

    def resolve_template(
        self,
        template_code: str,
        language: str,
        vendor_id: int | None = None,
    ) -> ResolvedTemplate | None:
        """
        Resolve template content with vendor override support.

        Resolution order:
        1. Check for vendor override (if vendor_id and template is not platform-only)
        2. Fall back to platform template
        3. Fall back to English if language not found

        When the platform template was found only through the English
        fallback, the vendor override is also looked up in that fallback
        language, and the returned ``language`` is the language of the
        content actually selected.

        Args:
            template_code: Template code (e.g., "password_reset")
            language: Language code
            vendor_id: Optional vendor ID for override lookup

        Returns:
            ResolvedTemplate with content, or None if not found
        """
        # First, get platform template to check if it's platform-only
        platform_template = self.get_template(template_code, language)

        if not platform_template:
            logger.warning(f"Template not found: {template_code} ({language})")
            return None

        # get_template may have fallen back to English; use the language of
        # the row that was actually returned.
        effective_language = platform_template.language or language

        # Check for vendor override (if not platform-only)
        if vendor_id and not platform_template.is_platform_only:
            # Requested language first, then the fallback language (deduplicated)
            for candidate in dict.fromkeys((language, effective_language)):
                vendor_override = VendorEmailTemplate.get_override(
                    self.db, vendor_id, template_code, candidate
                )
                if vendor_override:
                    return ResolvedTemplate(
                        subject=vendor_override.subject,
                        body_html=vendor_override.body_html,
                        body_text=vendor_override.body_text,
                        is_vendor_override=True,
                        template_id=None,
                        template_code=template_code,
                        language=candidate,
                    )

        # Use platform template
        return ResolvedTemplate(
            subject=platform_template.subject,
            body_html=platform_template.body_html,
            body_text=platform_template.body_text,
            is_vendor_override=False,
            template_id=platform_template.id,
            template_code=template_code,
            language=effective_language,
        )

    def get_template(
        self, template_code: str, language: str = "en"
    ) -> EmailTemplate | None:
        """
        Get platform email template from database with fallback to English.

        Only active templates are considered.

        Args:
            template_code: Template code to look up
            language: Preferred language code

        Returns:
            The matching EmailTemplate, the platform-default-language one if
            the preferred language is missing, or None
        """

        def _lookup(lang: str) -> EmailTemplate | None:
            return (
                self.db.query(EmailTemplate)
                .filter(
                    EmailTemplate.code == template_code,
                    EmailTemplate.language == lang,
                    EmailTemplate.is_active == True,  # noqa: E712
                )
                .first()
            )

        template = _lookup(language)

        # Fallback to the platform default language (English) if not found
        if not template and language != PLATFORM_DEFAULT_LANGUAGE:
            template = _lookup(PLATFORM_DEFAULT_LANGUAGE)

        return template

    def render_template(self, template_string: str, variables: dict[str, Any]) -> str:
        """
        Render a Jinja2 template string with variables.

        On any rendering error the error is logged and the unrendered
        template string is returned.
        """
        try:
            template = self.jinja_env.from_string(template_string)
            return template.render(**variables)
        except Exception as e:
            logger.error(f"Template rendering error: {e}")
            return template_string

    def send_template(
        self,
        template_code: str,
        to_email: str,
        to_name: str | None = None,
        language: str | None = None,
        variables: dict[str, Any] | None = None,
        vendor_id: int | None = None,
        customer_id: int | None = None,
        user_id: int | None = None,
        related_type: str | None = None,
        related_id: int | None = None,
        include_branding: bool = True,
    ) -> EmailLog:
        """
        Send an email using a database template with vendor override support.

        If no template can be resolved, a FAILED EmailLog is committed and
        returned without attempting delivery.

        Args:
            template_code: Template code (e.g., "signup_welcome")
            to_email: Recipient email address
            to_name: Recipient name (optional)
            language: Language code (auto-resolved if None)
            variables: Template variables dict
            vendor_id: Vendor ID for override lookup and logging
            customer_id: Customer ID for language resolution
            user_id: Related user ID for logging
            related_type: Related entity type (e.g., "order")
            related_id: Related entity ID
            include_branding: Whether to inject branding variables (default: True)

        Returns:
            EmailLog record
        """
        variables = variables or {}

        # Resolve language (uses customer -> vendor -> platform default order)
        resolved_language = self.resolve_language(
            explicit_language=language,
            vendor_id=vendor_id,
            customer_id=customer_id,
        )

        # Resolve template (checks vendor override, falls back to platform)
        resolved = self.resolve_template(template_code, resolved_language, vendor_id)

        if not resolved:
            logger.error(f"Email template not found: {template_code} ({resolved_language})")
            # Create failed log entry
            log = EmailLog(
                template_code=template_code,
                recipient_email=to_email,
                recipient_name=to_name,
                subject=f"[Template not found: {template_code}]",
                from_email=settings.email_from_address,
                from_name=settings.email_from_name,
                status=EmailStatus.FAILED.value,
                error_message=f"Template not found: {template_code} ({resolved_language})",
                provider=settings.email_provider,
                vendor_id=vendor_id,
                user_id=user_id,
                related_type=related_type,
                related_id=related_id,
            )
            self.db.add(log)
            self.db.commit()  # noqa: SVC-006 - Email logs are side effects, commit immediately
            return log

        # Inject branding variables if requested (branding keys take
        # precedence over caller-supplied keys of the same name)
        if include_branding:
            branding = self.get_branding(vendor_id)
            variables = {
                **variables,
                "platform_name": branding.platform_name,
                "platform_logo_url": branding.platform_logo_url,
                "support_email": branding.support_email,
                "vendor_name": branding.vendor_name,
                "vendor_logo_url": branding.vendor_logo_url,
                "is_whitelabel": branding.is_whitelabel,
            }

        # Render template
        subject = self.render_template(resolved.subject, variables)
        body_html = self.render_template(resolved.body_html, variables)
        body_text = (
            self.render_template(resolved.body_text, variables)
            if resolved.body_text
            else None
        )

        # default=str keeps values such as datetimes or Decimals from raising
        # TypeError here, which would abort the send after rendering; the
        # serialized variables are only stored on the log record.
        extra_data = json.dumps(variables, default=str) if variables else None

        return self.send_raw(
            to_email=to_email,
            to_name=to_name,
            subject=subject,
            body_html=body_html,
            body_text=body_text,
            template_code=template_code,
            template_id=resolved.template_id,
            vendor_id=vendor_id,
            user_id=user_id,
            related_type=related_type,
            related_id=related_id,
            extra_data=extra_data,
        )

    def send_raw(
        self,
        to_email: str,
        subject: str,
        body_html: str,
        to_name: str | None = None,
        body_text: str | None = None,
        from_email: str | None = None,
        from_name: str | None = None,
        reply_to: str | None = None,
        template_code: str | None = None,
        template_id: int | None = None,
        vendor_id: int | None = None,
        user_id: int | None = None,
        related_type: str | None = None,
        related_id: int | None = None,
        extra_data: str | None = None,
    ) -> EmailLog:
        """
        Send a raw email without using a template.

        A PENDING EmailLog is created first, then updated to sent or failed
        according to the provider result and committed. Sender and reply-to
        default to the values in ``settings``. When ``settings.email_enabled``
        is false, the log is marked FAILED and nothing is sent.

        Returns:
            EmailLog record
        """
        from_email = from_email or settings.email_from_address
        from_name = from_name or settings.email_from_name
        reply_to = reply_to or settings.email_reply_to or None

        # Create log entry
        log = EmailLog(
            template_code=template_code,
            template_id=template_id,
            recipient_email=to_email,
            recipient_name=to_name,
            subject=subject,
            body_html=body_html,
            body_text=body_text,
            from_email=from_email,
            from_name=from_name,
            reply_to=reply_to,
            status=EmailStatus.PENDING.value,
            provider=settings.email_provider,
            vendor_id=vendor_id,
            user_id=user_id,
            related_type=related_type,
            related_id=related_id,
            extra_data=extra_data,
        )
        self.db.add(log)
        self.db.flush()

        # Check if emails are disabled
        if not settings.email_enabled:
            log.status = EmailStatus.FAILED.value
            log.error_message = "Email sending is disabled"
            self.db.commit()  # noqa: SVC-006 - Email logs are side effects, commit immediately
            logger.info(f"Email sending disabled, skipping: {to_email}")
            return log

        # Send email
        success, message_id, error = self.provider.send(
            to_email=to_email,
            to_name=to_name,
            subject=subject,
            body_html=body_html,
            body_text=body_text,
            from_email=from_email,
            from_name=from_name,
            reply_to=reply_to,
        )

        if success:
            log.mark_sent(message_id)
            logger.info(f"Email sent to {to_email}: {subject}")
        else:
            log.mark_failed(error or "Unknown error")
            logger.error(f"Email failed to {to_email}: {error}")

        self.db.commit()  # noqa: SVC-006 - Email logs are side effects, commit immediately
        return log
|
|
|
|
|
|
# =============================================================================
|
|
# CONVENIENCE FUNCTIONS
|
|
# =============================================================================
|
|
|
|
|
|
def send_email(
    db: Session,
    template_code: str,
    to_email: str,
    to_name: str | None = None,
    language: str | None = None,
    variables: dict[str, Any] | None = None,
    vendor_id: int | None = None,
    customer_id: int | None = None,
    **kwargs,
) -> EmailLog:
    """
    Send a templated email without constructing the service yourself.

    Creates a fresh EmailService for the given session and delegates to its
    ``send_template`` method.

    Args:
        db: Database session
        template_code: Template code (e.g., "password_reset")
        to_email: Recipient email address
        to_name: Recipient name (optional)
        language: Language code (auto-resolved from customer/vendor if None)
        variables: Template variables dict
        vendor_id: Vendor ID for override lookup and branding
        customer_id: Customer ID for language resolution
        **kwargs: Additional arguments passed to send_template

    Returns:
        EmailLog record
    """
    return EmailService(db).send_template(
        template_code=template_code,
        to_email=to_email,
        to_name=to_name,
        language=language,
        variables=variables,
        vendor_id=vendor_id,
        customer_id=customer_id,
        **kwargs,
    )
|