# Commit summary:
# - Create onboarding-specific exceptions (OnboardingNotFoundException, etc.)
# - Remove HTTPException usage from API endpoints per architecture rules
# - Let exceptions propagate to global handler
# - Add 12 integration tests for onboarding API endpoints
#
# 🤖 Generated with [Claude Code](https://claude.com/claude-code)
# Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
#
# Original file view: 660 lines, 22 KiB, Python
# app/services/onboarding_service.py
"""
Vendor onboarding service.

Handles the 4-step mandatory onboarding wizard for new vendors:
1. Company Profile Setup
2. Letzshop API Configuration
3. Product & Order Import (CSV feed URL configuration)
4. Order Sync (historical import with progress tracking)
"""

import logging
from datetime import UTC, datetime

from sqlalchemy.orm import Session

from app.exceptions import (
    OnboardingCsvUrlRequiredException,
    OnboardingNotFoundException,
    OnboardingStepOrderException,
    OnboardingSyncJobNotFoundException,
    OnboardingSyncNotCompleteException,
    VendorNotFoundException,
)
from app.services.letzshop.credentials_service import LetzshopCredentialsService
from app.services.letzshop.order_service import LetzshopOrderService
from models.database.onboarding import (
    OnboardingStatus,
    OnboardingStep,
    VendorOnboarding,
)
from models.database.vendor import Vendor

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


class OnboardingService:
|
|
"""
|
|
Service for managing vendor onboarding workflow.
|
|
|
|
Provides methods for each onboarding step and progress tracking.
|
|
"""
|
|
|
|
def __init__(self, db: Session):
|
|
"""
|
|
Initialize the onboarding service.
|
|
|
|
Args:
|
|
db: SQLAlchemy database session.
|
|
"""
|
|
self.db = db
|
|
|
|
# =========================================================================
|
|
# Onboarding CRUD
|
|
# =========================================================================
|
|
|
|
def get_onboarding(self, vendor_id: int) -> VendorOnboarding | None:
|
|
"""Get onboarding record for a vendor."""
|
|
return (
|
|
self.db.query(VendorOnboarding)
|
|
.filter(VendorOnboarding.vendor_id == vendor_id)
|
|
.first()
|
|
)
|
|
|
|
def get_onboarding_or_raise(self, vendor_id: int) -> VendorOnboarding:
|
|
"""Get onboarding record or raise OnboardingNotFoundException."""
|
|
onboarding = self.get_onboarding(vendor_id)
|
|
if onboarding is None:
|
|
raise OnboardingNotFoundException(vendor_id)
|
|
return onboarding
|
|
|
|
def create_onboarding(self, vendor_id: int) -> VendorOnboarding:
|
|
"""
|
|
Create a new onboarding record for a vendor.
|
|
|
|
This is called automatically when a vendor is created during signup.
|
|
"""
|
|
# Check if already exists
|
|
existing = self.get_onboarding(vendor_id)
|
|
if existing:
|
|
logger.warning(f"Onboarding already exists for vendor {vendor_id}")
|
|
return existing
|
|
|
|
onboarding = VendorOnboarding(
|
|
vendor_id=vendor_id,
|
|
status=OnboardingStatus.NOT_STARTED.value,
|
|
current_step=OnboardingStep.COMPANY_PROFILE.value,
|
|
)
|
|
|
|
self.db.add(onboarding)
|
|
self.db.flush()
|
|
|
|
logger.info(f"Created onboarding record for vendor {vendor_id}")
|
|
return onboarding
|
|
|
|
def get_or_create_onboarding(self, vendor_id: int) -> VendorOnboarding:
|
|
"""Get existing onboarding or create new one."""
|
|
onboarding = self.get_onboarding(vendor_id)
|
|
if onboarding is None:
|
|
onboarding = self.create_onboarding(vendor_id)
|
|
return onboarding
|
|
|
|
# =========================================================================
|
|
# Status Helpers
|
|
# =========================================================================
|
|
|
|
def is_completed(self, vendor_id: int) -> bool:
|
|
"""Check if onboarding is completed for a vendor."""
|
|
onboarding = self.get_onboarding(vendor_id)
|
|
if onboarding is None:
|
|
return False
|
|
return onboarding.is_completed
|
|
|
|
def get_status_response(self, vendor_id: int) -> dict:
|
|
"""
|
|
Get full onboarding status for API response.
|
|
|
|
Returns a dictionary with all step statuses and progress information.
|
|
"""
|
|
onboarding = self.get_or_create_onboarding(vendor_id)
|
|
|
|
return {
|
|
"id": onboarding.id,
|
|
"vendor_id": onboarding.vendor_id,
|
|
"status": onboarding.status,
|
|
"current_step": onboarding.current_step,
|
|
# Step statuses
|
|
"company_profile": {
|
|
"completed": onboarding.step_company_profile_completed,
|
|
"completed_at": onboarding.step_company_profile_completed_at,
|
|
"data": onboarding.step_company_profile_data,
|
|
},
|
|
"letzshop_api": {
|
|
"completed": onboarding.step_letzshop_api_completed,
|
|
"completed_at": onboarding.step_letzshop_api_completed_at,
|
|
"connection_verified": onboarding.step_letzshop_api_connection_verified,
|
|
},
|
|
"product_import": {
|
|
"completed": onboarding.step_product_import_completed,
|
|
"completed_at": onboarding.step_product_import_completed_at,
|
|
"csv_url_set": onboarding.step_product_import_csv_url_set,
|
|
},
|
|
"order_sync": {
|
|
"completed": onboarding.step_order_sync_completed,
|
|
"completed_at": onboarding.step_order_sync_completed_at,
|
|
"job_id": onboarding.step_order_sync_job_id,
|
|
},
|
|
# Progress tracking
|
|
"completion_percentage": onboarding.completion_percentage,
|
|
"completed_steps_count": onboarding.completed_steps_count,
|
|
"total_steps": 4,
|
|
# Completion info
|
|
"is_completed": onboarding.is_completed,
|
|
"started_at": onboarding.started_at,
|
|
"completed_at": onboarding.completed_at,
|
|
# Admin override info
|
|
"skipped_by_admin": onboarding.skipped_by_admin,
|
|
"skipped_at": onboarding.skipped_at,
|
|
"skipped_reason": onboarding.skipped_reason,
|
|
}
|
|
|
|
# =========================================================================
|
|
# Step 1: Company Profile
|
|
# =========================================================================
|
|
|
|
def get_company_profile_data(self, vendor_id: int) -> dict:
|
|
"""Get current company profile data for editing."""
|
|
vendor = self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
|
|
if not vendor:
|
|
return {}
|
|
|
|
company = vendor.company
|
|
|
|
return {
|
|
"company_name": company.name if company else None,
|
|
"brand_name": vendor.name,
|
|
"description": vendor.description,
|
|
"contact_email": vendor.effective_contact_email,
|
|
"contact_phone": vendor.effective_contact_phone,
|
|
"website": vendor.effective_website,
|
|
"business_address": vendor.effective_business_address,
|
|
"tax_number": vendor.effective_tax_number,
|
|
"default_language": vendor.default_language,
|
|
"dashboard_language": vendor.dashboard_language,
|
|
}
|
|
|
|
def complete_company_profile(
|
|
self,
|
|
vendor_id: int,
|
|
company_name: str | None = None,
|
|
brand_name: str | None = None,
|
|
description: str | None = None,
|
|
contact_email: str | None = None,
|
|
contact_phone: str | None = None,
|
|
website: str | None = None,
|
|
business_address: str | None = None,
|
|
tax_number: str | None = None,
|
|
default_language: str = "fr",
|
|
dashboard_language: str = "fr",
|
|
) -> dict:
|
|
"""
|
|
Save company profile and mark Step 1 as complete.
|
|
|
|
Returns response with next step information.
|
|
"""
|
|
onboarding = self.get_or_create_onboarding(vendor_id)
|
|
|
|
# Update onboarding status if this is the first step
|
|
if onboarding.status == OnboardingStatus.NOT_STARTED.value:
|
|
onboarding.status = OnboardingStatus.IN_PROGRESS.value
|
|
onboarding.started_at = datetime.now(UTC)
|
|
|
|
# Get vendor and company
|
|
vendor = self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
|
|
if not vendor:
|
|
raise VendorNotFoundException(vendor_id)
|
|
|
|
company = vendor.company
|
|
|
|
# Update company name if provided
|
|
if company and company_name:
|
|
company.name = company_name
|
|
|
|
# Update vendor fields
|
|
if brand_name:
|
|
vendor.name = brand_name
|
|
if description is not None:
|
|
vendor.description = description
|
|
|
|
# Update contact info (vendor-level overrides)
|
|
vendor.contact_email = contact_email
|
|
vendor.contact_phone = contact_phone
|
|
vendor.website = website
|
|
vendor.business_address = business_address
|
|
vendor.tax_number = tax_number
|
|
|
|
# Update language settings
|
|
vendor.default_language = default_language
|
|
vendor.dashboard_language = dashboard_language
|
|
|
|
# Store profile data in onboarding record
|
|
onboarding.step_company_profile_data = {
|
|
"company_name": company_name,
|
|
"brand_name": brand_name,
|
|
"description": description,
|
|
"contact_email": contact_email,
|
|
"contact_phone": contact_phone,
|
|
"website": website,
|
|
"business_address": business_address,
|
|
"tax_number": tax_number,
|
|
"default_language": default_language,
|
|
"dashboard_language": dashboard_language,
|
|
}
|
|
|
|
# Mark step complete
|
|
onboarding.mark_step_complete(OnboardingStep.COMPANY_PROFILE.value)
|
|
|
|
self.db.flush()
|
|
|
|
logger.info(f"Completed company profile step for vendor {vendor_id}")
|
|
|
|
return {
|
|
"success": True,
|
|
"step_completed": True,
|
|
"next_step": onboarding.current_step,
|
|
"message": "Company profile saved successfully",
|
|
}
|
|
|
|
# =========================================================================
|
|
# Step 2: Letzshop API Configuration
|
|
# =========================================================================
|
|
|
|
def test_letzshop_api(
|
|
self,
|
|
api_key: str,
|
|
shop_slug: str,
|
|
) -> dict:
|
|
"""
|
|
Test Letzshop API connection without saving credentials.
|
|
|
|
Returns connection test result with vendor info if successful.
|
|
"""
|
|
credentials_service = LetzshopCredentialsService(self.db)
|
|
|
|
# Test the API key
|
|
success, response_time, error = credentials_service.test_api_key(api_key)
|
|
|
|
if success:
|
|
return {
|
|
"success": True,
|
|
"message": f"Connection successful ({response_time:.0f}ms)",
|
|
"vendor_name": None, # Would need to query Letzshop for this
|
|
"vendor_id": None,
|
|
"shop_slug": shop_slug,
|
|
}
|
|
else:
|
|
return {
|
|
"success": False,
|
|
"message": error or "Connection failed",
|
|
"vendor_name": None,
|
|
"vendor_id": None,
|
|
"shop_slug": None,
|
|
}
|
|
|
|
def complete_letzshop_api(
|
|
self,
|
|
vendor_id: int,
|
|
api_key: str,
|
|
shop_slug: str,
|
|
letzshop_vendor_id: str | None = None,
|
|
) -> dict:
|
|
"""
|
|
Save Letzshop API credentials and mark Step 2 as complete.
|
|
|
|
Tests connection first, only saves if successful.
|
|
"""
|
|
onboarding = self.get_or_create_onboarding(vendor_id)
|
|
|
|
# Verify step order
|
|
if not onboarding.can_proceed_to_step(OnboardingStep.LETZSHOP_API.value):
|
|
raise OnboardingStepOrderException(
|
|
current_step=onboarding.current_step,
|
|
required_step=OnboardingStep.COMPANY_PROFILE.value,
|
|
)
|
|
|
|
# Test connection first
|
|
credentials_service = LetzshopCredentialsService(self.db)
|
|
success, response_time, error = credentials_service.test_api_key(api_key)
|
|
|
|
if not success:
|
|
return {
|
|
"success": False,
|
|
"step_completed": False,
|
|
"next_step": None,
|
|
"message": f"Connection test failed: {error}",
|
|
"connection_verified": False,
|
|
}
|
|
|
|
# Save credentials
|
|
credentials_service.upsert_credentials(
|
|
vendor_id=vendor_id,
|
|
api_key=api_key,
|
|
auto_sync_enabled=False, # Enable after onboarding
|
|
sync_interval_minutes=15,
|
|
)
|
|
|
|
# Update vendor with Letzshop identity
|
|
vendor = self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
|
|
if vendor:
|
|
vendor.letzshop_vendor_slug = shop_slug
|
|
if letzshop_vendor_id:
|
|
vendor.letzshop_vendor_id = letzshop_vendor_id
|
|
|
|
# Mark step complete
|
|
onboarding.step_letzshop_api_connection_verified = True
|
|
onboarding.mark_step_complete(OnboardingStep.LETZSHOP_API.value)
|
|
|
|
self.db.flush()
|
|
|
|
logger.info(f"Completed Letzshop API step for vendor {vendor_id}")
|
|
|
|
return {
|
|
"success": True,
|
|
"step_completed": True,
|
|
"next_step": onboarding.current_step,
|
|
"message": "Letzshop API configured successfully",
|
|
"connection_verified": True,
|
|
}
|
|
|
|
# =========================================================================
|
|
# Step 3: Product & Order Import Configuration
|
|
# =========================================================================
|
|
|
|
def get_product_import_config(self, vendor_id: int) -> dict:
|
|
"""Get current product import configuration."""
|
|
vendor = self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
|
|
if not vendor:
|
|
return {}
|
|
|
|
return {
|
|
"csv_url_fr": vendor.letzshop_csv_url_fr,
|
|
"csv_url_en": vendor.letzshop_csv_url_en,
|
|
"csv_url_de": vendor.letzshop_csv_url_de,
|
|
"default_tax_rate": vendor.letzshop_default_tax_rate,
|
|
"delivery_method": vendor.letzshop_delivery_method,
|
|
"preorder_days": vendor.letzshop_preorder_days,
|
|
}
|
|
|
|
def complete_product_import(
|
|
self,
|
|
vendor_id: int,
|
|
csv_url_fr: str | None = None,
|
|
csv_url_en: str | None = None,
|
|
csv_url_de: str | None = None,
|
|
default_tax_rate: int = 17,
|
|
delivery_method: str = "package_delivery",
|
|
preorder_days: int = 1,
|
|
) -> dict:
|
|
"""
|
|
Save product import configuration and mark Step 3 as complete.
|
|
|
|
At least one CSV URL must be provided.
|
|
"""
|
|
onboarding = self.get_or_create_onboarding(vendor_id)
|
|
|
|
# Verify step order
|
|
if not onboarding.can_proceed_to_step(OnboardingStep.PRODUCT_IMPORT.value):
|
|
raise OnboardingStepOrderException(
|
|
current_step=onboarding.current_step,
|
|
required_step=OnboardingStep.LETZSHOP_API.value,
|
|
)
|
|
|
|
# Validate at least one CSV URL
|
|
csv_urls_count = sum([
|
|
bool(csv_url_fr),
|
|
bool(csv_url_en),
|
|
bool(csv_url_de),
|
|
])
|
|
|
|
if csv_urls_count == 0:
|
|
raise OnboardingCsvUrlRequiredException()
|
|
|
|
# Update vendor settings
|
|
vendor = self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
|
|
if not vendor:
|
|
raise VendorNotFoundException(vendor_id)
|
|
|
|
vendor.letzshop_csv_url_fr = csv_url_fr
|
|
vendor.letzshop_csv_url_en = csv_url_en
|
|
vendor.letzshop_csv_url_de = csv_url_de
|
|
vendor.letzshop_default_tax_rate = default_tax_rate
|
|
vendor.letzshop_delivery_method = delivery_method
|
|
vendor.letzshop_preorder_days = preorder_days
|
|
|
|
# Mark step complete
|
|
onboarding.step_product_import_csv_url_set = True
|
|
onboarding.mark_step_complete(OnboardingStep.PRODUCT_IMPORT.value)
|
|
|
|
self.db.flush()
|
|
|
|
logger.info(f"Completed product import step for vendor {vendor_id}")
|
|
|
|
return {
|
|
"success": True,
|
|
"step_completed": True,
|
|
"next_step": onboarding.current_step,
|
|
"message": "Product import configured successfully",
|
|
"csv_urls_configured": csv_urls_count,
|
|
}
|
|
|
|
# =========================================================================
|
|
# Step 4: Order Sync
|
|
# =========================================================================
|
|
|
|
def trigger_order_sync(
|
|
self,
|
|
vendor_id: int,
|
|
user_id: int,
|
|
days_back: int = 90,
|
|
include_products: bool = True,
|
|
) -> dict:
|
|
"""
|
|
Trigger historical order import and return job info.
|
|
|
|
Creates a background job that imports historical orders from Letzshop.
|
|
"""
|
|
onboarding = self.get_or_create_onboarding(vendor_id)
|
|
|
|
# Verify step order
|
|
if not onboarding.can_proceed_to_step(OnboardingStep.ORDER_SYNC.value):
|
|
raise OnboardingStepOrderException(
|
|
current_step=onboarding.current_step,
|
|
required_step=OnboardingStep.PRODUCT_IMPORT.value,
|
|
)
|
|
|
|
# Create historical import job
|
|
order_service = LetzshopOrderService(self.db)
|
|
|
|
# Check for existing running job
|
|
existing_job = order_service.get_running_historical_import_job(vendor_id)
|
|
if existing_job:
|
|
return {
|
|
"success": True,
|
|
"message": "Import job already running",
|
|
"job_id": existing_job.id,
|
|
"estimated_duration_minutes": 5, # Estimate
|
|
}
|
|
|
|
# Create new job
|
|
job = order_service.create_historical_import_job(
|
|
vendor_id=vendor_id,
|
|
user_id=user_id,
|
|
)
|
|
|
|
# Store job ID in onboarding
|
|
onboarding.step_order_sync_job_id = job.id
|
|
|
|
self.db.flush()
|
|
|
|
logger.info(f"Triggered order sync job {job.id} for vendor {vendor_id}")
|
|
|
|
return {
|
|
"success": True,
|
|
"message": "Historical import started",
|
|
"job_id": job.id,
|
|
"estimated_duration_minutes": 5, # Estimate
|
|
}
|
|
|
|
def get_order_sync_progress(
|
|
self,
|
|
vendor_id: int,
|
|
job_id: int,
|
|
) -> dict:
|
|
"""
|
|
Get progress of historical import job.
|
|
|
|
Returns current status, progress, and counts.
|
|
"""
|
|
order_service = LetzshopOrderService(self.db)
|
|
job = order_service.get_historical_import_job_by_id(vendor_id, job_id)
|
|
|
|
if not job:
|
|
return {
|
|
"job_id": job_id,
|
|
"status": "not_found",
|
|
"progress_percentage": 0,
|
|
"current_phase": None,
|
|
"orders_imported": 0,
|
|
"orders_total": None,
|
|
"products_imported": 0,
|
|
"started_at": None,
|
|
"completed_at": None,
|
|
"estimated_remaining_seconds": None,
|
|
"error_message": "Job not found",
|
|
}
|
|
|
|
# Calculate progress percentage
|
|
progress = 0
|
|
if job.status == "completed":
|
|
progress = 100
|
|
elif job.status == "failed":
|
|
progress = 0
|
|
elif job.status == "processing":
|
|
if job.total_shipments and job.total_shipments > 0:
|
|
progress = int((job.processed_shipments or 0) / job.total_shipments * 100)
|
|
else:
|
|
progress = 50 # Indeterminate
|
|
|
|
# Determine current phase
|
|
current_phase = None
|
|
if job.status == "fetching":
|
|
current_phase = "fetching"
|
|
elif job.status == "processing":
|
|
current_phase = "orders"
|
|
elif job.status == "completed":
|
|
current_phase = "complete"
|
|
|
|
return {
|
|
"job_id": job.id,
|
|
"status": job.status,
|
|
"progress_percentage": progress,
|
|
"current_phase": current_phase,
|
|
"orders_imported": job.processed_shipments or 0,
|
|
"orders_total": job.total_shipments,
|
|
"products_imported": 0, # TODO: Track this
|
|
"started_at": job.started_at,
|
|
"completed_at": job.completed_at,
|
|
"estimated_remaining_seconds": None, # TODO: Calculate
|
|
"error_message": job.error_message,
|
|
}
|
|
|
|
def complete_order_sync(
|
|
self,
|
|
vendor_id: int,
|
|
job_id: int,
|
|
) -> dict:
|
|
"""
|
|
Mark order sync step as complete after job finishes.
|
|
|
|
Also marks the entire onboarding as complete.
|
|
"""
|
|
onboarding = self.get_or_create_onboarding(vendor_id)
|
|
|
|
# Verify job is complete
|
|
order_service = LetzshopOrderService(self.db)
|
|
job = order_service.get_historical_import_job_by_id(vendor_id, job_id)
|
|
|
|
if not job:
|
|
raise OnboardingSyncJobNotFoundException(job_id)
|
|
|
|
if job.status not in ("completed", "failed"):
|
|
raise OnboardingSyncNotCompleteException(job.status)
|
|
|
|
# Mark step complete (even if job failed - they can retry later)
|
|
onboarding.mark_step_complete(OnboardingStep.ORDER_SYNC.value)
|
|
|
|
# Enable auto-sync now that onboarding is complete
|
|
credentials_service = LetzshopCredentialsService(self.db)
|
|
credentials = credentials_service.get_credentials(vendor_id)
|
|
if credentials:
|
|
credentials.auto_sync_enabled = True
|
|
|
|
self.db.flush()
|
|
|
|
# Get vendor code for redirect URL
|
|
vendor = self.db.query(Vendor).filter(Vendor.id == vendor_id).first()
|
|
vendor_code = vendor.vendor_code if vendor else ""
|
|
|
|
logger.info(f"Completed onboarding for vendor {vendor_id}")
|
|
|
|
return {
|
|
"success": True,
|
|
"step_completed": True,
|
|
"onboarding_completed": True,
|
|
"message": "Onboarding complete! Welcome to Wizamart.",
|
|
"redirect_url": f"/vendor/{vendor_code}/dashboard",
|
|
}
|
|
|
|
# =========================================================================
|
|
# Admin Skip
|
|
# =========================================================================
|
|
|
|
def skip_onboarding(
|
|
self,
|
|
vendor_id: int,
|
|
admin_user_id: int,
|
|
reason: str,
|
|
) -> dict:
|
|
"""
|
|
Admin-only: Skip onboarding for a vendor.
|
|
|
|
Used for support cases where manual setup is needed.
|
|
"""
|
|
onboarding = self.get_or_create_onboarding(vendor_id)
|
|
|
|
onboarding.skipped_by_admin = True
|
|
onboarding.skipped_at = datetime.now(UTC)
|
|
onboarding.skipped_reason = reason
|
|
onboarding.skipped_by_user_id = admin_user_id
|
|
onboarding.status = OnboardingStatus.SKIPPED.value
|
|
|
|
self.db.flush()
|
|
|
|
logger.info(
|
|
f"Admin {admin_user_id} skipped onboarding for vendor {vendor_id}: {reason}"
|
|
)
|
|
|
|
return {
|
|
"success": True,
|
|
"message": "Onboarding skipped by admin",
|
|
"vendor_id": vendor_id,
|
|
"skipped_at": onboarding.skipped_at,
|
|
}
|
|
|
|
|
|
# Convenience factory: builds a new OnboardingService on every call
# (it is not a singleton; nothing is cached between calls).
def get_onboarding_service(db: Session) -> OnboardingService:
    """
    Create an OnboardingService bound to the given session.

    Args:
        db: SQLAlchemy database session the service will use.

    Returns:
        A new OnboardingService instance.
    """
    return OnboardingService(db)