Replace all ~1,086 occurrences of Wizamart/wizamart/WIZAMART/WizaMart with Orion/orion/ORION across 184 files. This includes database identifiers, email addresses, domain references, R2 bucket names, DNS prefixes, encryption salt, Celery app name, config defaults, Docker configs, CI configs, documentation, seed data, and templates. Renames homepage-wizamart.html template to homepage-orion.html. Fixes duplicate file_pattern key in api.yaml architecture rule. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
14 KiB
Background Tasks Architecture
Overview
This document defines the harmonized architecture for all background tasks in the application. Background tasks are long-running operations that execute asynchronously, allowing users to continue browsing while the task completes.
Task Queue Infrastructure
Orion uses Celery with Redis for production-grade background task processing:
| Component | Purpose | Port |
|---|---|---|
| Celery Worker | Executes background tasks | - |
| Celery Beat | Scheduled task scheduler | - |
| Redis | Message broker & result backend | 6379 |
| Flower | Web-based task monitoring | 5555 |
Configuration
# Environment variables (.env)
REDIS_URL=redis://localhost:6379/0
USE_CELERY=true # false = use FastAPI BackgroundTasks
FLOWER_URL=http://localhost:5555
FLOWER_PASSWORD=changeme
Running Celery
# Start Redis
docker compose up -d redis
# Start Celery worker (processes tasks)
make celery-worker
# Start Celery beat (scheduled tasks)
make celery-beat
# Start both together (development)
make celery-dev
# Start Flower monitoring
make flower
Feature Flag: USE_CELERY
The system supports gradual migration via the USE_CELERY feature flag:
USE_CELERY=false(default): Uses FastAPI'sBackgroundTasksfor immediate execution without Redis dependency. Suitable for development.USE_CELERY=true: Routes tasks through Celery for persistent queuing, retries, and scheduling. Required for production.
Task Routing
Tasks are routed to queues based on their characteristics:
| Queue | Tasks | Characteristics |
|---|---|---|
default |
Product exports | Fast, non-blocking |
long_running |
Imports, code quality scans, tests | May take 10+ minutes |
scheduled |
Subscription maintenance | Triggered by Celery Beat |
Celery Task ID Tracking
All background job models include a celery_task_id field for cross-referencing with Flower:
class MarketplaceImportJob(Base):
celery_task_id = Column(String(255), nullable=True)
This enables:
- Deep linking to Flower for task details
- Task cancellation via Flower API
- Correlation between database records and Celery events
Current State Analysis
| Task Type | Database Model | Status Values | Tracked in BG Tasks Page |
|---|---|---|---|
| Product Import | MarketplaceImportJob |
pending, processing, completed, failed, completed_with_errors | Yes |
| Order Import | LetzshopHistoricalImportJob |
pending, fetching, processing, completed, failed | No |
| Test Runs | TestRun |
running, passed, failed, error | Yes |
| Product Export | LetzshopSyncLog |
success, partial (post-facto logging only) | No |
| Code Quality Scan | ArchitectureScan |
pending, running, completed, failed, completed_with_warnings | Yes ✓ |
Identified Issues
- Inconsistent status values - "processing" vs "running" vs "fetching"
- Inconsistent field naming -
started_at/completed_atvstimestamp - Incomplete tracking - Not all tasks appear on background tasks page
Missing status on some models - Code quality scans have no status field✓ Fixed- Exports not async - Product exports run synchronously
Harmonized Architecture
Standard Status Values
All background tasks MUST use these standard status values:
| Status | Description | When Set |
|---|---|---|
pending |
Task created but not yet started | On job creation |
running |
Task is actively executing | When processing begins |
completed |
Task finished successfully | On successful completion |
failed |
Task failed with error | On unrecoverable error |
completed_with_warnings |
Task completed but with non-fatal issues | When partial success |
Standard Database Model Fields
All background task models MUST include these fields:
class BackgroundTaskMixin:
"""Mixin providing standard background task fields."""
# Required fields
id = Column(Integer, primary_key=True)
status = Column(String(30), nullable=False, default="pending", index=True)
created_at = Column(DateTime, default=lambda: datetime.now(UTC))
started_at = Column(DateTime, nullable=True)
completed_at = Column(DateTime, nullable=True)
triggered_by = Column(String(100), nullable=True) # "manual:username", "scheduled", "api"
error_message = Column(Text, nullable=True)
# Optional but recommended
progress_percent = Column(Integer, nullable=True) # 0-100 for progress tracking
progress_message = Column(String(255), nullable=True) # Current step description
Standard Task Type Identifier
Each task type MUST have a unique identifier used for:
- Filtering on background tasks page
- API routing
- Frontend component selection
| Task Type ID | Description | Model |
|---|---|---|
product_import |
Marketplace product CSV import | MarketplaceImportJob |
order_import |
Letzshop historical order import | LetzshopHistoricalImportJob |
product_export |
Product feed export | ProductExportJob (new) |
test_run |
Pytest execution | TestRun |
code_quality_scan |
Architecture/Security/Performance scan | ArchitectureScan |
API Design Pattern
Trigger Endpoint
All background tasks MUST follow this pattern:
POST /api/v1/{domain}/{task-type}
Request: Task-specific parameters
Response (202 Accepted):
{
"job_id": 123,
"task_type": "product_import",
"status": "pending",
"message": "Task queued successfully",
"status_url": "/api/v1/admin/background-tasks/123"
}
Status Endpoint
GET /api/v1/admin/background-tasks/{job_id}
Response:
{
"id": 123,
"task_type": "product_import",
"status": "running",
"progress_percent": 45,
"progress_message": "Processing batch 5 of 11",
"started_at": "2024-01-15T10:30:00Z",
"completed_at": null,
"triggered_by": "manual:admin",
"error_message": null,
"details": {
// Task-specific details
}
}
Unified List Endpoint
GET /api/v1/admin/background-tasks
Query parameters:
task_type- Filter by type (product_import, order_import, etc.)status- Filter by status (pending, running, completed, failed)limit- Number of results (default: 50)
Frontend Design Pattern
Page-Level Task Status Component
Every page that triggers a background task MUST include:
- Task Trigger Button - Initiates the task
- Running Indicator - Shows when task is in progress
- Status Banner - Shows task status when returning to page
- Results Summary - Shows outcome on completion
Standard JavaScript Pattern
// Task status tracking mixin
const BackgroundTaskMixin = {
// State
activeTask: null, // Current running task
taskHistory: [], // Recent tasks for this page
pollInterval: null,
// Initialize - check for active tasks on page load
async initTaskStatus() {
const tasks = await this.fetchActiveTasks();
if (tasks.length > 0) {
this.activeTask = tasks[0];
this.startPolling();
}
},
// Start a new task
async startTask(endpoint, params) {
const result = await apiClient.post(endpoint, params);
this.activeTask = {
id: result.job_id,
status: 'pending',
task_type: result.task_type
};
this.startPolling();
return result;
},
// Poll for status updates
startPolling() {
this.pollInterval = setInterval(() => this.pollStatus(), 3000);
},
async pollStatus() {
if (!this.activeTask) return;
const status = await apiClient.get(
`/admin/background-tasks/${this.activeTask.id}`
);
this.activeTask = status;
if (['completed', 'failed', 'completed_with_warnings'].includes(status.status)) {
this.stopPolling();
this.onTaskComplete(status);
}
},
stopPolling() {
if (this.pollInterval) {
clearInterval(this.pollInterval);
this.pollInterval = null;
}
},
// Override in component
onTaskComplete(result) {
// Handle completion
}
};
Status Banner Component
<!-- Task Status Banner - Include on all pages with background tasks -->
<template x-if="activeTask">
<div class="mb-4 p-4 rounded-lg"
:class="{
'bg-blue-50 dark:bg-blue-900': activeTask.status === 'running',
'bg-green-50 dark:bg-green-900': activeTask.status === 'completed',
'bg-red-50 dark:bg-red-900': activeTask.status === 'failed',
'bg-yellow-50 dark:bg-yellow-900': activeTask.status === 'completed_with_warnings'
}">
<div class="flex items-center justify-between">
<div class="flex items-center">
<template x-if="activeTask.status === 'running'">
<span class="animate-spin mr-2">...</span>
</template>
<span x-text="getTaskStatusMessage(activeTask)"></span>
</div>
<template x-if="activeTask.progress_percent">
<div class="w-32 bg-gray-200 rounded-full h-2">
<div class="bg-blue-600 h-2 rounded-full"
:style="`width: ${activeTask.progress_percent}%`"></div>
</div>
</template>
</div>
</div>
</template>
Background Tasks Service
Unified Query Interface
The BackgroundTasksService MUST provide methods to query all task types:
class BackgroundTasksService:
"""Unified service for all background task types."""
TASK_MODELS = {
'product_import': MarketplaceImportJob,
'order_import': LetzshopHistoricalImportJob,
'test_run': TestRun,
'code_quality_scan': ArchitectureScan,
'product_export': ProductExportJob,
}
def get_all_running_tasks(self, db: Session) -> list[BackgroundTaskResponse]:
"""Get all currently running tasks across all types."""
def get_tasks(
self,
db: Session,
task_type: str = None,
status: str = None,
limit: int = 50
) -> list[BackgroundTaskResponse]:
"""Get tasks with optional filtering."""
def get_task_by_id(
self,
db: Session,
task_id: int,
task_type: str
) -> BackgroundTaskResponse:
"""Get a specific task by ID and type."""
Implementation Checklist
For Each Background Task Type:
- Database model includes all standard fields (status, started_at, completed_at, etc.)
- Status values follow standard enum (pending, running, completed, failed)
- API returns 202 with job_id on task creation
- Status endpoint available at standard path
- Task appears on unified background tasks page
- Frontend shows status banner on originating page
- Polling implemented with 3-5 second interval
- Error handling stores message in error_message field
- Admin notification triggered on failures
Migration Plan
| Task | Current State | Required Changes |
|---|---|---|
| Product Import | Mostly compliant | Change "processing" to "running" |
| Order Import | Partially compliant | Add to background tasks page, standardize status |
| Test Runs | Mostly compliant | Add "pending" status before run starts |
| Product Export | Not async | Create job model, make async |
| Code Quality Scan | Implemented ✓ |
Code Quality Scan Implementation
The code quality scan background task implementation serves as a reference for the harmonized architecture.
Files Modified/Created
| File | Purpose |
|---|---|
models/database/architecture_scan.py |
Added status fields (status, started_at, completed_at, error_message, progress_message) |
alembic/versions/g5b6c7d8e9f0_add_scan_status_fields.py |
Database migration for status fields |
app/tasks/code_quality_tasks.py |
Background task function execute_code_quality_scan() |
app/api/v1/admin/code_quality.py |
Updated endpoints with 202 response and polling |
app/services/background_tasks_service.py |
Added scan methods to unified service |
app/api/v1/admin/background_tasks.py |
Integrated scans into unified background tasks page |
static/admin/js/code-quality-dashboard.js |
Frontend polling and status display |
app/templates/admin/code-quality-dashboard.html |
Progress banner UI |
API Endpoints
POST /admin/code-quality/scan
→ Returns 202 with job IDs immediately
→ Response: { scans: [{id, validator_type, status, message}], status_url }
GET /admin/code-quality/scans/{scan_id}/status
→ Poll for individual scan status
→ Response: { id, status, progress_message, total_violations, ... }
GET /admin/code-quality/scans/running
→ Get all currently running scans
→ Response: [{ id, status, progress_message, ... }]
Frontend Behavior
- On page load: Checks for running scans via
/scans/running - On scan trigger: Creates scans, stores IDs, starts polling
- Polling: Every 3 seconds via
setInterval - Progress display: Shows spinner and progress message
- On completion: Fetches final results, updates dashboard
- User can navigate away: Scan continues in background
Background Task Pattern
async def execute_code_quality_scan(scan_id: int):
db = SessionLocal() # Own database session
try:
scan = db.query(ArchitectureScan).get(scan_id)
scan.status = "running"
scan.started_at = datetime.now(UTC)
scan.progress_message = "Running validator..."
db.commit()
# Execute validator...
scan.status = "completed"
scan.completed_at = datetime.now(UTC)
db.commit()
except Exception as e:
scan.status = "failed"
scan.error_message = str(e)
db.commit()
# Create admin notification
finally:
db.close()
Architecture Rules
See .architecture-rules/background_tasks.yaml for enforceable rules.
Key rules:
BG-001: All background task models must include standard fieldsBG-002: Status values must be from approved setBG-003: Task triggers must return 202 with job_idBG-004: All tasks must be registered in BackgroundTasksService