refactor: migrate templates and static files to self-contained modules

Templates Migration:
- Migrate admin templates to modules (tenancy, billing, monitoring, marketplace, etc.)
- Migrate vendor templates to modules (tenancy, billing, orders, messaging, etc.)
- Migrate storefront templates to modules (catalog, customers, orders, cart, checkout, cms)
- Migrate public templates to modules (billing, marketplace, cms)
- Keep shared templates in app/templates/ (base.html, errors/, partials/, macros/)
- Migrate letzshop partials to marketplace module

Static Files Migration:
- Migrate admin JS to modules: tenancy (23 files), core (5 files), monitoring (1 file)
- Migrate vendor JS to modules: tenancy (4 files), core (2 files)
- Migrate shared JS: vendor-selector.js to core, media-picker.js to cms
- Migrate storefront JS: storefront-layout.js to core
- Keep framework JS in static/ (api-client, utils, money, icons, log-config, lib/)
- Update all template references to use module_static paths

Naming Consistency:
- Rename static/platform/ to static/public/
- Rename app/templates/platform/ to app/templates/public/
- Update all extends and static references

Documentation:
- Update module-system.md with shared templates documentation
- Update frontend-structure.md with new module JS organization

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-02-01 14:34:16 +01:00
parent 843703258f
commit 4e28d91a78
542 changed files with 11603 additions and 9037 deletions

View File

@@ -9,8 +9,26 @@ from app.modules.cms.services.content_page_service import (
ContentPageService,
content_page_service,
)
from app.modules.cms.services.media_service import (
MediaService,
media_service,
)
from app.modules.cms.services.vendor_theme_service import (
VendorThemeService,
vendor_theme_service,
)
from app.modules.cms.services.vendor_email_settings_service import (
VendorEmailSettingsService,
get_vendor_email_settings_service,
)
__all__ = [
"ContentPageService",
"content_page_service",
"MediaService",
"media_service",
"VendorThemeService",
"vendor_theme_service",
"VendorEmailSettingsService",
"get_vendor_email_settings_service",
]

View File

@@ -0,0 +1,552 @@
# app/modules/cms/services/media_service.py
"""
Media service for vendor media library management.
This module provides:
- File upload and storage
- Thumbnail generation for images
- Media metadata management
- Media usage tracking
"""
import logging
import mimetypes
import os
import shutil
import uuid
from datetime import UTC, datetime
from pathlib import Path
from sqlalchemy import or_
from sqlalchemy.orm import Session
from app.modules.cms.exceptions import (
MediaNotFoundException,
MediaUploadException,
MediaValidationException,
UnsupportedMediaTypeException,
MediaFileTooLargeException,
)
from models.database.media import MediaFile
from app.modules.catalog.models import ProductMedia
logger = logging.getLogger(__name__)
# Base upload directory
UPLOAD_DIR = Path("uploads")
VENDOR_UPLOAD_DIR = UPLOAD_DIR / "vendors"
# Allowed file types and their categories
ALLOWED_EXTENSIONS = {
# Images
"jpg": "image",
"jpeg": "image",
"png": "image",
"gif": "image",
"webp": "image",
"svg": "image",
# Videos
"mp4": "video",
"webm": "video",
"mov": "video",
# Documents
"pdf": "document",
"doc": "document",
"docx": "document",
"xls": "document",
"xlsx": "document",
"csv": "document",
"txt": "document",
}
# Maximum file sizes (in bytes)
MAX_FILE_SIZES = {
"image": 10 * 1024 * 1024, # 10 MB
"video": 100 * 1024 * 1024, # 100 MB
"document": 20 * 1024 * 1024, # 20 MB
}
# Thumbnail settings
THUMBNAIL_SIZE = (200, 200)
class MediaService:
"""Service for vendor media library operations."""
def _get_vendor_upload_path(self, vendor_id: int, folder: str = "general") -> Path:
"""Get the upload directory path for a vendor."""
return VENDOR_UPLOAD_DIR / str(vendor_id) / folder
def _ensure_upload_dir(self, path: Path) -> None:
"""Ensure upload directory exists."""
path.mkdir(parents=True, exist_ok=True)
def _get_file_extension(self, filename: str) -> str:
"""Extract file extension from filename."""
return filename.rsplit(".", 1)[-1].lower() if "." in filename else ""
def _get_media_type(self, extension: str) -> str | None:
"""Get media type from file extension."""
return ALLOWED_EXTENSIONS.get(extension)
def _generate_unique_filename(self, original_filename: str) -> str:
"""Generate a unique filename using UUID."""
ext = self._get_file_extension(original_filename)
return f"{uuid.uuid4().hex}.{ext}" if ext else uuid.uuid4().hex
def _validate_file(
self, filename: str, file_size: int
) -> tuple[str, str]:
"""
Validate uploaded file.
Returns:
Tuple of (extension, media_type)
Raises:
MediaValidationException: If file is invalid
UnsupportedMediaTypeException: If file type is not supported
MediaFileTooLargeException: If file exceeds size limit
"""
ext = self._get_file_extension(filename)
if not ext:
raise MediaValidationException("File must have an extension", field="file")
media_type = self._get_media_type(ext)
if not media_type:
raise UnsupportedMediaTypeException(
ext, allowed_types=list(ALLOWED_EXTENSIONS.keys())
)
max_size = MAX_FILE_SIZES.get(media_type, 10 * 1024 * 1024)
if file_size > max_size:
raise MediaFileTooLargeException(file_size, max_size, media_type)
return ext, media_type
def _get_image_dimensions(self, file_path: Path) -> tuple[int, int] | None:
"""Get image dimensions if PIL is available."""
try:
from PIL import Image
with Image.open(file_path) as img:
return img.size
except ImportError:
logger.debug("PIL not available, skipping image dimension detection")
return None
except Exception as e:
logger.warning(f"Could not get image dimensions: {e}")
return None
def _generate_thumbnail(
self, source_path: Path, vendor_id: int
) -> str | None:
"""Generate thumbnail for image file."""
try:
from PIL import Image
# Create thumbnails directory
thumb_dir = self._get_vendor_upload_path(vendor_id, "thumbnails")
self._ensure_upload_dir(thumb_dir)
# Generate thumbnail filename
thumb_filename = f"thumb_{source_path.name}"
thumb_path = thumb_dir / thumb_filename
# Create thumbnail
with Image.open(source_path) as img:
img.thumbnail(THUMBNAIL_SIZE)
# Convert to RGB if needed (for PNG with transparency)
if img.mode in ("RGBA", "P"):
img = img.convert("RGB")
img.save(thumb_path, "JPEG", quality=85)
# Return relative path
return str(thumb_path.relative_to(UPLOAD_DIR))
except ImportError:
logger.debug("PIL not available, skipping thumbnail generation")
return None
except Exception as e:
logger.warning(f"Could not generate thumbnail: {e}")
return None
async def upload_file(
self,
db: Session,
vendor_id: int,
file_content: bytes,
filename: str,
folder: str = "general",
) -> MediaFile:
"""
Upload a file to the media library.
Args:
db: Database session
vendor_id: Vendor ID
file_content: File content as bytes
filename: Original filename
folder: Folder to store in (products, general, etc.)
Returns:
Created MediaFile record
"""
# Validate file
file_size = len(file_content)
ext, media_type = self._validate_file(filename, file_size)
# Generate unique filename
unique_filename = self._generate_unique_filename(filename)
# Get upload path
upload_path = self._get_vendor_upload_path(vendor_id, folder)
self._ensure_upload_dir(upload_path)
# Save file
file_path = upload_path / unique_filename
file_path.write_bytes(file_content)
# Get relative path for storage
relative_path = str(file_path.relative_to(UPLOAD_DIR))
# Get MIME type
mime_type, _ = mimetypes.guess_type(filename)
# Get image dimensions and generate thumbnail
width, height = None, None
thumbnail_path = None
if media_type == "image":
dimensions = self._get_image_dimensions(file_path)
if dimensions:
width, height = dimensions
thumbnail_path = self._generate_thumbnail(file_path, vendor_id)
# Create database record
media_file = MediaFile(
vendor_id=vendor_id,
filename=unique_filename,
original_filename=filename,
file_path=relative_path,
media_type=media_type,
mime_type=mime_type,
file_size=file_size,
width=width,
height=height,
thumbnail_path=thumbnail_path,
folder=folder,
)
db.add(media_file)
db.flush()
db.refresh(media_file)
logger.info(
f"Uploaded media file {media_file.id} for vendor {vendor_id}: {filename}"
)
return media_file
def get_media(
self, db: Session, vendor_id: int, media_id: int
) -> MediaFile:
"""
Get a media file by ID.
Raises:
MediaNotFoundException: If media not found or doesn't belong to vendor
"""
media = (
db.query(MediaFile)
.filter(
MediaFile.id == media_id,
MediaFile.vendor_id == vendor_id,
)
.first()
)
if not media:
raise MediaNotFoundException(media_id)
return media
def get_media_library(
self,
db: Session,
vendor_id: int,
skip: int = 0,
limit: int = 100,
media_type: str | None = None,
folder: str | None = None,
search: str | None = None,
) -> tuple[list[MediaFile], int]:
"""
Get vendor media library with filtering.
Args:
db: Database session
vendor_id: Vendor ID
skip: Pagination offset
limit: Pagination limit
media_type: Filter by media type
folder: Filter by folder
search: Search in filename
Returns:
Tuple of (media_files, total_count)
"""
query = db.query(MediaFile).filter(MediaFile.vendor_id == vendor_id)
if media_type:
query = query.filter(MediaFile.media_type == media_type)
if folder:
query = query.filter(MediaFile.folder == folder)
if search:
search_pattern = f"%{search}%"
query = query.filter(
or_(
MediaFile.filename.ilike(search_pattern),
MediaFile.original_filename.ilike(search_pattern),
MediaFile.alt_text.ilike(search_pattern),
)
)
# Order by newest first
query = query.order_by(MediaFile.created_at.desc())
total = query.count()
media_files = query.offset(skip).limit(limit).all()
return media_files, total
def update_media_metadata(
self,
db: Session,
vendor_id: int,
media_id: int,
filename: str | None = None,
alt_text: str | None = None,
description: str | None = None,
folder: str | None = None,
metadata: dict | None = None,
) -> MediaFile:
"""
Update media file metadata.
Args:
db: Database session
vendor_id: Vendor ID
media_id: Media file ID
filename: New display filename
alt_text: Alt text for images
description: File description
folder: Move to different folder
metadata: Additional metadata
Returns:
Updated MediaFile
"""
media = self.get_media(db, vendor_id, media_id)
if filename is not None:
media.original_filename = filename
if alt_text is not None:
media.alt_text = alt_text
if description is not None:
media.description = description
if folder is not None and folder != media.folder:
# Move file to new folder
old_path = UPLOAD_DIR / media.file_path
new_dir = self._get_vendor_upload_path(vendor_id, folder)
self._ensure_upload_dir(new_dir)
new_path = new_dir / media.filename
if old_path.exists():
shutil.move(str(old_path), str(new_path))
media.file_path = str(new_path.relative_to(UPLOAD_DIR))
media.folder = folder
if metadata is not None:
media.extra_metadata = metadata
media.updated_at = datetime.now(UTC)
db.flush()
logger.info(f"Updated media metadata for {media_id}")
return media
def delete_media(
self, db: Session, vendor_id: int, media_id: int
) -> bool:
"""
Delete a media file.
Args:
db: Database session
vendor_id: Vendor ID
media_id: Media file ID
Returns:
True if deleted successfully
"""
media = self.get_media(db, vendor_id, media_id)
# Delete physical files
file_path = UPLOAD_DIR / media.file_path
if file_path.exists():
file_path.unlink()
if media.thumbnail_path:
thumb_path = UPLOAD_DIR / media.thumbnail_path
if thumb_path.exists():
thumb_path.unlink()
# Delete database record
db.delete(media)
logger.info(f"Deleted media file {media_id} for vendor {vendor_id}")
return True
def get_media_usage(
self, db: Session, vendor_id: int, media_id: int
) -> dict:
"""
Get where a media file is being used.
Returns:
Dict with products and other usage information
"""
media = self.get_media(db, vendor_id, media_id)
# Get product associations
product_usage = []
for assoc in media.product_associations:
product = assoc.product
if product:
product_usage.append({
"product_id": product.id,
"product_name": product.get_title() or f"Product {product.id}",
"usage_type": assoc.usage_type,
})
return {
"media_id": media_id,
"products": product_usage,
"other_usage": [],
"total_usage_count": len(product_usage),
}
def attach_to_product(
self,
db: Session,
vendor_id: int,
media_id: int,
product_id: int,
usage_type: str = "gallery",
display_order: int = 0,
) -> ProductMedia:
"""
Attach a media file to a product.
Args:
db: Database session
vendor_id: Vendor ID
media_id: Media file ID
product_id: Product ID
usage_type: How the media is used (main_image, gallery, etc.)
display_order: Order for galleries
Returns:
Created ProductMedia association
"""
# Verify media belongs to vendor
media = self.get_media(db, vendor_id, media_id)
# Check if already attached with same usage type
existing = (
db.query(ProductMedia)
.filter(
ProductMedia.product_id == product_id,
ProductMedia.media_id == media_id,
ProductMedia.usage_type == usage_type,
)
.first()
)
if existing:
existing.display_order = display_order
db.flush()
return existing
# Create association
product_media = ProductMedia(
product_id=product_id,
media_id=media_id,
usage_type=usage_type,
display_order=display_order,
)
db.add(product_media)
# Update usage count
media.usage_count = (media.usage_count or 0) + 1
db.flush()
return product_media
def detach_from_product(
self,
db: Session,
vendor_id: int,
media_id: int,
product_id: int,
usage_type: str | None = None,
) -> bool:
"""
Detach a media file from a product.
Args:
db: Database session
vendor_id: Vendor ID
media_id: Media file ID
product_id: Product ID
usage_type: Specific usage type to remove (None = all)
Returns:
True if detached
"""
# Verify media belongs to vendor
media = self.get_media(db, vendor_id, media_id)
query = db.query(ProductMedia).filter(
ProductMedia.product_id == product_id,
ProductMedia.media_id == media_id,
)
if usage_type:
query = query.filter(ProductMedia.usage_type == usage_type)
deleted_count = query.delete()
# Update usage count
if deleted_count > 0:
media.usage_count = max(0, (media.usage_count or 0) - deleted_count)
db.flush()
return deleted_count > 0
# Create service instance
media_service = MediaService()

View File

@@ -0,0 +1,483 @@
# app/modules/cms/services/vendor_email_settings_service.py
"""
Vendor Email Settings Service.
Handles CRUD operations for vendor email configuration:
- SMTP settings
- Advanced providers (SendGrid, Mailgun, SES) - tier-gated
- Sender identity (from_email, from_name, reply_to)
- Signature/footer customization
- Configuration verification via test email
"""
import logging
import smtplib
from datetime import UTC, datetime
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from sqlalchemy.orm import Session
from app.exceptions import (
AuthorizationException,
ResourceNotFoundException,
ValidationException,
ExternalServiceException,
)
from models.database import (
Vendor,
VendorEmailSettings,
EmailProvider,
PREMIUM_EMAIL_PROVIDERS,
VendorSubscription,
TierCode,
)
logger = logging.getLogger(__name__)
# Tiers that allow premium email providers
PREMIUM_TIERS = {TierCode.BUSINESS, TierCode.ENTERPRISE}
class VendorEmailSettingsService:
"""Service for managing vendor email settings."""
def __init__(self, db: Session):
self.db = db
# =========================================================================
# READ OPERATIONS
# =========================================================================
def get_settings(self, vendor_id: int) -> VendorEmailSettings | None:
"""Get email settings for a vendor."""
return (
self.db.query(VendorEmailSettings)
.filter(VendorEmailSettings.vendor_id == vendor_id)
.first()
)
def get_settings_or_404(self, vendor_id: int) -> VendorEmailSettings:
"""Get email settings or raise 404."""
settings = self.get_settings(vendor_id)
if not settings:
raise ResourceNotFoundException(
resource_type="vendor_email_settings",
identifier=str(vendor_id),
)
return settings
def is_configured(self, vendor_id: int) -> bool:
"""Check if vendor has configured email settings."""
settings = self.get_settings(vendor_id)
return settings is not None and settings.is_configured
def get_status(self, vendor_id: int) -> dict:
"""
Get email configuration status for a vendor.
Returns:
dict with is_configured, is_verified, provider, etc.
"""
settings = self.get_settings(vendor_id)
if not settings:
return {
"is_configured": False,
"is_verified": False,
"provider": None,
"from_email": None,
"from_name": None,
"message": "Email settings not configured. Configure SMTP to send emails.",
}
return {
"is_configured": settings.is_configured,
"is_verified": settings.is_verified,
"provider": settings.provider,
"from_email": settings.from_email,
"from_name": settings.from_name,
"last_verified_at": settings.last_verified_at.isoformat() if settings.last_verified_at else None,
"verification_error": settings.verification_error,
"message": self._get_status_message(settings),
}
def _get_status_message(self, settings: VendorEmailSettings) -> str:
"""Generate a human-readable status message."""
if not settings.is_configured:
return "Complete your email configuration to send emails."
if not settings.is_verified:
return "Email configured but not verified. Send a test email to verify."
return "Email settings configured and verified."
# =========================================================================
# WRITE OPERATIONS
# =========================================================================
def create_or_update(
self,
vendor_id: int,
data: dict,
current_tier: TierCode | None = None,
) -> VendorEmailSettings:
"""
Create or update vendor email settings.
Args:
vendor_id: Vendor ID
data: Settings data (from_email, from_name, smtp_*, etc.)
current_tier: Vendor's current subscription tier (for premium provider validation)
Returns:
Updated VendorEmailSettings
Raises:
AuthorizationException: If trying to use premium provider without required tier
"""
# Validate premium provider access
provider = data.get("provider", "smtp")
if provider in [p.value for p in PREMIUM_EMAIL_PROVIDERS]:
if current_tier not in PREMIUM_TIERS:
raise AuthorizationException(
message=f"Provider '{provider}' requires Business or Enterprise tier. "
"Upgrade your plan to use advanced email providers.",
details={"required_permission": "business_tier"},
)
settings = self.get_settings(vendor_id)
if not settings:
settings = VendorEmailSettings(vendor_id=vendor_id)
self.db.add(settings)
# Update fields
for field in [
"from_email",
"from_name",
"reply_to_email",
"signature_text",
"signature_html",
"provider",
# SMTP
"smtp_host",
"smtp_port",
"smtp_username",
"smtp_password",
"smtp_use_tls",
"smtp_use_ssl",
# SendGrid
"sendgrid_api_key",
# Mailgun
"mailgun_api_key",
"mailgun_domain",
# SES
"ses_access_key_id",
"ses_secret_access_key",
"ses_region",
]:
if field in data and data[field] is not None:
# Don't overwrite passwords/keys with empty strings
if field.endswith(("_password", "_key", "_access_key")) and data[field] == "":
continue
setattr(settings, field, data[field])
# Update configuration status
settings.update_configuration_status()
# Reset verification if provider/credentials changed
if any(
f in data
for f in ["provider", "smtp_host", "smtp_password", "sendgrid_api_key", "mailgun_api_key", "ses_access_key_id"]
):
settings.is_verified = False
settings.verification_error = None
self.db.flush()
logger.info(f"Updated email settings for vendor {vendor_id}: provider={settings.provider}")
return settings
def delete(self, vendor_id: int) -> None:
"""
Delete email settings for a vendor.
Raises:
ResourceNotFoundException: If settings not found
"""
settings = self.get_settings(vendor_id)
if not settings:
raise ResourceNotFoundException(
resource_type="vendor_email_settings",
identifier=str(vendor_id),
)
self.db.delete(settings)
self.db.flush()
logger.info(f"Deleted email settings for vendor {vendor_id}")
# =========================================================================
# VERIFICATION
# =========================================================================
def verify_settings(self, vendor_id: int, test_email: str) -> dict:
"""
Verify email settings by sending a test email.
Args:
vendor_id: Vendor ID
test_email: Email address to send test email to
Returns:
dict with success status and message
Raises:
ResourceNotFoundException: If settings not found
ValidationException: If settings incomplete
"""
settings = self.get_settings_or_404(vendor_id)
if not settings.is_fully_configured():
raise ValidationException(
message="Email settings incomplete. Configure all required fields first.",
field="settings",
)
try:
# Send test email based on provider
if settings.provider == EmailProvider.SMTP.value:
self._send_smtp_test(settings, test_email)
elif settings.provider == EmailProvider.SENDGRID.value:
self._send_sendgrid_test(settings, test_email)
elif settings.provider == EmailProvider.MAILGUN.value:
self._send_mailgun_test(settings, test_email)
elif settings.provider == EmailProvider.SES.value:
self._send_ses_test(settings, test_email)
else:
raise ValidationException(
message=f"Unknown provider: {settings.provider}",
field="provider",
)
# Mark as verified
settings.mark_verified()
self.db.flush()
logger.info(f"Email settings verified for vendor {vendor_id}")
return {
"success": True,
"message": f"Test email sent successfully to {test_email}",
}
except (ValidationException, ExternalServiceException):
raise # Re-raise domain exceptions
except Exception as e:
error_msg = str(e)
settings.mark_verification_failed(error_msg)
self.db.flush()
logger.warning(f"Email verification failed for vendor {vendor_id}: {error_msg}")
# Return error dict instead of raising - verification failure is not a server error
return {
"success": False,
"message": f"Failed to send test email: {error_msg}",
}
def _send_smtp_test(self, settings: VendorEmailSettings, to_email: str) -> None:
"""Send test email via SMTP."""
msg = MIMEMultipart("alternative")
msg["Subject"] = "Wizamart Email Configuration Test"
msg["From"] = f"{settings.from_name} <{settings.from_email}>"
msg["To"] = to_email
text_content = (
"This is a test email from Wizamart.\n\n"
"Your email settings are configured correctly!\n\n"
f"Provider: SMTP\n"
f"Host: {settings.smtp_host}\n"
)
html_content = f"""
<html>
<body style="font-family: Arial, sans-serif; padding: 20px;">
<h2 style="color: #6b46c1;">Email Configuration Test</h2>
<p>This is a test email from <strong>Wizamart</strong>.</p>
<p style="color: #22c55e; font-weight: bold;">
Your email settings are configured correctly!
</p>
<hr style="border: 1px solid #e5e7eb; margin: 20px 0;">
<p style="color: #6b7280; font-size: 12px;">
Provider: SMTP<br>
Host: {settings.smtp_host}<br>
Sent at: {datetime.now(UTC).strftime('%Y-%m-%d %H:%M:%S UTC')}
</p>
</body>
</html>
"""
msg.attach(MIMEText(text_content, "plain"))
msg.attach(MIMEText(html_content, "html"))
# Connect and send
if settings.smtp_use_ssl:
server = smtplib.SMTP_SSL(settings.smtp_host, settings.smtp_port)
else:
server = smtplib.SMTP(settings.smtp_host, settings.smtp_port)
if settings.smtp_use_tls:
server.starttls()
server.login(settings.smtp_username, settings.smtp_password)
server.sendmail(settings.from_email, to_email, msg.as_string())
server.quit()
def _send_sendgrid_test(self, settings: VendorEmailSettings, to_email: str) -> None:
"""Send test email via SendGrid."""
try:
from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail
except ImportError:
raise ExternalServiceException(
service_name="SendGrid",
message="SendGrid library not installed. Contact support.",
)
message = Mail(
from_email=(settings.from_email, settings.from_name),
to_emails=to_email,
subject="Wizamart Email Configuration Test",
html_content=f"""
<html>
<body style="font-family: Arial, sans-serif; padding: 20px;">
<h2 style="color: #6b46c1;">Email Configuration Test</h2>
<p>This is a test email from <strong>Wizamart</strong>.</p>
<p style="color: #22c55e; font-weight: bold;">
Your SendGrid settings are configured correctly!
</p>
</body>
</html>
""",
)
sg = SendGridAPIClient(settings.sendgrid_api_key)
response = sg.send(message)
if response.status_code >= 400:
raise ExternalServiceException(
service_name="SendGrid",
message=f"SendGrid error: HTTP {response.status_code}",
)
def _send_mailgun_test(self, settings: VendorEmailSettings, to_email: str) -> None:
"""Send test email via Mailgun."""
import requests
response = requests.post(
f"https://api.mailgun.net/v3/{settings.mailgun_domain}/messages",
auth=("api", settings.mailgun_api_key),
data={
"from": f"{settings.from_name} <{settings.from_email}>",
"to": to_email,
"subject": "Wizamart Email Configuration Test",
"html": f"""
<html>
<body style="font-family: Arial, sans-serif; padding: 20px;">
<h2 style="color: #6b46c1;">Email Configuration Test</h2>
<p>This is a test email from <strong>Wizamart</strong>.</p>
<p style="color: #22c55e; font-weight: bold;">
Your Mailgun settings are configured correctly!
</p>
</body>
</html>
""",
},
timeout=30,
)
if response.status_code >= 400:
raise ExternalServiceException(
service_name="Mailgun",
message=f"Mailgun error: {response.text}",
)
def _send_ses_test(self, settings: VendorEmailSettings, to_email: str) -> None:
"""Send test email via Amazon SES."""
try:
import boto3
except ImportError:
raise ExternalServiceException(
service_name="Amazon SES",
message="boto3 library not installed. Contact support.",
)
client = boto3.client(
"ses",
region_name=settings.ses_region,
aws_access_key_id=settings.ses_access_key_id,
aws_secret_access_key=settings.ses_secret_access_key,
)
client.send_email(
Source=f"{settings.from_name} <{settings.from_email}>",
Destination={"ToAddresses": [to_email]},
Message={
"Subject": {"Data": "Wizamart Email Configuration Test"},
"Body": {
"Html": {
"Data": f"""
<html>
<body style="font-family: Arial, sans-serif; padding: 20px;">
<h2 style="color: #6b46c1;">Email Configuration Test</h2>
<p>This is a test email from <strong>Wizamart</strong>.</p>
<p style="color: #22c55e; font-weight: bold;">
Your Amazon SES settings are configured correctly!
</p>
</body>
</html>
"""
}
},
},
)
# =========================================================================
# TIER HELPERS
# =========================================================================
def get_available_providers(self, tier: TierCode | None) -> list[dict]:
"""
Get list of available email providers for a tier.
Returns list of providers with availability status.
"""
providers = [
{
"code": EmailProvider.SMTP.value,
"name": "SMTP",
"description": "Standard SMTP email server",
"available": True,
"tier_required": None,
},
{
"code": EmailProvider.SENDGRID.value,
"name": "SendGrid",
"description": "SendGrid email delivery platform",
"available": tier in PREMIUM_TIERS if tier else False,
"tier_required": "business",
},
{
"code": EmailProvider.MAILGUN.value,
"name": "Mailgun",
"description": "Mailgun email API",
"available": tier in PREMIUM_TIERS if tier else False,
"tier_required": "business",
},
{
"code": EmailProvider.SES.value,
"name": "Amazon SES",
"description": "Amazon Simple Email Service",
"available": tier in PREMIUM_TIERS if tier else False,
"tier_required": "business",
},
]
return providers
# Module-level service factory
def get_vendor_email_settings_service(db: Session) -> VendorEmailSettingsService:
"""Factory function to get a VendorEmailSettingsService instance."""
return VendorEmailSettingsService(db)

View File

@@ -0,0 +1,488 @@
# app/modules/cms/services/vendor_theme_service.py
"""
Vendor Theme Service
Business logic for vendor theme management.
Handles theme CRUD operations, preset application, and validation.
"""
import logging
import re
from sqlalchemy.orm import Session
from app.core.theme_presets import (
THEME_PRESETS,
apply_preset,
get_available_presets,
get_preset_preview,
)
from app.modules.tenancy.exceptions import VendorNotFoundException
from app.modules.cms.exceptions import (
InvalidColorFormatException,
InvalidFontFamilyException,
ThemeOperationException,
ThemePresetNotFoundException,
ThemeValidationException,
VendorThemeNotFoundException,
)
from models.database.vendor import Vendor
from models.database.vendor_theme import VendorTheme
from models.schema.vendor_theme import ThemePresetPreview, VendorThemeUpdate
logger = logging.getLogger(__name__)
class VendorThemeService:
"""
Service for managing vendor themes.
This service handles:
- Theme retrieval and creation
- Theme updates and validation
- Preset application
- Default theme generation
"""
def __init__(self):
"""Initialize the vendor theme service."""
self.logger = logging.getLogger(__name__)
# ============================================================================
# VENDOR RETRIEVAL
# ============================================================================
def _get_vendor_by_code(self, db: Session, vendor_code: str) -> Vendor:
"""
Get vendor by code or raise exception.
Args:
db: Database session
vendor_code: Vendor code to lookup
Returns:
Vendor object
Raises:
VendorNotFoundException: If vendor not found
"""
vendor = (
db.query(Vendor).filter(Vendor.vendor_code == vendor_code.upper()).first()
)
if not vendor:
self.logger.warning(f"Vendor not found: {vendor_code}")
raise VendorNotFoundException(vendor_code, identifier_type="code")
return vendor
# ============================================================================
# THEME RETRIEVAL
# ============================================================================
def get_theme(self, db: Session, vendor_code: str) -> dict:
"""
Get theme for vendor. Returns default if no custom theme exists.
Args:
db: Database session
vendor_code: Vendor code
Returns:
Theme dictionary
Raises:
VendorNotFoundException: If vendor not found
"""
self.logger.info(f"Getting theme for vendor: {vendor_code}")
# Verify vendor exists
vendor = self._get_vendor_by_code(db, vendor_code)
# Get theme
theme = db.query(VendorTheme).filter(VendorTheme.vendor_id == vendor.id).first()
if not theme:
self.logger.info(
f"No custom theme for vendor {vendor_code}, returning default"
)
return self._get_default_theme()
return theme.to_dict()
def _get_default_theme(self) -> dict:
"""
Get default theme configuration.
Returns:
Default theme dictionary
"""
return {
"theme_name": "default",
"colors": {
"primary": "#6366f1",
"secondary": "#8b5cf6",
"accent": "#ec4899",
"background": "#ffffff",
"text": "#1f2937",
"border": "#e5e7eb",
},
"fonts": {"heading": "Inter, sans-serif", "body": "Inter, sans-serif"},
"branding": {
"logo": None,
"logo_dark": None,
"favicon": None,
"banner": None,
},
"layout": {"style": "grid", "header": "fixed", "product_card": "modern"},
"social_links": {},
"custom_css": None,
"css_variables": {
"--color-primary": "#6366f1",
"--color-secondary": "#8b5cf6",
"--color-accent": "#ec4899",
"--color-background": "#ffffff",
"--color-text": "#1f2937",
"--color-border": "#e5e7eb",
"--font-heading": "Inter, sans-serif",
"--font-body": "Inter, sans-serif",
},
}
# ============================================================================
# THEME UPDATE
# ============================================================================
def update_theme(
self, db: Session, vendor_code: str, theme_data: VendorThemeUpdate
) -> VendorTheme:
"""
Update or create theme for vendor.
Args:
db: Database session
vendor_code: Vendor code
theme_data: Theme update data
Returns:
Updated VendorTheme object
Raises:
VendorNotFoundException: If vendor not found
ThemeValidationException: If theme data invalid
ThemeOperationException: If update fails
"""
self.logger.info(f"Updating theme for vendor: {vendor_code}")
try:
# Verify vendor exists
vendor = self._get_vendor_by_code(db, vendor_code)
# Get or create theme
theme = (
db.query(VendorTheme).filter(VendorTheme.vendor_id == vendor.id).first()
)
if not theme:
self.logger.info(f"Creating new theme for vendor {vendor_code}")
theme = VendorTheme(vendor_id=vendor.id, is_active=True)
db.add(theme)
# Validate theme data before applying
self._validate_theme_data(theme_data)
# Update theme fields
self._apply_theme_updates(theme, theme_data)
# Flush changes
db.flush()
db.refresh(theme)
self.logger.info(f"Theme updated successfully for vendor {vendor_code}")
return theme
except (VendorNotFoundException, ThemeValidationException):
# Re-raise custom exceptions
raise
except Exception as e:
self.logger.error(f"Failed to update theme for vendor {vendor_code}: {e}")
raise ThemeOperationException(
operation="update", vendor_code=vendor_code, reason=str(e)
)
def _apply_theme_updates(
self, theme: VendorTheme, theme_data: VendorThemeUpdate
) -> None:
"""
Apply theme updates to theme object.
Args:
theme: VendorTheme object to update
theme_data: Theme update data
"""
# Update theme name
if theme_data.theme_name:
theme.theme_name = theme_data.theme_name
# Update colors
if theme_data.colors:
theme.colors = theme_data.colors
# Update fonts
if theme_data.fonts:
if theme_data.fonts.get("heading"):
theme.font_family_heading = theme_data.fonts["heading"]
if theme_data.fonts.get("body"):
theme.font_family_body = theme_data.fonts["body"]
# Update branding
if theme_data.branding:
if theme_data.branding.get("logo") is not None:
theme.logo_url = theme_data.branding["logo"]
if theme_data.branding.get("logo_dark") is not None:
theme.logo_dark_url = theme_data.branding["logo_dark"]
if theme_data.branding.get("favicon") is not None:
theme.favicon_url = theme_data.branding["favicon"]
if theme_data.branding.get("banner") is not None:
theme.banner_url = theme_data.branding["banner"]
# Update layout
if theme_data.layout:
if theme_data.layout.get("style"):
theme.layout_style = theme_data.layout["style"]
if theme_data.layout.get("header"):
theme.header_style = theme_data.layout["header"]
if theme_data.layout.get("product_card"):
theme.product_card_style = theme_data.layout["product_card"]
# Update custom CSS
if theme_data.custom_css is not None:
theme.custom_css = theme_data.custom_css
# Update social links
if theme_data.social_links:
theme.social_links = theme_data.social_links
# ============================================================================
# PRESET OPERATIONS
# ============================================================================
def apply_theme_preset(
self, db: Session, vendor_code: str, preset_name: str
) -> VendorTheme:
"""
Apply a theme preset to vendor.
Args:
db: Database session
vendor_code: Vendor code
preset_name: Name of preset to apply
Returns:
Updated VendorTheme object
Raises:
VendorNotFoundException: If vendor not found
ThemePresetNotFoundException: If preset not found
ThemeOperationException: If application fails
"""
self.logger.info(f"Applying preset '{preset_name}' to vendor {vendor_code}")
try:
# Validate preset name
if preset_name not in THEME_PRESETS:
available = get_available_presets()
raise ThemePresetNotFoundException(preset_name, available)
# Verify vendor exists
vendor = self._get_vendor_by_code(db, vendor_code)
# Get or create theme
theme = (
db.query(VendorTheme).filter(VendorTheme.vendor_id == vendor.id).first()
)
if not theme:
self.logger.info(f"Creating new theme for vendor {vendor_code}")
theme = VendorTheme(vendor_id=vendor.id)
db.add(theme)
# Apply preset using helper function
apply_preset(theme, preset_name)
# Flush changes
db.flush()
db.refresh(theme)
self.logger.info(
f"Preset '{preset_name}' applied successfully to vendor {vendor_code}"
)
return theme
except (VendorNotFoundException, ThemePresetNotFoundException):
# Re-raise custom exceptions
raise
except Exception as e:
self.logger.error(f"Failed to apply preset to vendor {vendor_code}: {e}")
raise ThemeOperationException(
operation="apply_preset", vendor_code=vendor_code, reason=str(e)
)
def get_available_presets(self) -> list[ThemePresetPreview]:
"""
Get list of available theme presets.
Returns:
List of preset preview objects
"""
self.logger.debug("Getting available presets")
preset_names = get_available_presets()
presets = []
for name in preset_names:
preview = get_preset_preview(name)
presets.append(preview)
return presets
# ============================================================================
# THEME DELETION
# ============================================================================
def delete_theme(self, db: Session, vendor_code: str) -> dict:
"""
Delete custom theme for vendor (reverts to default).
Args:
db: Database session
vendor_code: Vendor code
Returns:
Success message dictionary
Raises:
VendorNotFoundException: If vendor not found
VendorThemeNotFoundException: If no custom theme exists
ThemeOperationException: If deletion fails
"""
self.logger.info(f"Deleting theme for vendor: {vendor_code}")
try:
# Verify vendor exists
vendor = self._get_vendor_by_code(db, vendor_code)
# Get theme
theme = (
db.query(VendorTheme).filter(VendorTheme.vendor_id == vendor.id).first()
)
if not theme:
raise VendorThemeNotFoundException(vendor_code)
# Delete theme
db.delete(theme)
self.logger.info(f"Theme deleted for vendor {vendor_code}")
return {
"message": "Theme deleted successfully. Vendor will use default theme."
}
except (VendorNotFoundException, VendorThemeNotFoundException):
# Re-raise custom exceptions
raise
except Exception as e:
self.logger.error(f"Failed to delete theme for vendor {vendor_code}: {e}")
raise ThemeOperationException(
operation="delete", vendor_code=vendor_code, reason=str(e)
)
# ============================================================================
# VALIDATION
# ============================================================================
def _validate_theme_data(self, theme_data: VendorThemeUpdate) -> None:
"""
Validate theme data before applying.
Args:
theme_data: Theme update data
Raises:
ThemeValidationException: If validation fails
InvalidColorFormatException: If color format invalid
InvalidFontFamilyException: If font family invalid
"""
# Validate colors
if theme_data.colors:
for color_key, color_value in theme_data.colors.items():
if not self._is_valid_color(color_value):
raise InvalidColorFormatException(color_value, color_key)
# Validate fonts
if theme_data.fonts:
for font_key, font_value in theme_data.fonts.items():
if not self._is_valid_font(font_value):
raise InvalidFontFamilyException(font_value, font_key)
# Validate layout values
if theme_data.layout:
valid_layouts = {
"style": ["grid", "list", "masonry"],
"header": ["fixed", "static", "transparent"],
"product_card": ["modern", "classic", "minimal"],
}
for layout_key, layout_value in theme_data.layout.items():
if layout_key in valid_layouts:
if layout_value not in valid_layouts[layout_key]:
raise ThemeValidationException(
message=f"Invalid {layout_key} value: {layout_value}",
field=layout_key,
validation_errors={
layout_key: f"Must be one of: {', '.join(valid_layouts[layout_key])}"
},
)
def _is_valid_color(self, color: str) -> bool:
"""
Validate color format (hex color).
Args:
color: Color string to validate
Returns:
True if valid, False otherwise
"""
if not color:
return False
# Check for hex color format (#RGB or #RRGGBB)
hex_pattern = r"^#([A-Fa-f0-9]{6}|[A-Fa-f0-9]{3})$"
return bool(re.match(hex_pattern, color))
def _is_valid_font(self, font: str) -> bool:
"""
Validate font family format.
Args:
font: Font family string to validate
Returns:
True if valid, False otherwise
"""
if not font or len(font) < 3:
return False
# Basic validation - font should not be empty and should be reasonable length
return len(font) <= 200
# ============================================================================
# SERVICE INSTANCE
# ============================================================================
vendor_theme_service = VendorThemeService()