# app/modules/hosting/services/client_service_service.py """ Client service CRUD service. Manages operational tracking for hosted site services (domains, email, SSL, etc.). """ import logging from datetime import UTC, datetime, timedelta from sqlalchemy.orm import Session from app.modules.hosting.exceptions import ClientServiceNotFoundException from app.modules.hosting.models import ( ClientService, ClientServiceStatus, ServiceType, ) logger = logging.getLogger(__name__) class ClientServiceService: """Service for client service CRUD operations.""" def get_by_id(self, db: Session, service_id: int) -> ClientService: service = db.query(ClientService).filter(ClientService.id == service_id).first() if not service: raise ClientServiceNotFoundException(str(service_id)) return service def get_for_site(self, db: Session, hosted_site_id: int) -> list[ClientService]: return ( db.query(ClientService) .filter(ClientService.hosted_site_id == hosted_site_id) .order_by(ClientService.created_at.desc()) .all() ) def create(self, db: Session, hosted_site_id: int, data: dict) -> ClientService: service = ClientService( hosted_site_id=hosted_site_id, service_type=ServiceType(data["service_type"]), name=data["name"], description=data.get("description"), status=ClientServiceStatus.PENDING, billing_period=data.get("billing_period"), price_cents=data.get("price_cents"), currency=data.get("currency", "EUR"), addon_product_id=data.get("addon_product_id"), domain_name=data.get("domain_name"), registrar=data.get("registrar"), mailbox_count=data.get("mailbox_count"), expires_at=data.get("expires_at"), period_start=data.get("period_start"), period_end=data.get("period_end"), auto_renew=data.get("auto_renew", True), notes=data.get("notes"), ) db.add(service) db.flush() logger.info("Created client service: %s (site_id=%d)", service.name, hosted_site_id) return service def update(self, db: Session, service_id: int, data: dict) -> ClientService: service = self.get_by_id(db, service_id) for field in [ "name", "description", "status", "billing_period", "price_cents", "currency", "addon_product_id", "domain_name", "registrar", "mailbox_count", "expires_at", "period_start", "period_end", "auto_renew", "notes", ]: if field in data and data[field] is not None: setattr(service, field, data[field]) db.flush() return service def delete(self, db: Session, service_id: int) -> bool: service = self.get_by_id(db, service_id) db.delete(service) db.flush() logger.info("Deleted client service: %d", service_id) return True def get_expiring_soon(self, db: Session, days: int = 30) -> list[ClientService]: """Get services expiring within the given number of days.""" cutoff = datetime.now(UTC) + timedelta(days=days) return ( db.query(ClientService) .filter( ClientService.expires_at.isnot(None), ClientService.expires_at <= cutoff, ClientService.status == ClientServiceStatus.ACTIVE, ) .order_by(ClientService.expires_at.asc()) .all() ) def get_all( self, db: Session, *, page: int = 1, per_page: int = 20, service_type: str | None = None, status: str | None = None, ) -> tuple[list[ClientService], int]: """Get all services with pagination and filters.""" query = db.query(ClientService) if service_type: query = query.filter(ClientService.service_type == service_type) if status: query = query.filter(ClientService.status == status) total = query.count() services = ( query.order_by(ClientService.created_at.desc()) .offset((page - 1) * per_page) .limit(per_page) .all() ) return services, total client_service_service = ClientServiceService()