Multitenant implementation with custom Domain, theme per vendor
This commit is contained in:
458
app/services/vendor_domain_service.py
Normal file
458
app/services/vendor_domain_service.py
Normal file
@@ -0,0 +1,458 @@
|
||||
# app/services/vendor_domain_service.py
|
||||
"""
|
||||
Vendor domain service for managing custom domain operations.
|
||||
|
||||
This module provides classes and functions for:
|
||||
- Adding and removing custom domains
|
||||
- Domain verification via DNS
|
||||
- Domain activation and deactivation
|
||||
- Setting primary domains
|
||||
- Domain validation and normalization
|
||||
"""
|
||||
|
||||
import logging
|
||||
import secrets
|
||||
from typing import List, Tuple, Optional
|
||||
from datetime import datetime, timezone
|
||||
|
||||
from sqlalchemy.orm import Session
|
||||
from sqlalchemy import and_
|
||||
|
||||
from app.exceptions import (
|
||||
VendorNotFoundException,
|
||||
VendorDomainNotFoundException,
|
||||
VendorDomainAlreadyExistsException,
|
||||
InvalidDomainFormatException,
|
||||
ReservedDomainException,
|
||||
DomainNotVerifiedException,
|
||||
DomainVerificationFailedException,
|
||||
DomainAlreadyVerifiedException,
|
||||
MultiplePrimaryDomainsException,
|
||||
DNSVerificationException,
|
||||
MaxDomainsReachedException,
|
||||
UnauthorizedDomainAccessException,
|
||||
ValidationException,
|
||||
)
|
||||
from models.schema.vendor_domain import VendorDomainCreate, VendorDomainUpdate
|
||||
from models.database.vendor import Vendor
|
||||
from models.database.vendor_domain import VendorDomain
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class VendorDomainService:
|
||||
"""Service class for vendor domain operations."""
|
||||
|
||||
def __init__(self):
|
||||
self.max_domains_per_vendor = 10 # Configure as needed
|
||||
self.reserved_subdomains = ['www', 'admin', 'api', 'mail', 'smtp', 'ftp', 'cpanel', 'webmail']
|
||||
|
||||
def add_domain(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
domain_data: VendorDomainCreate
|
||||
) -> VendorDomain:
|
||||
"""
|
||||
Add a custom domain to vendor.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID to add domain to
|
||||
domain_data: Domain creation data
|
||||
|
||||
Returns:
|
||||
Created VendorDomain object
|
||||
|
||||
Raises:
|
||||
VendorNotFoundException: If vendor not found
|
||||
VendorDomainAlreadyExistsException: If domain already registered
|
||||
MaxDomainsReachedException: If vendor has reached max domains
|
||||
InvalidDomainFormatException: If domain format is invalid
|
||||
"""
|
||||
try:
|
||||
# Verify vendor exists
|
||||
vendor = self._get_vendor_by_id_or_raise(db, vendor_id)
|
||||
|
||||
# Check domain limit
|
||||
self._check_domain_limit(db, vendor_id)
|
||||
|
||||
# Normalize domain
|
||||
normalized_domain = VendorDomain.normalize_domain(domain_data.domain)
|
||||
|
||||
# Validate domain format
|
||||
self._validate_domain_format(normalized_domain)
|
||||
|
||||
# Check if domain already exists
|
||||
if self._domain_exists(db, normalized_domain):
|
||||
existing_domain = db.query(VendorDomain).filter(
|
||||
VendorDomain.domain == normalized_domain
|
||||
).first()
|
||||
raise VendorDomainAlreadyExistsException(
|
||||
normalized_domain,
|
||||
existing_domain.vendor_id if existing_domain else None
|
||||
)
|
||||
|
||||
# If setting as primary, unset other primary domains
|
||||
if domain_data.is_primary:
|
||||
self._unset_primary_domains(db, vendor_id)
|
||||
|
||||
# Create domain record
|
||||
new_domain = VendorDomain(
|
||||
vendor_id=vendor_id,
|
||||
domain=normalized_domain,
|
||||
is_primary=domain_data.is_primary,
|
||||
verification_token=secrets.token_urlsafe(32),
|
||||
is_verified=False, # Requires DNS verification
|
||||
is_active=False, # Cannot be active until verified
|
||||
ssl_status="pending"
|
||||
)
|
||||
|
||||
db.add(new_domain)
|
||||
db.commit()
|
||||
db.refresh(new_domain)
|
||||
|
||||
logger.info(f"Domain {normalized_domain} added to vendor {vendor_id}")
|
||||
return new_domain
|
||||
|
||||
except (
|
||||
VendorNotFoundException,
|
||||
VendorDomainAlreadyExistsException,
|
||||
MaxDomainsReachedException,
|
||||
InvalidDomainFormatException,
|
||||
ReservedDomainException
|
||||
):
|
||||
db.rollback()
|
||||
raise
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
logger.error(f"Error adding domain: {str(e)}")
|
||||
raise ValidationException("Failed to add domain")
|
||||
|
||||
def get_vendor_domains(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int
|
||||
) -> List[VendorDomain]:
|
||||
"""
|
||||
Get all domains for a vendor.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
vendor_id: Vendor ID
|
||||
|
||||
Returns:
|
||||
List of VendorDomain objects
|
||||
|
||||
Raises:
|
||||
VendorNotFoundException: If vendor not found
|
||||
"""
|
||||
try:
|
||||
# Verify vendor exists
|
||||
self._get_vendor_by_id_or_raise(db, vendor_id)
|
||||
|
||||
domains = db.query(VendorDomain).filter(
|
||||
VendorDomain.vendor_id == vendor_id
|
||||
).order_by(
|
||||
VendorDomain.is_primary.desc(),
|
||||
VendorDomain.created_at.desc()
|
||||
).all()
|
||||
|
||||
return domains
|
||||
|
||||
except VendorNotFoundException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting vendor domains: {str(e)}")
|
||||
raise ValidationException("Failed to retrieve domains")
|
||||
|
||||
def get_domain_by_id(
|
||||
self,
|
||||
db: Session,
|
||||
domain_id: int
|
||||
) -> VendorDomain:
|
||||
"""
|
||||
Get domain by ID.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
domain_id: Domain ID
|
||||
|
||||
Returns:
|
||||
VendorDomain object
|
||||
|
||||
Raises:
|
||||
VendorDomainNotFoundException: If domain not found
|
||||
"""
|
||||
domain = db.query(VendorDomain).filter(VendorDomain.id == domain_id).first()
|
||||
if not domain:
|
||||
raise VendorDomainNotFoundException(str(domain_id))
|
||||
return domain
|
||||
|
||||
def update_domain(
|
||||
self,
|
||||
db: Session,
|
||||
domain_id: int,
|
||||
domain_update: VendorDomainUpdate
|
||||
) -> VendorDomain:
|
||||
"""
|
||||
Update domain settings.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
domain_id: Domain ID
|
||||
domain_update: Update data
|
||||
|
||||
Returns:
|
||||
Updated VendorDomain object
|
||||
|
||||
Raises:
|
||||
VendorDomainNotFoundException: If domain not found
|
||||
DomainNotVerifiedException: If trying to activate unverified domain
|
||||
"""
|
||||
try:
|
||||
domain = self.get_domain_by_id(db, domain_id)
|
||||
|
||||
# If setting as primary, unset other primary domains
|
||||
if domain_update.is_primary:
|
||||
self._unset_primary_domains(db, domain.vendor_id, exclude_domain_id=domain_id)
|
||||
domain.is_primary = True
|
||||
|
||||
# If activating, check verification
|
||||
if domain_update.is_active is True and not domain.is_verified:
|
||||
raise DomainNotVerifiedException(domain_id, domain.domain)
|
||||
|
||||
# Update fields
|
||||
if domain_update.is_active is not None:
|
||||
domain.is_active = domain_update.is_active
|
||||
|
||||
db.commit()
|
||||
db.refresh(domain)
|
||||
|
||||
logger.info(f"Domain {domain.domain} updated")
|
||||
return domain
|
||||
|
||||
except (VendorDomainNotFoundException, DomainNotVerifiedException):
|
||||
db.rollback()
|
||||
raise
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
logger.error(f"Error updating domain: {str(e)}")
|
||||
raise ValidationException("Failed to update domain")
|
||||
|
||||
def delete_domain(
|
||||
self,
|
||||
db: Session,
|
||||
domain_id: int
|
||||
) -> str:
|
||||
"""
|
||||
Delete a custom domain.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
domain_id: Domain ID
|
||||
|
||||
Returns:
|
||||
Success message
|
||||
|
||||
Raises:
|
||||
VendorDomainNotFoundException: If domain not found
|
||||
"""
|
||||
try:
|
||||
domain = self.get_domain_by_id(db, domain_id)
|
||||
domain_name = domain.domain
|
||||
vendor_id = domain.vendor_id
|
||||
|
||||
db.delete(domain)
|
||||
db.commit()
|
||||
|
||||
logger.info(f"Domain {domain_name} deleted from vendor {vendor_id}")
|
||||
return f"Domain {domain_name} deleted successfully"
|
||||
|
||||
except VendorDomainNotFoundException:
|
||||
db.rollback()
|
||||
raise
|
||||
except Exception as e:
|
||||
db.rollback()
|
||||
logger.error(f"Error deleting domain: {str(e)}")
|
||||
raise ValidationException("Failed to delete domain")
|
||||
|
||||
def verify_domain(
|
||||
self,
|
||||
db: Session,
|
||||
domain_id: int
|
||||
) -> Tuple[VendorDomain, str]:
|
||||
"""
|
||||
Verify domain ownership via DNS TXT record.
|
||||
|
||||
The vendor must add a TXT record:
|
||||
Name: _letzshop-verify.{domain}
|
||||
Value: {verification_token}
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
domain_id: Domain ID
|
||||
|
||||
Returns:
|
||||
Tuple of (verified_domain, message)
|
||||
|
||||
Raises:
|
||||
VendorDomainNotFoundException: If domain not found
|
||||
DomainAlreadyVerifiedException: If already verified
|
||||
DomainVerificationFailedException: If verification fails
|
||||
"""
|
||||
try:
|
||||
import dns.resolver
|
||||
|
||||
domain = self.get_domain_by_id(db, domain_id)
|
||||
|
||||
# Check if already verified
|
||||
if domain.is_verified:
|
||||
raise DomainAlreadyVerifiedException(domain_id, domain.domain)
|
||||
|
||||
# Query DNS TXT records
|
||||
try:
|
||||
txt_records = dns.resolver.resolve(
|
||||
f"_letzshop-verify.{domain.domain}",
|
||||
'TXT'
|
||||
)
|
||||
|
||||
# Check if verification token is present
|
||||
for txt in txt_records:
|
||||
txt_value = txt.to_text().strip('"')
|
||||
if txt_value == domain.verification_token:
|
||||
# Verification successful
|
||||
domain.is_verified = True
|
||||
domain.verified_at = datetime.now(timezone.utc)
|
||||
db.commit()
|
||||
db.refresh(domain)
|
||||
|
||||
logger.info(f"Domain {domain.domain} verified successfully")
|
||||
return domain, f"Domain {domain.domain} verified successfully"
|
||||
|
||||
# Token not found
|
||||
raise DomainVerificationFailedException(
|
||||
domain.domain,
|
||||
"Verification token not found in DNS records"
|
||||
)
|
||||
|
||||
except dns.resolver.NXDOMAIN:
|
||||
raise DomainVerificationFailedException(
|
||||
domain.domain,
|
||||
f"DNS record _letzshop-verify.{domain.domain} not found"
|
||||
)
|
||||
except dns.resolver.NoAnswer:
|
||||
raise DomainVerificationFailedException(
|
||||
domain.domain,
|
||||
"No TXT records found for verification"
|
||||
)
|
||||
except Exception as dns_error:
|
||||
raise DNSVerificationException(
|
||||
domain.domain,
|
||||
str(dns_error)
|
||||
)
|
||||
|
||||
except (
|
||||
VendorDomainNotFoundException,
|
||||
DomainAlreadyVerifiedException,
|
||||
DomainVerificationFailedException,
|
||||
DNSVerificationException
|
||||
):
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error verifying domain: {str(e)}")
|
||||
raise ValidationException("Failed to verify domain")
|
||||
|
||||
def get_verification_instructions(
|
||||
self,
|
||||
db: Session,
|
||||
domain_id: int
|
||||
) -> dict:
|
||||
"""
|
||||
Get DNS verification instructions for domain.
|
||||
|
||||
Args:
|
||||
db: Database session
|
||||
domain_id: Domain ID
|
||||
|
||||
Returns:
|
||||
Dict with verification instructions
|
||||
|
||||
Raises:
|
||||
VendorDomainNotFoundException: If domain not found
|
||||
"""
|
||||
domain = self.get_domain_by_id(db, domain_id)
|
||||
|
||||
return {
|
||||
"domain": domain.domain,
|
||||
"verification_token": domain.verification_token,
|
||||
"instructions": {
|
||||
"step1": "Go to your domain's DNS settings (at your domain registrar)",
|
||||
"step2": "Add a new TXT record with the following values:",
|
||||
"step3": "Wait for DNS propagation (5-15 minutes)",
|
||||
"step4": "Click 'Verify Domain' button in admin panel"
|
||||
},
|
||||
"txt_record": {
|
||||
"type": "TXT",
|
||||
"name": "_letzshop-verify",
|
||||
"value": domain.verification_token,
|
||||
"ttl": 3600
|
||||
},
|
||||
"common_registrars": {
|
||||
"Cloudflare": "https://dash.cloudflare.com",
|
||||
"GoDaddy": "https://dcc.godaddy.com/manage/dns",
|
||||
"Namecheap": "https://www.namecheap.com/myaccount/domain-list/",
|
||||
"Google Domains": "https://domains.google.com"
|
||||
}
|
||||
}
|
||||
|
||||
# Private helper methods
|
||||
def _get_vendor_by_id_or_raise(self, db: Session, vendor_id: int) -> Vendor:
|
||||
"""Get vendor by ID or raise exception."""
|
||||
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
|
||||
if not vendor:
|
||||
raise VendorNotFoundException(str(vendor_id), identifier_type="id")
|
||||
return vendor
|
||||
|
||||
def _check_domain_limit(self, db: Session, vendor_id: int) -> None:
|
||||
"""Check if vendor has reached maximum domain limit."""
|
||||
domain_count = db.query(VendorDomain).filter(
|
||||
VendorDomain.vendor_id == vendor_id
|
||||
).count()
|
||||
|
||||
if domain_count >= self.max_domains_per_vendor:
|
||||
raise MaxDomainsReachedException(vendor_id, self.max_domains_per_vendor)
|
||||
|
||||
def _domain_exists(self, db: Session, domain: str) -> bool:
|
||||
"""Check if domain already exists in system."""
|
||||
return db.query(VendorDomain).filter(
|
||||
VendorDomain.domain == domain
|
||||
).first() is not None
|
||||
|
||||
def _validate_domain_format(self, domain: str) -> None:
|
||||
"""Validate domain format and check for reserved subdomains."""
|
||||
# Check for reserved subdomains
|
||||
first_part = domain.split('.')[0]
|
||||
if first_part in self.reserved_subdomains:
|
||||
raise ReservedDomainException(domain, first_part)
|
||||
|
||||
def _unset_primary_domains(
|
||||
self,
|
||||
db: Session,
|
||||
vendor_id: int,
|
||||
exclude_domain_id: Optional[int] = None
|
||||
) -> None:
|
||||
"""Unset all primary domains for vendor."""
|
||||
query = db.query(VendorDomain).filter(
|
||||
VendorDomain.vendor_id == vendor_id,
|
||||
VendorDomain.is_primary == True
|
||||
)
|
||||
|
||||
if exclude_domain_id:
|
||||
query = query.filter(VendorDomain.id != exclude_domain_id)
|
||||
|
||||
query.update({"is_primary": False})
|
||||
|
||||
|
||||
# Create service instance
|
||||
vendor_domain_service = VendorDomainService()
|
||||
Reference in New Issue
Block a user