Migrate background tasks from FastAPI BackgroundTasks to Celery with Redis for persistent task queuing, retries, and scheduled jobs. Key changes: - Add Celery configuration with Redis broker/backend - Create task dispatcher with USE_CELERY feature flag for gradual rollout - Add Celery task wrappers for all background operations: - Marketplace imports - Letzshop historical imports - Product exports - Code quality scans - Test runs - Subscription scheduled tasks (via Celery Beat) - Add celery_task_id column to job tables for Flower integration - Add Flower dashboard link to admin background tasks page - Update docker-compose.yml with worker, beat, and flower services - Add Makefile targets: celery-worker, celery-beat, celery-dev, flower When USE_CELERY=false (default), system falls back to FastAPI BackgroundTasks for development without Redis dependency. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
148 lines
4.3 KiB
Python
148 lines
4.3 KiB
Python
"""
|
|
Test Run Models
|
|
Database models for tracking pytest test runs and results
|
|
"""
|
|
|
|
from sqlalchemy import (
|
|
JSON,
|
|
Column,
|
|
DateTime,
|
|
Float,
|
|
ForeignKey,
|
|
Integer,
|
|
String,
|
|
Text,
|
|
)
|
|
from sqlalchemy.orm import relationship
|
|
from sqlalchemy.sql import func
|
|
|
|
from app.core.database import Base
|
|
|
|
|
|
class TestRun(Base):
    """A single pytest run with aggregate counts, timing, and status.

    One row per invocation of the test suite; per-test outcomes hang off
    the ``results`` relationship (see :class:`TestResult`).
    """

    __tablename__ = "test_runs"

    id = Column(Integer, primary_key=True, index=True)
    # Run start time; DB-side default so it is set even for raw inserts.
    timestamp = Column(
        DateTime(timezone=True), server_default=func.now(), nullable=False, index=True
    )

    # Aggregate test counts
    total_tests = Column(Integer, default=0)
    passed = Column(Integer, default=0)
    failed = Column(Integer, default=0)
    errors = Column(Integer, default=0)
    skipped = Column(Integer, default=0)
    xfailed = Column(Integer, default=0)  # Expected failures
    xpassed = Column(Integer, default=0)  # Unexpected passes

    # Coverage info (optional)
    coverage_percent = Column(Float, nullable=True)

    # Timing
    duration_seconds = Column(Float, default=0.0)

    # Run metadata
    triggered_by = Column(String(100))  # 'manual', 'scheduled', 'ci/cd'
    git_commit_hash = Column(String(40))
    git_branch = Column(String(100))
    test_path = Column(String(500))  # Which tests were run (e.g., 'tests/unit')
    pytest_args = Column(String(500))  # Command line arguments used

    # Status
    status = Column(
        String(20), default="running", index=True
    )  # 'running', 'passed', 'failed', 'error'

    # Celery task tracking (optional - for USE_CELERY=true)
    celery_task_id = Column(String(255), nullable=True, index=True)

    # Relationship to test results
    results = relationship(
        "TestResult", back_populates="run", cascade="all, delete-orphan"
    )

    def __repr__(self):
        return f"<TestRun(id={self.id}, total={self.total_tests}, passed={self.passed}, failed={self.failed})>"

    @property
    def pass_rate(self) -> float:
        """Pass rate as a percentage (0.0-100.0).

        Uses a truthiness guard rather than ``== 0``: on an instance that
        has not been flushed yet, ``total_tests`` is None (the Column
        ``default=0`` is applied only at INSERT time), and ``None == 0``
        is False, so the old check let the division raise TypeError.
        """
        if not self.total_tests:
            return 0.0
        return (self.passed / self.total_tests) * 100
|
|
|
|
|
|
class TestResult(Base):
    """One test outcome belonging to a :class:`TestRun` (via ``run_id``)."""

    __tablename__ = "test_results"

    id = Column(Integer, primary_key=True, index=True)
    run_id = Column(Integer, ForeignKey("test_runs.id"), nullable=False, index=True)

    # Test identification: the full pytest node id plus its parts.
    # node_id example: 'tests/unit/test_foo.py::test_bar'
    node_id = Column(String(500), nullable=False, index=True)
    test_name = Column(String(200), nullable=False)  # e.g., 'test_bar'
    test_file = Column(String(300), nullable=False)  # e.g., 'tests/unit/test_foo.py'
    test_class = Column(String(200))  # e.g., 'TestFooClass' (optional)

    # Outcome: 'passed', 'failed', 'error', 'skipped', 'xfailed', 'xpassed'
    outcome = Column(String(20), nullable=False, index=True)
    duration_seconds = Column(Float, default=0.0)

    # Failure details (if applicable)
    error_message = Column(Text)
    traceback = Column(Text)

    # Test metadata
    markers = Column(JSON)  # List of pytest markers
    parameters = Column(JSON)  # Parametrized test params

    # Row creation time, assigned by the database.
    created_at = Column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )

    # Parent run
    run = relationship("TestRun", back_populates="results")

    def __repr__(self):
        return f"<TestResult(id={self.id}, node_id={self.node_id}, outcome={self.outcome})>"
|
|
|
|
|
|
class TestCollection(Base):
    """Snapshot of a pytest collection pass, cached for quick stats display."""

    __tablename__ = "test_collections"

    id = Column(Integer, primary_key=True, index=True)

    # Overall collection stats
    total_tests = Column(Integer, default=0)
    total_files = Column(Integer, default=0)
    total_classes = Column(Integer, default=0)

    # Per-category breakdown
    unit_tests = Column(Integer, default=0)
    integration_tests = Column(Integer, default=0)
    performance_tests = Column(Integer, default=0)
    system_tests = Column(Integer, default=0)

    # Raw collection data
    test_files = Column(JSON)  # List of test files with counts

    # When this snapshot was taken; DB-side default.
    collected_at = Column(
        DateTime(timezone=True), server_default=func.now(), nullable=False
    )

    def __repr__(self):
        return f"<TestCollection(id={self.id}, total={self.total_tests})>"
|