Files
orion/app/modules/messaging/services/email_template_service.py
Samir Boulahtit ce822af883
Some checks failed
CI / ruff (push) Successful in 9s
CI / pytest (push) Failing after 47m32s
CI / validate (push) Successful in 23s
CI / dependency-scanning (push) Successful in 29s
CI / docs (push) Has been skipped
CI / deploy (push) Has been skipped
feat: production launch — email audit, team invites, security headers, router fixes
- Fix loyalty & monitoring router bugs (_get_router → named routers)
- Implement team invitation email with send_template + seed templates (en/fr/de)
- Add SecurityHeadersMiddleware (nosniff, HSTS, referrer-policy, permissions-policy)
- Build email audit admin page: service, schemas, API, page route, menu, i18n, HTML, JS
- Clean stale TODO in platform-menu-config.js
- Add 67 tests (unit + integration) covering all new functionality

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 18:24:30 +01:00

864 lines
28 KiB
Python

# app/modules/messaging/services/email_template_service.py
"""
Email Template Service
Handles business logic for email template management:
- Platform template CRUD operations
- Store template override management
- Template preview and testing
- Email log queries
This service layer separates business logic from API endpoints
to follow the project's layered architecture.
"""
import json
import logging
from dataclasses import dataclass
from typing import Any

from jinja2 import BaseLoader, Environment, TemplateError
from jinja2.sandbox import SandboxedEnvironment
from sqlalchemy import func
from sqlalchemy.orm import Session

from app.exceptions.base import (
    AuthorizationException,
    ResourceNotFoundException,
    ValidationException,
)
from app.modules.messaging.models import (
    EmailCategory,
    EmailLog,
    EmailTemplate,
    StoreEmailTemplate,
)
logger = logging.getLogger(__name__)

# Template sources handled by this service include store-authored overrides,
# i.e. untrusted input. Compile and render them in Jinja2's sandbox so a
# template cannot reach unsafe attributes or callables. Sandbox violations
# raise jinja2.exceptions.SecurityError, which is a TemplateError subclass, so
# the ``except TemplateError`` handlers in this module still apply.
_jinja_env: Environment = SandboxedEnvironment(loader=BaseLoader(), autoescape=True)

# Language codes accepted by the store-facing operations of this service.
SUPPORTED_LANGUAGES = ["en", "fr", "de", "lb"]
@dataclass
class TemplateData:
"""Template data container."""
code: str
language: str
name: str
description: str | None
category: str
subject: str
body_html: str
body_text: str | None
variables: list[str]
required_variables: list[str]
is_platform_only: bool
@dataclass
class StoreOverrideData:
"""Store override data container."""
code: str
language: str
subject: str
body_html: str
body_text: str | None
name: str | None
updated_at: str | None
class EmailTemplateService:
"""Service for managing email templates."""
def __init__(self, db: Session):
    """Store the SQLAlchemy session that every query in this service runs on."""
    self.db = db
# =========================================================================
# ADMIN OPERATIONS
# =========================================================================
def list_platform_templates(self) -> list[dict[str, Any]]:
    """
    List all platform email templates grouped by code.

    Rows are read ordered by category, then code. The summary fields
    (name, category, description, is_platform_only) are taken from the first
    row seen for each code; every row contributes its language.

    Returns:
        One summary dict per template code with the keys ``code``, ``name``,
        ``category``, ``description``, ``is_platform_only``, ``languages``
        and ``variables``.
    """
    templates = (
        self.db.query(EmailTemplate)
        .order_by(EmailTemplate.category, EmailTemplate.code)
        .all()
    )

    grouped: dict[str, dict[str, Any]] = {}
    for template in templates:
        summary = grouped.get(template.code)
        if summary is None:
            summary = grouped[template.code] = {
                "code": template.code,
                "name": template.name,
                "category": template.category,
                "description": template.description,
                "is_platform_only": template.is_platform_only,
                "languages": [],
                "variables": [],
            }

        summary["languages"].append(template.language)

        # Keep trying language rows until one yields a non-empty variable
        # list. Parsing goes through the shared helper so malformed JSON is
        # handled the same way everywhere in this service (it yields []).
        if template.variables and not summary["variables"]:
            summary["variables"] = self._parse_variables(template.variables)

    return list(grouped.values())
def get_template_categories(self) -> list[str]:
    """Return the ``value`` of every member yielded by iterating ``EmailCategory``."""
    categories = []
    for category in EmailCategory:
        categories.append(category.value)
    return categories
def get_platform_template(self, code: str) -> dict[str, Any]:
    """
    Get a platform template together with all of its language versions.

    Shared metadata (name, description, category, platform-only flag and the
    variable declarations) is read from the first row returned for the code.

    Args:
        code: Template code

    Returns:
        Template metadata plus a ``languages`` mapping of language code to
        that version's subject, HTML body and text body.

    Raises:
        ResourceNotFoundException: If no template row exists for the code
    """
    rows = self.db.query(EmailTemplate).filter(EmailTemplate.code == code).all()
    if not rows:
        raise ResourceNotFoundException(f"Template not found: {code}")

    # If a language appears more than once, the last row wins.
    per_language = {
        row.language: {
            "subject": row.subject,
            "body_html": row.body_html,
            "body_text": row.body_text,
        }
        for row in rows
    }

    head = rows[0]
    return {
        "code": code,
        "name": head.name,
        "description": head.description,
        "category": head.category,
        "is_platform_only": head.is_platform_only,
        "variables": self._parse_variables(head.variables),
        "required_variables": self._parse_required_variables(head.required_variables),
        "languages": per_language,
    }
def get_platform_template_language(self, code: str, language: str) -> TemplateData:
    """
    Get one language version of a platform template.

    Args:
        code: Template code
        language: Language code

    Returns:
        A ``TemplateData`` populated from the matching row, with the variable
        declarations already parsed into lists.

    Raises:
        ResourceNotFoundException: If no row matches the code and language
    """
    row = EmailTemplate.get_by_code_and_language(self.db, code, language)
    if not row:
        raise ResourceNotFoundException(f"Template not found: {code}/{language}")

    declared = self._parse_variables(row.variables)
    required = self._parse_required_variables(row.required_variables)

    return TemplateData(
        code=row.code,
        language=row.language,
        name=row.name,
        description=row.description,
        category=row.category,
        subject=row.subject,
        body_html=row.body_html,
        body_text=row.body_text,
        variables=declared,
        required_variables=required,
        is_platform_only=row.is_platform_only,
    )
def update_platform_template(
    self,
    code: str,
    language: str,
    subject: str,
    body_html: str,
    body_text: str | None = None,
) -> None:
    """
    Replace the content of one language version of a platform template.

    The new content is assigned to the loaded row; this method itself does
    not commit or flush the session.

    Args:
        code: Template code
        language: Language code
        subject: New subject line
        body_html: New HTML body
        body_text: New plain text body (optional)

    Raises:
        ResourceNotFoundException: If no row matches the code and language
        ValidationException: If the new content fails template validation
    """
    row = EmailTemplate.get_by_code_and_language(self.db, code, language)
    if not row:
        raise ResourceNotFoundException(f"Template not found: {code}/{language}")

    # Validate first so a rejected update leaves the row untouched.
    self._validate_template_syntax(subject, body_html, body_text)

    row.subject, row.body_html, row.body_text = subject, body_html, body_text

    logger.info(f"Updated platform template: {code}/{language}")
def preview_template(
    self,
    code: str,
    language: str,
    variables: dict[str, Any],
) -> dict[str, str | None]:
    """
    Preview a platform template rendered with sample variables.

    Args:
        code: Template code
        language: Language code
        variables: Variables to render

    Returns:
        Dict with the rendered ``subject`` and ``body_html``, and
        ``body_text`` (``None`` when the template has no text body).

    Raises:
        ResourceNotFoundException: If no row matches the code and language
        ValidationException: If Jinja2 raises a TemplateError while
            compiling or rendering; the original error is chained as the
            cause.
    """
    template = EmailTemplate.get_by_code_and_language(self.db, code, language)
    if not template:
        raise ResourceNotFoundException(f"Template not found: {code}/{language}")

    try:
        rendered_subject = _jinja_env.from_string(template.subject).render(variables)
        rendered_html = _jinja_env.from_string(template.body_html).render(variables)
        rendered_text = (
            _jinja_env.from_string(template.body_text).render(variables)
            if template.body_text
            else None
        )
    except TemplateError as e:
        # Chain the cause so the Jinja2 traceback is preserved for debugging.
        raise ValidationException(f"Template rendering error: {str(e)}") from e

    return {
        "subject": rendered_subject,
        "body_html": rendered_html,
        "body_text": rendered_text,
    }
def get_template_logs(
    self,
    code: str,
    limit: int = 50,
    offset: int = 0,
) -> tuple[list[dict], int]:
    """
    Get email logs for a specific template, newest first.

    Args:
        code: Template code
        limit: Max results
        offset: Skip results

    Returns:
        Tuple of (logs list, total count). The total counts every log for
        the template, not just the returned page.
    """
    query = (
        self.db.query(EmailLog)
        .filter(EmailLog.template_code == code)
        .order_by(EmailLog.created_at.desc())
    )

    total = query.count()
    logs = query.offset(offset).limit(limit).all()

    return (
        [
            {
                "id": log.id,
                # The response key stays "to_email" for existing callers, but
                # the value is read from ``recipient_email`` — the attribute
                # every other EmailLog access in this service uses.
                "to_email": log.recipient_email,
                "status": log.status,
                # NOTE(review): no other code in this file reads a
                # ``language`` attribute on EmailLog; fall back to None if
                # the model does not define it — confirm against the model.
                "language": getattr(log, "language", None),
                "created_at": log.created_at.isoformat() if log.created_at else None,
                "error_message": log.error_message,
            }
            for log in logs
        ],
        total,
    )
# =========================================================================
# STORE OPERATIONS
# =========================================================================
def list_overridable_templates(self, store_id: int) -> dict[str, Any]:
    """
    List the templates a store is allowed to customize.

    Each entry reports which languages the platform provides and which of
    the supported languages the store has already overridden.

    Args:
        store_id: Store ID

    Returns:
        Dict with ``templates`` (list of summaries) and
        ``supported_languages``.
    """
    overridable = EmailTemplate.get_overridable_templates(self.db)
    existing_overrides = StoreEmailTemplate.get_all_overrides_for_store(
        self.db, store_id
    )

    # Index overrides by (template code, language) for constant-time lookups.
    overrides_by_key = {
        (item.template_code, item.language): item for item in existing_overrides
    }

    summaries = []
    for tpl in overridable:
        overridden = [
            lang
            for lang in SUPPORTED_LANGUAGES
            if (tpl.code, lang) in overrides_by_key
        ]
        summaries.append({
            "code": tpl.code,
            "name": tpl.name,
            "category": tpl.category,
            "description": tpl.description,
            "available_languages": self._get_template_languages(tpl.code),
            "override_languages": overridden,
            "has_override": bool(overridden),
            "variables": self._parse_required_variables(tpl.required_variables),
        })

    return {
        "templates": summaries,
        "supported_languages": SUPPORTED_LANGUAGES,
    }
def get_store_template(self, store_id: int, code: str) -> dict[str, Any]:
    """
    Get a template with all language versions for a store.

    For every supported language the result shows the platform content (if
    any) next to the store's override (if any).

    Args:
        store_id: Store ID
        code: Template code

    Returns:
        Template metadata plus a ``languages`` mapping keyed by every
        supported language code.

    Raises:
        ResourceNotFoundException: If no template row exists for the code
        AuthorizationException: If the template is platform-only
    """
    platform_template = (
        self.db.query(EmailTemplate)
        .filter(EmailTemplate.code == code)
        .first()
    )

    if not platform_template:
        raise ResourceNotFoundException(f"Template not found: {code}")

    if platform_template.is_platform_only:
        raise AuthorizationException("This is a platform-only template and cannot be customized")

    # All language versions of the platform template.
    platform_versions = (
        self.db.query(EmailTemplate)
        .filter(EmailTemplate.code == code)
        .all()
    )

    # This store's overrides for the same template.
    store_overrides = (
        self.db.query(StoreEmailTemplate)
        .filter(
            StoreEmailTemplate.store_id == store_id,
            StoreEmailTemplate.template_code == code,
        )
        .all()
    )

    override_lookup = {v.language: v for v in store_overrides}
    platform_lookup = {t.language: t for t in platform_versions}

    languages = {}
    for lang in SUPPORTED_LANGUAGES:
        platform_ver = platform_lookup.get(lang)
        override_ver = override_lookup.get(lang)

        languages[lang] = {
            "has_platform_template": platform_ver is not None,
            "has_store_override": override_ver is not None,
            "platform": {
                "subject": platform_ver.subject,
                "body_html": platform_ver.body_html,
                "body_text": platform_ver.body_text,
            } if platform_ver else None,
            "store_override": {
                "subject": override_ver.subject,
                "body_html": override_ver.body_html,
                "body_text": override_ver.body_text,
                "name": override_ver.name,
                # Guard on the timestamp itself: the override is known to
                # exist here, but its updated_at may still be None.
                "updated_at": (
                    override_ver.updated_at.isoformat()
                    if override_ver.updated_at
                    else None
                ),
            } if override_ver else None,
        }

    return {
        "code": code,
        "name": platform_template.name,
        "category": platform_template.category,
        "description": platform_template.description,
        "variables": self._parse_required_variables(platform_template.required_variables),
        "languages": languages,
    }
def get_store_template_language(
    self,
    store_id: int,
    code: str,
    language: str,
) -> dict[str, Any]:
    """
    Get the content a store uses for one language of a template.

    The store's override wins when present; otherwise the platform version
    is returned. The ``source`` key tells which one was used.

    Args:
        store_id: Store ID
        code: Template code
        language: Language code

    Returns:
        Template content with a ``source`` of ``"store_override"`` or
        ``"platform"``. For an override, ``platform_template`` carries the
        platform subject and HTML body for reference (or ``None`` if there
        is no platform version in that language).

    Raises:
        ValidationException: If the language is not supported
        ResourceNotFoundException: If the template code is unknown, or
            neither an override nor a platform version exists for the
            language
        AuthorizationException: If the template is platform-only
    """
    if language not in SUPPORTED_LANGUAGES:
        raise ValidationException(f"Unsupported language: {language}")

    base_template = (
        self.db.query(EmailTemplate).filter(EmailTemplate.code == code).first()
    )
    if not base_template:
        raise ResourceNotFoundException(f"Template not found: {code}")
    if base_template.is_platform_only:
        raise AuthorizationException("This is a platform-only template and cannot be customized")

    override = StoreEmailTemplate.get_override(self.db, store_id, code, language)
    platform_version = EmailTemplate.get_by_code_and_language(self.db, code, language)

    if not override and not platform_version:
        raise ResourceNotFoundException(f"No template found for language: {language}")

    required = self._parse_required_variables(base_template.required_variables)

    if override:
        reference = None
        if platform_version:
            reference = {
                "subject": platform_version.subject,
                "body_html": platform_version.body_html,
            }
        return {
            "code": code,
            "language": language,
            "source": "store_override",
            "subject": override.subject,
            "body_html": override.body_html,
            "body_text": override.body_text,
            "name": override.name,
            "variables": required,
            "platform_template": reference,
        }

    return {
        "code": code,
        "language": language,
        "source": "platform",
        "subject": platform_version.subject,
        "body_html": platform_version.body_html,
        "body_text": platform_version.body_text,
        "name": platform_version.name,
        "variables": required,
        "platform_template": None,
    }
def create_or_update_store_override(
    self,
    store_id: int,
    code: str,
    language: str,
    subject: str,
    body_html: str,
    body_text: str | None = None,
    name: str | None = None,
) -> dict[str, Any]:
    """
    Save a store's custom content for one language of a template.

    Args:
        store_id: Store ID
        code: Template code
        language: Language code
        subject: Custom subject
        body_html: Custom HTML body
        body_text: Custom plain text body
        name: Custom template name

    Returns:
        Confirmation dict with ``message``, ``code``, ``language`` and
        ``is_new``.

    Raises:
        ValidationException: If the language is not supported or the content
            fails template validation
        ResourceNotFoundException: If the template code is unknown
        AuthorizationException: If the template is platform-only
    """
    if language not in SUPPORTED_LANGUAGES:
        raise ValidationException(f"Unsupported language: {language}")

    base_template = (
        self.db.query(EmailTemplate).filter(EmailTemplate.code == code).first()
    )
    if not base_template:
        raise ResourceNotFoundException(f"Template not found: {code}")
    if base_template.is_platform_only:
        raise AuthorizationException("This is a platform-only template and cannot be customized")

    # Reject broken content before anything is written.
    self._validate_template_syntax(subject, body_html, body_text)

    saved = StoreEmailTemplate.create_or_update(
        db=self.db,
        store_id=store_id,
        template_code=code,
        language=language,
        subject=subject,
        body_html=body_html,
        body_text=body_text,
        name=name,
    )

    logger.info(f"Store {store_id} updated template override: {code}/{language}")

    # An override counts as new when its two timestamps are equal.
    is_new = saved.created_at == saved.updated_at
    return {
        "message": "Template override saved",
        "code": code,
        "language": language,
        "is_new": is_new,
    }
def delete_store_override(
    self,
    store_id: int,
    code: str,
    language: str,
) -> None:
    """
    Remove a store's override for one language of a template.

    Args:
        store_id: Store ID
        code: Template code
        language: Language code

    Raises:
        ValidationException: If the language is not supported
        ResourceNotFoundException: If ``StoreEmailTemplate.delete_override``
            reports that nothing was deleted
    """
    if language not in SUPPORTED_LANGUAGES:
        raise ValidationException(f"Unsupported language: {language}")

    was_deleted = StoreEmailTemplate.delete_override(self.db, store_id, code, language)
    if not was_deleted:
        raise ResourceNotFoundException("No override found for this template and language")

    logger.info(f"Store {store_id} deleted template override: {code}/{language}")
def preview_store_template(
    self,
    store_id: int,
    code: str,
    language: str,
    variables: dict[str, Any],
) -> dict[str, Any]:
    """
    Preview the template a store would use (override or platform).

    The store's override is rendered when one exists; the platform version
    is only looked up as a fallback.

    Args:
        store_id: Store ID
        code: Template code
        language: Language code
        variables: Variables to render

    Returns:
        Rendered ``subject``, ``body_html`` and ``body_text`` (``None`` when
        there is no text body) together with ``source`` and ``language``.

    Raises:
        ResourceNotFoundException: If neither an override nor a platform
            version exists for the code and language
        ValidationException: If Jinja2 raises a TemplateError while
            compiling or rendering; the original error is chained as the
            cause.
    """
    # NOTE(review): unlike the other store operations, this method checks
    # neither SUPPORTED_LANGUAGES nor is_platform_only — confirm whether the
    # caller enforces that.
    store_override = StoreEmailTemplate.get_override(
        self.db, store_id, code, language
    )

    if store_override:
        content = store_override
        source = "store_override"
    else:
        # Only hit the platform table when there is no override to render.
        content = EmailTemplate.get_by_code_and_language(self.db, code, language)
        if not content:
            raise ResourceNotFoundException(f"No template found for language: {language}")
        source = "platform"

    subject = content.subject
    body_html = content.body_html
    body_text = content.body_text

    try:
        rendered_subject = _jinja_env.from_string(subject).render(variables)
        rendered_html = _jinja_env.from_string(body_html).render(variables)
        rendered_text = (
            _jinja_env.from_string(body_text).render(variables) if body_text else None
        )
    except TemplateError as e:
        # Chain the cause so the Jinja2 traceback is preserved for debugging.
        raise ValidationException(f"Template rendering error: {str(e)}") from e

    return {
        "source": source,
        "language": language,
        "subject": rendered_subject,
        "body_html": rendered_html,
        "body_text": rendered_text,
    }
# =========================================================================
# EMAIL LOG OPERATIONS (Audit)
# =========================================================================
def get_email_logs(
    self,
    filters: dict[str, Any] | None = None,
    skip: int = 0,
    limit: int = 50,
) -> tuple[list[dict[str, Any]], int]:
    """
    Get one page of email logs, newest first, with optional filters.

    Filter entries with a falsy value are ignored.

    Args:
        filters: Optional dict with keys: search (case-insensitive substring
            of the recipient address), status, template_code, store_id
            (converted with ``int``), date_from and date_to (both bounds
            inclusive, compared against ``created_at``)
        skip: Offset for pagination
        limit: Max results per page

    Returns:
        Tuple of (log items list, total count); the total covers every
        matching row, not just the returned page.
    """
    criteria = filters or {}
    query = self.db.query(EmailLog).order_by(EmailLog.created_at.desc())

    if term := criteria.get("search"):
        query = query.filter(EmailLog.recipient_email.ilike(f"%{term}%"))
    if wanted_status := criteria.get("status"):
        query = query.filter(EmailLog.status == wanted_status)
    if wanted_template := criteria.get("template_code"):
        query = query.filter(EmailLog.template_code == wanted_template)
    if wanted_store := criteria.get("store_id"):
        query = query.filter(EmailLog.store_id == int(wanted_store))
    if start := criteria.get("date_from"):
        query = query.filter(EmailLog.created_at >= start)
    if end := criteria.get("date_to"):
        query = query.filter(EmailLog.created_at <= end)

    total = query.count()
    page = query.offset(skip).limit(limit).all()

    items = [
        {
            "id": entry.id,
            "recipient_email": entry.recipient_email,
            "recipient_name": entry.recipient_name,
            "subject": entry.subject,
            "status": entry.status,
            "template_code": entry.template_code,
            "provider": entry.provider,
            "store_id": entry.store_id,
            "related_type": entry.related_type,
            "related_id": entry.related_id,
            "created_at": entry.created_at.isoformat() if entry.created_at else None,
            "sent_at": entry.sent_at.isoformat() if entry.sent_at else None,
            "error_message": entry.error_message,
        }
        for entry in page
    ]
    return items, total
def get_email_log_detail(self, log_id: int) -> dict[str, Any]:
    """
    Get every recorded field of a single email log, bodies included.

    Args:
        log_id: Email log ID

    Returns:
        Full log details including body_html/body_text; timestamps are
        ISO-formatted strings, or ``None`` when unset.

    Raises:
        ResourceNotFoundException: If log not found
    """

    def _iso(moment):
        # Serialize a timestamp, passing unset (falsy) values through as None.
        return moment.isoformat() if moment else None

    entry = self.db.query(EmailLog).filter(EmailLog.id == log_id).first()
    if not entry:
        # NOTE(review): this is the only call in this service that passes
        # (resource type, identifier) to ResourceNotFoundException; the
        # others pass a single message string — confirm which call shape
        # the constructor expects.
        raise ResourceNotFoundException("EmailLog", str(log_id))

    return {
        "id": entry.id,
        "recipient_email": entry.recipient_email,
        "recipient_name": entry.recipient_name,
        "subject": entry.subject,
        "status": entry.status,
        "template_code": entry.template_code,
        "provider": entry.provider,
        "store_id": entry.store_id,
        "user_id": entry.user_id,
        "related_type": entry.related_type,
        "related_id": entry.related_id,
        "from_email": entry.from_email,
        "from_name": entry.from_name,
        "reply_to": entry.reply_to,
        "body_html": entry.body_html,
        "body_text": entry.body_text,
        "error_message": entry.error_message,
        "retry_count": entry.retry_count,
        "provider_message_id": entry.provider_message_id,
        "created_at": _iso(entry.created_at),
        "sent_at": _iso(entry.sent_at),
        "delivered_at": _iso(entry.delivered_at),
        "opened_at": _iso(entry.opened_at),
        "clicked_at": _iso(entry.clicked_at),
        "extra_data": entry.extra_data,
    }
def get_email_log_stats(self) -> dict[str, Any]:
    """
    Get aggregate email log counts.

    Returns:
        Dict with ``by_status`` (count per status), ``by_template`` (count
        per template code, logs without a template code excluded) and
        ``total`` (sum of the per-status counts).
    """
    status_counts = (
        self.db.query(EmailLog.status, func.count(EmailLog.id))
        .group_by(EmailLog.status)
        .all()
    )
    by_status = {status: count for status, count in status_counts}

    template_counts = (
        self.db.query(EmailLog.template_code, func.count(EmailLog.id))
        .filter(EmailLog.template_code.isnot(None))
        .group_by(EmailLog.template_code)
        .all()
    )
    by_template = {template_code: count for template_code, count in template_counts}

    return {
        "by_status": by_status,
        "by_template": by_template,
        "total": sum(by_status.values()),
    }
# =========================================================================
# HELPER METHODS
# =========================================================================
def _validate_template_syntax(
    self,
    subject: str,
    body_html: str,
    body_text: str | None,
) -> None:
    """
    Validate Jinja2 template syntax.

    Each source is compiled but not rendered. Rendering with an empty
    context would also reject syntactically valid templates: for example
    ``{{ store.name }}`` raises UndefinedError when ``store`` is missing.

    Args:
        subject: Subject template source
        body_html: HTML body template source
        body_text: Plain text body template source; skipped when empty

    Raises:
        ValidationException: If any source fails to compile; the Jinja2
            error is chained as the cause.
    """
    try:
        _jinja_env.from_string(subject)
        _jinja_env.from_string(body_html)
        if body_text:
            _jinja_env.from_string(body_text)
    except TemplateError as e:
        raise ValidationException(f"Invalid template syntax: {str(e)}") from e
def _parse_variables(self, variables_json: str | None) -> list[str]:
"""Parse variables JSON string to list."""
if not variables_json:
return []
try:
import json
return json.loads(variables_json)
except (json.JSONDecodeError, TypeError):
return []
def _parse_required_variables(self, required_vars: str | None) -> list[str]:
"""Parse required variables comma-separated string to list."""
if not required_vars:
return []
return [v.strip() for v in required_vars.split(",") if v.strip()]
def _get_template_languages(self, code: str) -> list[str]:
    """Return the language of every platform template row stored under ``code``."""
    rows = self.db.query(EmailTemplate.language).filter(EmailTemplate.code == code).all()

    languages = []
    for row in rows:
        languages.append(row.language)
    return languages