refactor: convert legacy models/schemas to re-exports

Legacy model and schema files now re-export from module locations
for backwards compatibility:

models/database/:
- letzshop.py -> app.modules.marketplace.models
- marketplace_import_job.py -> app.modules.marketplace.models
- marketplace_product.py -> app.modules.marketplace.models
- marketplace_product_translation.py -> app.modules.marketplace.models
- subscription.py -> app.modules.billing.models
- architecture_scan.py -> app.modules.dev_tools.models
- test_run.py -> app.modules.dev_tools.models

models/schema/:
- marketplace_import_job.py -> app.modules.marketplace.schemas
- marketplace_product.py -> app.modules.marketplace.schemas
- subscription.py -> app.modules.billing.schemas
- stats.py -> app.modules.analytics.schemas

This maintains import compatibility while moving actual code
to self-contained modules.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-28 22:22:18 +01:00
parent 37cf74cbf4
commit 3ffa337fca
11 changed files with 350 additions and 2836 deletions

View File

@@ -1,202 +1,27 @@
# models/database/architecture_scan.py
"""
Architecture Scan Models
Database models for tracking code quality scans and violations
Architecture Scan Models - LEGACY LOCATION
This file exists for backward compatibility.
The canonical location is now: app/modules/dev_tools/models/architecture_scan.py
All imports should use the new location:
from app.modules.dev_tools.models import ArchitectureScan, ArchitectureViolation, ...
"""
from sqlalchemy import (
JSON,
Boolean,
Column,
DateTime,
Float,
ForeignKey,
Integer,
String,
Text,
# Re-export from canonical location for backward compatibility
from app.modules.dev_tools.models.architecture_scan import (
ArchitectureScan,
ArchitectureViolation,
ArchitectureRule,
ViolationAssignment,
ViolationComment,
)
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.core.database import Base
class ArchitectureScan(Base):
"""Represents a single run of a code quality validator"""
__tablename__ = "architecture_scans"
id = Column(Integer, primary_key=True, index=True)
timestamp = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False, index=True
)
validator_type = Column(
String(20), nullable=False, index=True, default="architecture"
) # 'architecture', 'security', 'performance'
# Background task status fields (harmonized architecture)
status = Column(
String(30), nullable=False, default="pending", index=True
) # 'pending', 'running', 'completed', 'failed', 'completed_with_warnings'
started_at = Column(DateTime(timezone=True), nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
error_message = Column(Text, nullable=True)
progress_message = Column(String(255), nullable=True) # Current step description
# Scan results
total_files = Column(Integer, default=0)
total_violations = Column(Integer, default=0)
errors = Column(Integer, default=0)
warnings = Column(Integer, default=0)
duration_seconds = Column(Float, default=0.0)
triggered_by = Column(String(100)) # 'manual:username', 'scheduled', 'ci/cd'
git_commit_hash = Column(String(40))
# Celery task tracking (optional - for USE_CELERY=true)
celery_task_id = Column(String(255), nullable=True, index=True)
# Relationship to violations
violations = relationship(
"ArchitectureViolation", back_populates="scan", cascade="all, delete-orphan"
)
def __repr__(self):
return f"<ArchitectureScan(id={self.id}, violations={self.total_violations}, errors={self.errors})>"
class ArchitectureViolation(Base):
"""Represents a single code quality violation found during a scan"""
__tablename__ = "architecture_violations"
id = Column(Integer, primary_key=True, index=True)
scan_id = Column(
Integer, ForeignKey("architecture_scans.id"), nullable=False, index=True
)
validator_type = Column(
String(20), nullable=False, index=True, default="architecture"
) # 'architecture', 'security', 'performance'
rule_id = Column(String(20), nullable=False, index=True) # e.g., 'API-001', 'SEC-001', 'PERF-001'
rule_name = Column(String(200), nullable=False)
severity = Column(
String(10), nullable=False, index=True
) # 'error', 'warning', 'info'
file_path = Column(String(500), nullable=False, index=True)
line_number = Column(Integer, nullable=False)
message = Column(Text, nullable=False)
context = Column(Text) # Code snippet
suggestion = Column(Text)
status = Column(
String(20), default="open", index=True
) # 'open', 'assigned', 'resolved', 'ignored', 'technical_debt'
assigned_to = Column(Integer, ForeignKey("users.id"))
resolved_at = Column(DateTime(timezone=True))
resolved_by = Column(Integer, ForeignKey("users.id"))
resolution_note = Column(Text)
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
# Relationships
scan = relationship("ArchitectureScan", back_populates="violations")
assigned_user = relationship(
"User", foreign_keys=[assigned_to], backref="assigned_violations"
)
resolver = relationship(
"User", foreign_keys=[resolved_by], backref="resolved_violations"
)
assignments = relationship(
"ViolationAssignment", back_populates="violation", cascade="all, delete-orphan"
)
comments = relationship(
"ViolationComment", back_populates="violation", cascade="all, delete-orphan"
)
def __repr__(self):
return f"<ArchitectureViolation(id={self.id}, rule={self.rule_id}, file={self.file_path}:{self.line_number})>"
class ArchitectureRule(Base):
"""Code quality rules configuration (from YAML with database overrides)"""
__tablename__ = "architecture_rules"
id = Column(Integer, primary_key=True, index=True)
rule_id = Column(
String(20), unique=True, nullable=False, index=True
) # e.g., 'API-001', 'SEC-001', 'PERF-001'
validator_type = Column(
String(20), nullable=False, index=True, default="architecture"
) # 'architecture', 'security', 'performance'
category = Column(
String(50), nullable=False
) # 'api_endpoint', 'service_layer', 'authentication', 'database', etc.
name = Column(String(200), nullable=False)
description = Column(Text)
severity = Column(String(10), nullable=False) # Can override default from YAML
enabled = Column(Boolean, default=True, nullable=False)
custom_config = Column(JSON) # For rule-specific settings
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
updated_at = Column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
)
def __repr__(self):
return f"<ArchitectureRule(id={self.rule_id}, name={self.name}, enabled={self.enabled})>"
class ViolationAssignment(Base):
"""Tracks assignment of violations to developers"""
__tablename__ = "violation_assignments"
id = Column(Integer, primary_key=True, index=True)
violation_id = Column(
Integer, ForeignKey("architecture_violations.id"), nullable=False, index=True
)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
assigned_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
assigned_by = Column(Integer, ForeignKey("users.id"))
due_date = Column(DateTime(timezone=True))
priority = Column(
String(10), default="medium"
) # 'low', 'medium', 'high', 'critical'
# Relationships
violation = relationship("ArchitectureViolation", back_populates="assignments")
user = relationship("User", foreign_keys=[user_id], backref="violation_assignments")
assigner = relationship(
"User", foreign_keys=[assigned_by], backref="assigned_by_me"
)
def __repr__(self):
return f"<ViolationAssignment(id={self.id}, violation_id={self.violation_id}, user_id={self.user_id})>"
class ViolationComment(Base):
"""Comments on violations for collaboration"""
__tablename__ = "violation_comments"
id = Column(Integer, primary_key=True, index=True)
violation_id = Column(
Integer, ForeignKey("architecture_violations.id"), nullable=False, index=True
)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
comment = Column(Text, nullable=False)
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
# Relationships
violation = relationship("ArchitectureViolation", back_populates="comments")
user = relationship("User", backref="violation_comments")
def __repr__(self):
return f"<ViolationComment(id={self.id}, violation_id={self.violation_id}, user_id={self.user_id})>"
__all__ = [
"ArchitectureScan",
"ArchitectureViolation",
"ArchitectureRule",
"ViolationAssignment",
"ViolationComment",
]

View File

@@ -1,379 +1,34 @@
# models/database/letzshop.py
"""
Database models for Letzshop marketplace integration.
Legacy location for Letzshop models.
Provides models for:
- VendorLetzshopCredentials: Per-vendor API key storage (encrypted)
- LetzshopFulfillmentQueue: Outbound operation queue with retry
- LetzshopSyncLog: Audit trail for sync operations
- LetzshopHistoricalImportJob: Progress tracking for historical imports
MIGRATED: Models have been moved to app.modules.marketplace.models.letzshop.
Note: Orders are now stored in the unified `orders` table with channel='letzshop'.
The LetzshopOrder model has been removed in favor of the unified Order model.
New location:
from app.modules.marketplace.models import (
VendorLetzshopCredentials,
LetzshopFulfillmentQueue,
LetzshopVendorCache,
LetzshopSyncLog,
LetzshopHistoricalImportJob,
)
This file re-exports from the new location for backward compatibility.
"""
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
Index,
Integer,
String,
Text,
# Re-export from the new canonical location
from app.modules.marketplace.models.letzshop import (
VendorLetzshopCredentials,
LetzshopFulfillmentQueue,
LetzshopVendorCache,
LetzshopSyncLog,
LetzshopHistoricalImportJob,
)
from sqlalchemy.dialects.sqlite import JSON
from sqlalchemy.orm import relationship
from app.core.database import Base
from models.database.base import TimestampMixin
class VendorLetzshopCredentials(Base, TimestampMixin):
"""
Per-vendor Letzshop API credentials.
Stores encrypted API keys and sync settings for each vendor's
Letzshop integration.
"""
__tablename__ = "vendor_letzshop_credentials"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(
Integer, ForeignKey("vendors.id"), unique=True, nullable=False, index=True
)
# Encrypted API credentials
api_key_encrypted = Column(Text, nullable=False)
api_endpoint = Column(String(255), default="https://letzshop.lu/graphql")
# Sync settings
auto_sync_enabled = Column(Boolean, default=False)
sync_interval_minutes = Column(Integer, default=15)
# Test mode (disables API mutations when enabled)
test_mode_enabled = Column(Boolean, default=False)
# Default carrier settings
default_carrier = Column(String(50), nullable=True) # greco, colissimo, xpresslogistics
# Carrier label URL prefixes
carrier_greco_label_url = Column(
String(500), default="https://dispatchweb.fr/Tracky/Home/"
)
carrier_colissimo_label_url = Column(String(500), nullable=True)
carrier_xpresslogistics_label_url = Column(String(500), nullable=True)
# Last sync status
last_sync_at = Column(DateTime(timezone=True), nullable=True)
last_sync_status = Column(String(50), nullable=True) # success, failed, partial
last_sync_error = Column(Text, nullable=True)
# Relationships
vendor = relationship("Vendor", back_populates="letzshop_credentials")
def __repr__(self):
return f"<VendorLetzshopCredentials(vendor_id={self.vendor_id}, auto_sync={self.auto_sync_enabled})>"
class LetzshopFulfillmentQueue(Base, TimestampMixin):
"""
Queue for outbound fulfillment operations to Letzshop.
Supports retry logic for failed operations.
References the unified orders table.
"""
__tablename__ = "letzshop_fulfillment_queue"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
order_id = Column(Integer, ForeignKey("orders.id"), nullable=False, index=True)
# Operation type
operation = Column(
String(50), nullable=False
) # confirm_item, decline_item, set_tracking
# Operation payload
payload = Column(JSON, nullable=False)
# Status and retry
status = Column(
String(50), default="pending"
) # pending, processing, completed, failed
attempts = Column(Integer, default=0)
max_attempts = Column(Integer, default=3)
last_attempt_at = Column(DateTime(timezone=True), nullable=True)
next_retry_at = Column(DateTime(timezone=True), nullable=True)
error_message = Column(Text, nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Response from Letzshop
response_data = Column(JSON, nullable=True)
# Relationships
vendor = relationship("Vendor")
order = relationship("Order")
__table_args__ = (
Index("idx_fulfillment_queue_status", "status", "vendor_id"),
Index("idx_fulfillment_queue_retry", "status", "next_retry_at"),
Index("idx_fulfillment_queue_order", "order_id"),
)
def __repr__(self):
return f"<LetzshopFulfillmentQueue(id={self.id}, order_id={self.order_id}, operation='{self.operation}', status='{self.status}')>"
class LetzshopSyncLog(Base, TimestampMixin):
"""
Audit log for all Letzshop sync operations.
"""
__tablename__ = "letzshop_sync_logs"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
# Operation details
operation_type = Column(
String(50), nullable=False
) # order_import, confirm_inventory, set_tracking, etc.
direction = Column(String(10), nullable=False) # inbound, outbound
# Status
status = Column(String(50), nullable=False) # success, failed, partial
# Details
records_processed = Column(Integer, default=0)
records_succeeded = Column(Integer, default=0)
records_failed = Column(Integer, default=0)
error_details = Column(JSON, nullable=True)
# Timestamps
started_at = Column(DateTime(timezone=True), nullable=False)
completed_at = Column(DateTime(timezone=True), nullable=True)
duration_seconds = Column(Integer, nullable=True)
# Triggered by
triggered_by = Column(String(100), nullable=True) # user_id, scheduler, webhook
# Relationships
vendor = relationship("Vendor")
__table_args__ = (
Index("idx_sync_log_vendor_type", "vendor_id", "operation_type"),
Index("idx_sync_log_vendor_date", "vendor_id", "started_at"),
)
def __repr__(self):
return f"<LetzshopSyncLog(id={self.id}, type='{self.operation_type}', status='{self.status}')>"
class LetzshopVendorCache(Base, TimestampMixin):
"""
Cache of Letzshop marketplace vendor directory.
This table stores vendor data fetched from Letzshop's public GraphQL API,
allowing users to browse and claim existing Letzshop shops during signup.
Data is periodically synced from Letzshop (e.g., daily via Celery task).
"""
__tablename__ = "letzshop_vendor_cache"
id = Column(Integer, primary_key=True, index=True)
# Letzshop identifiers
letzshop_id = Column(String(50), unique=True, nullable=False, index=True)
"""Unique ID from Letzshop (e.g., 'lpkedYMRup')."""
slug = Column(String(200), unique=True, nullable=False, index=True)
"""URL slug (e.g., 'nicks-diecast-corner')."""
# Basic info
name = Column(String(255), nullable=False)
"""Vendor display name."""
company_name = Column(String(255), nullable=True)
"""Legal company name."""
is_active = Column(Boolean, default=True)
"""Whether vendor is active on Letzshop."""
# Descriptions (multilingual)
description_en = Column(Text, nullable=True)
description_fr = Column(Text, nullable=True)
description_de = Column(Text, nullable=True)
# Contact information
email = Column(String(255), nullable=True)
phone = Column(String(50), nullable=True)
fax = Column(String(50), nullable=True)
website = Column(String(500), nullable=True)
# Location
street = Column(String(255), nullable=True)
street_number = Column(String(50), nullable=True)
city = Column(String(100), nullable=True)
zipcode = Column(String(20), nullable=True)
country_iso = Column(String(5), default="LU")
latitude = Column(String(20), nullable=True)
longitude = Column(String(20), nullable=True)
# Categories (stored as JSON array of names)
categories = Column(JSON, default=list)
"""List of category names, e.g., ['Fashion', 'Shoes']."""
# Images
background_image_url = Column(String(500), nullable=True)
# Social media (stored as JSON array of URLs)
social_media_links = Column(JSON, default=list)
"""List of social media URLs."""
# Opening hours (multilingual text)
opening_hours_en = Column(Text, nullable=True)
opening_hours_fr = Column(Text, nullable=True)
opening_hours_de = Column(Text, nullable=True)
# Representative
representative_name = Column(String(255), nullable=True)
representative_title = Column(String(100), nullable=True)
# Claiming status (linked to our platform)
claimed_by_vendor_id = Column(
Integer, ForeignKey("vendors.id"), nullable=True, index=True
)
"""If claimed, links to our Vendor record."""
claimed_at = Column(DateTime(timezone=True), nullable=True)
"""When the vendor was claimed on our platform."""
# Sync metadata
last_synced_at = Column(DateTime(timezone=True), nullable=False)
"""When this record was last updated from Letzshop."""
raw_data = Column(JSON, nullable=True)
"""Full raw response from Letzshop API for reference."""
# Relationship to claimed vendor
claimed_vendor = relationship("Vendor", foreign_keys=[claimed_by_vendor_id])
__table_args__ = (
Index("idx_vendor_cache_city", "city"),
Index("idx_vendor_cache_claimed", "claimed_by_vendor_id"),
Index("idx_vendor_cache_active", "is_active"),
)
def __repr__(self):
return f"<LetzshopVendorCache(id={self.id}, slug='{self.slug}', name='{self.name}')>"
@property
def is_claimed(self) -> bool:
"""Check if this vendor has been claimed on our platform."""
return self.claimed_by_vendor_id is not None
@property
def letzshop_url(self) -> str:
"""Get the Letzshop profile URL."""
return f"https://letzshop.lu/vendors/{self.slug}"
def get_description(self, lang: str = "en") -> str | None:
"""Get description in specified language with fallback."""
descriptions = {
"en": self.description_en,
"fr": self.description_fr,
"de": self.description_de,
}
# Try requested language, then fallback order
for try_lang in [lang, "en", "fr", "de"]:
if descriptions.get(try_lang):
return descriptions[try_lang]
return None
def get_opening_hours(self, lang: str = "en") -> str | None:
"""Get opening hours in specified language with fallback."""
hours = {
"en": self.opening_hours_en,
"fr": self.opening_hours_fr,
"de": self.opening_hours_de,
}
for try_lang in [lang, "en", "fr", "de"]:
if hours.get(try_lang):
return hours[try_lang]
return None
def get_full_address(self) -> str | None:
"""Get formatted full address."""
parts = []
if self.street:
addr = self.street
if self.street_number:
addr += f" {self.street_number}"
parts.append(addr)
if self.zipcode or self.city:
parts.append(f"{self.zipcode or ''} {self.city or ''}".strip())
return ", ".join(parts) if parts else None
class LetzshopHistoricalImportJob(Base, TimestampMixin):
"""
Track progress of historical order imports from Letzshop.
Enables real-time progress tracking via polling for long-running imports.
"""
__tablename__ = "letzshop_historical_import_jobs"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
# Status: pending | fetching | processing | completed | failed
status = Column(String(50), default="pending", nullable=False)
# Current phase: "confirmed" | "declined"
current_phase = Column(String(20), nullable=True)
# Fetch progress
current_page = Column(Integer, default=0)
total_pages = Column(Integer, nullable=True) # null = unknown yet
shipments_fetched = Column(Integer, default=0)
# Processing progress
orders_processed = Column(Integer, default=0)
orders_imported = Column(Integer, default=0)
orders_updated = Column(Integer, default=0)
orders_skipped = Column(Integer, default=0)
# EAN matching stats
products_matched = Column(Integer, default=0)
products_not_found = Column(Integer, default=0)
# Phase-specific stats (stored as JSON for combining confirmed + declined)
confirmed_stats = Column(JSON, nullable=True)
declined_stats = Column(JSON, nullable=True)
# Error handling
error_message = Column(Text, nullable=True)
# Celery task tracking (optional - for USE_CELERY=true)
celery_task_id = Column(String(255), nullable=True, index=True)
# Timing
started_at = Column(DateTime(timezone=True), nullable=True)
completed_at = Column(DateTime(timezone=True), nullable=True)
# Relationships
vendor = relationship("Vendor")
user = relationship("User")
__table_args__ = (
Index("idx_historical_import_vendor", "vendor_id", "status"),
)
def __repr__(self):
return f"<LetzshopHistoricalImportJob(id={self.id}, status='{self.status}', phase='{self.current_phase}')>"
__all__ = [
"VendorLetzshopCredentials",
"LetzshopFulfillmentQueue",
"LetzshopVendorCache",
"LetzshopSyncLog",
"LetzshopHistoricalImportJob",
]

View File

@@ -1,116 +1,22 @@
from sqlalchemy import JSON, Column, DateTime, ForeignKey, Index, Integer, String, Text
from sqlalchemy.orm import relationship
# models/database/marketplace_import_job.py
"""
Legacy location for marketplace import job models.
from app.core.database import Base
from models.database.base import TimestampMixin
MIGRATED: Models have been moved to app.modules.marketplace.models.marketplace_import_job.
class MarketplaceImportError(Base, TimestampMixin):
"""
Stores detailed information about individual import errors.
Each row that fails during import creates an error record with:
- Row number from the source file
- Identifier (marketplace_product_id if available)
- Error type and message
- Raw row data for review
"""
__tablename__ = "marketplace_import_errors"
id = Column(Integer, primary_key=True, index=True)
import_job_id = Column(
Integer,
ForeignKey("marketplace_import_jobs.id", ondelete="CASCADE"),
nullable=False,
index=True,
New location:
from app.modules.marketplace.models import (
MarketplaceImportJob,
MarketplaceImportError,
)
# Error location
row_number = Column(Integer, nullable=False)
This file re-exports from the new location for backward compatibility.
"""
# Identifier from the row (if available)
identifier = Column(String) # marketplace_product_id, gtin, mpn, etc.
# Re-export from the new canonical location
from app.modules.marketplace.models.marketplace_import_job import (
MarketplaceImportJob,
MarketplaceImportError,
)
# Error details
error_type = Column(
String(50), nullable=False
) # missing_title, missing_id, parse_error, etc.
error_message = Column(Text, nullable=False)
# Raw row data for review (JSON)
row_data = Column(JSON)
# Relationship
import_job = relationship("MarketplaceImportJob", back_populates="errors")
__table_args__ = (
Index("idx_import_error_job_id", "import_job_id"),
Index("idx_import_error_type", "error_type"),
)
def __repr__(self):
return (
f"<MarketplaceImportError(id={self.id}, job_id={self.import_job_id}, "
f"row={self.row_number}, type='{self.error_type}')>"
)
class MarketplaceImportJob(Base, TimestampMixin):
__tablename__ = "marketplace_import_jobs"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
# Import configuration
marketplace = Column(String, nullable=False, index=True, default="Letzshop")
source_url = Column(String, nullable=False)
language = Column(
String(5), nullable=False, default="en"
) # Language for translations
# Status tracking
status = Column(
String, nullable=False, default="pending"
) # pending, processing, completed, failed, completed_with_errors
# Results
imported_count = Column(Integer, default=0)
updated_count = Column(Integer, default=0)
error_count = Column(Integer, default=0)
total_processed = Column(Integer, default=0)
# Error handling
error_message = Column(Text)
# Celery task tracking (optional - for USE_CELERY=true)
celery_task_id = Column(String(255), nullable=True, index=True)
# Timestamps
started_at = Column(DateTime(timezone=True))
completed_at = Column(DateTime(timezone=True))
# Relationships
vendor = relationship("Vendor", back_populates="marketplace_import_jobs")
user = relationship("User", foreign_keys=[user_id])
errors = relationship(
"MarketplaceImportError",
back_populates="import_job",
cascade="all, delete-orphan",
order_by="MarketplaceImportError.row_number",
)
# Indexes for performance
__table_args__ = (
Index("idx_import_vendor_status", "vendor_id", "status"),
Index("idx_import_vendor_created", "vendor_id", "created_at"),
Index("idx_import_user_marketplace", "user_id", "marketplace"),
)
def __repr__(self):
return (
f"<MarketplaceImportJob(id={self.id}, vendor_id={self.vendor_id}, "
f"marketplace='{self.marketplace}', status='{self.status}', "
f"imported={self.imported_count})>"
)
__all__ = ["MarketplaceImportJob", "MarketplaceImportError"]

View File

@@ -1,300 +1,28 @@
"""Marketplace Product model for multi-marketplace product integration.
# models/database/marketplace_product.py
"""
Legacy location for marketplace product model.
This model stores canonical product data from various marketplaces (Letzshop,
Amazon, eBay, CodesWholesale, etc.) in a universal format. It supports:
- Physical and digital products
- Multi-language translations (via MarketplaceProductTranslation)
- Flexible attributes for marketplace-specific data
- Google Shopping fields for Letzshop compatibility
MIGRATED: All models have been moved to app.modules.marketplace.models.marketplace_product.
Money values are stored as integer cents (e.g., €105.91 = 10591).
Weight is stored as integer grams (e.g., 1.5kg = 1500g).
See docs/architecture/money-handling.md for details.
New location:
from app.modules.marketplace.models import (
MarketplaceProduct,
ProductType,
DigitalDeliveryMethod,
)
This file re-exports from the new location for backward compatibility.
"""
from enum import Enum
from sqlalchemy import (
Boolean,
Column,
Index,
Integer,
String,
# Re-export everything from the new canonical location
from app.modules.marketplace.models.marketplace_product import (
MarketplaceProduct,
ProductType,
DigitalDeliveryMethod,
)
from sqlalchemy.dialects.sqlite import JSON
from sqlalchemy.orm import relationship
from app.core.database import Base
from app.utils.money import cents_to_euros, euros_to_cents
from models.database.base import TimestampMixin
class ProductType(str, Enum):
"""Product type classification."""
PHYSICAL = "physical"
DIGITAL = "digital"
SERVICE = "service"
SUBSCRIPTION = "subscription"
class DigitalDeliveryMethod(str, Enum):
"""Digital product delivery methods."""
DOWNLOAD = "download"
EMAIL = "email"
IN_APP = "in_app"
STREAMING = "streaming"
LICENSE_KEY = "license_key"
class MarketplaceProduct(Base, TimestampMixin):
"""Canonical product data from marketplace sources.
This table stores normalized product information from all marketplace sources.
Localized content (title, description) is stored in MarketplaceProductTranslation.
Price fields use integer cents for precision (€19.99 = 1999 cents).
Weight uses integer grams (1.5kg = 1500 grams).
"""
__tablename__ = "marketplace_products"
id = Column(Integer, primary_key=True, index=True)
# === UNIVERSAL IDENTIFIERS ===
marketplace_product_id = Column(String, unique=True, index=True, nullable=False)
gtin = Column(String, index=True) # EAN/UPC - primary cross-marketplace matching
mpn = Column(String, index=True) # Manufacturer Part Number
sku = Column(String, index=True) # Internal SKU if assigned
# === SOURCE TRACKING ===
marketplace = Column(
String, index=True, nullable=True, default="letzshop"
) # 'letzshop', 'amazon', 'ebay', 'codeswholesale'
source_url = Column(String) # Original product URL
vendor_name = Column(String, index=True) # Seller/vendor in marketplace
# === PRODUCT TYPE ===
product_type_enum = Column(
String(20), nullable=False, default=ProductType.PHYSICAL.value
)
is_digital = Column(Boolean, default=False, index=True)
# === DIGITAL PRODUCT FIELDS ===
digital_delivery_method = Column(String(20)) # DigitalDeliveryMethod values
platform = Column(String(50), index=True) # 'steam', 'playstation', 'xbox', etc.
region_restrictions = Column(JSON) # ["EU", "US"] or null for global
license_type = Column(String(50)) # 'single_use', 'subscription', 'lifetime'
# === NON-LOCALIZED FIELDS ===
brand = Column(String, index=True)
google_product_category = Column(String, index=True)
category_path = Column(String) # Normalized category hierarchy
condition = Column(String)
# === PRICING (stored as integer cents) ===
price = Column(String) # Raw price string "19.99 EUR" (kept for reference)
price_cents = Column(Integer) # Parsed numeric price in cents
sale_price = Column(String) # Raw sale price string
sale_price_cents = Column(Integer) # Parsed numeric sale price in cents
currency = Column(String(3), default="EUR")
# === TAX / VAT ===
# Luxembourg VAT rates: 0 (exempt), 3 (super-reduced), 8 (reduced), 14 (intermediate), 17 (standard)
# Prices are stored as gross (VAT-inclusive). Default to standard rate.
tax_rate_percent = Column(Integer, default=17, nullable=False)
# === MEDIA ===
image_link = Column(String)
additional_image_link = Column(String) # Legacy single string
additional_images = Column(JSON) # Array of image URLs
# === PRODUCT ATTRIBUTES (Flexible) ===
attributes = Column(JSON) # {color, size, material, etc.}
# === PHYSICAL PRODUCT FIELDS ===
weight_grams = Column(Integer) # Weight in grams (1.5kg = 1500)
weight_unit = Column(String(10), default="kg") # Display unit
dimensions = Column(JSON) # {length, width, height, unit}
# === GOOGLE SHOPPING FIELDS (Preserved for Letzshop) ===
link = Column(String)
availability = Column(String, index=True)
adult = Column(String)
multipack = Column(Integer)
is_bundle = Column(String)
age_group = Column(String)
color = Column(String)
gender = Column(String)
material = Column(String)
pattern = Column(String)
size = Column(String)
size_type = Column(String)
size_system = Column(String)
item_group_id = Column(String)
product_type_raw = Column(String) # Original feed value (renamed from product_type)
custom_label_0 = Column(String)
custom_label_1 = Column(String)
custom_label_2 = Column(String)
custom_label_3 = Column(String)
custom_label_4 = Column(String)
unit_pricing_measure = Column(String)
unit_pricing_base_measure = Column(String)
identifier_exists = Column(String)
shipping = Column(String)
# === STATUS ===
is_active = Column(Boolean, default=True, index=True)
# === RELATIONSHIPS ===
translations = relationship(
"MarketplaceProductTranslation",
back_populates="marketplace_product",
cascade="all, delete-orphan",
)
vendor_products = relationship("Product", back_populates="marketplace_product")
# === INDEXES ===
__table_args__ = (
Index("idx_marketplace_vendor", "marketplace", "vendor_name"),
Index("idx_marketplace_brand", "marketplace", "brand"),
Index("idx_mp_gtin_marketplace", "gtin", "marketplace"),
Index("idx_mp_product_type", "product_type_enum", "is_digital"),
)
def __repr__(self):
return (
f"<MarketplaceProduct(id={self.id}, "
f"marketplace_product_id='{self.marketplace_product_id}', "
f"marketplace='{self.marketplace}', "
f"vendor='{self.vendor_name}')>"
)
# === PRICE PROPERTIES (Euro convenience accessors) ===
@property
def price_numeric(self) -> float | None:
"""Get price in euros (for API/display). Legacy name for compatibility."""
if self.price_cents is not None:
return cents_to_euros(self.price_cents)
return None
@price_numeric.setter
def price_numeric(self, value: float | None):
"""Set price from euros. Legacy name for compatibility."""
self.price_cents = euros_to_cents(value) if value is not None else None
@property
def sale_price_numeric(self) -> float | None:
"""Get sale price in euros (for API/display). Legacy name for compatibility."""
if self.sale_price_cents is not None:
return cents_to_euros(self.sale_price_cents)
return None
@sale_price_numeric.setter
def sale_price_numeric(self, value: float | None):
"""Set sale price from euros. Legacy name for compatibility."""
self.sale_price_cents = euros_to_cents(value) if value is not None else None
@property
def weight(self) -> float | None:
"""Get weight in kg (for API/display)."""
if self.weight_grams is not None:
return self.weight_grams / 1000.0
return None
@weight.setter
def weight(self, value: float | None):
"""Set weight from kg."""
self.weight_grams = int(value * 1000) if value is not None else None
# === HELPER PROPERTIES ===
@property
def product_type(self) -> ProductType:
"""Get product type as enum."""
return ProductType(self.product_type_enum)
@product_type.setter
def product_type(self, value: ProductType | str):
"""Set product type from enum or string."""
if isinstance(value, ProductType):
self.product_type_enum = value.value
else:
self.product_type_enum = value
@property
def delivery_method(self) -> DigitalDeliveryMethod | None:
"""Get digital delivery method as enum."""
if self.digital_delivery_method:
return DigitalDeliveryMethod(self.digital_delivery_method)
return None
@delivery_method.setter
def delivery_method(self, value: DigitalDeliveryMethod | str | None):
"""Set delivery method from enum or string."""
if value is None:
self.digital_delivery_method = None
elif isinstance(value, DigitalDeliveryMethod):
self.digital_delivery_method = value.value
else:
self.digital_delivery_method = value
def get_translation(self, language: str) -> "MarketplaceProductTranslation | None":
"""Get translation for a specific language."""
for t in self.translations:
if t.language == language:
return t
return None
def get_title(self, language: str = "en") -> str | None:
"""Get title for a specific language with fallback to 'en'."""
translation = self.get_translation(language)
if translation:
return translation.title
# Fallback to English
if language != "en":
en_translation = self.get_translation("en")
if en_translation:
return en_translation.title
return None
def get_description(self, language: str = "en") -> str | None:
"""Get description for a specific language with fallback to 'en'."""
translation = self.get_translation(language)
if translation:
return translation.description
# Fallback to English
if language != "en":
en_translation = self.get_translation("en")
if en_translation:
return en_translation.description
return None
@property
def effective_price(self) -> float | None:
"""Get the effective numeric price in euros."""
return self.price_numeric
@property
def effective_sale_price(self) -> float | None:
"""Get the effective numeric sale price in euros."""
return self.sale_price_numeric
@property
def all_images(self) -> list[str]:
"""Get all product images as a list."""
images = []
if self.image_link:
images.append(self.image_link)
if self.additional_images:
images.extend(self.additional_images)
elif self.additional_image_link:
# Legacy single string format
images.append(self.additional_image_link)
return images
__all__ = [
"MarketplaceProduct",
"ProductType",
"DigitalDeliveryMethod",
]

View File

@@ -1,76 +1,18 @@
"""Marketplace Product Translation model for multi-language support.
# models/database/marketplace_product_translation.py
"""
Legacy location for marketplace product translation model.
This model stores localized content (title, description, SEO fields) for
marketplace products. Each marketplace product can have multiple translations
for different languages.
MIGRATED: Model has been moved to app.modules.marketplace.models.marketplace_product_translation.
New location:
from app.modules.marketplace.models import MarketplaceProductTranslation
This file re-exports from the new location for backward compatibility.
"""
from sqlalchemy import (
Column,
ForeignKey,
Index,
Integer,
String,
Text,
UniqueConstraint,
# Re-export from the new canonical location
from app.modules.marketplace.models.marketplace_product_translation import (
MarketplaceProductTranslation,
)
from sqlalchemy.orm import relationship
from app.core.database import Base
from models.database.base import TimestampMixin
class MarketplaceProductTranslation(Base, TimestampMixin):
"""Localized content for marketplace products.
Stores translations for product titles, descriptions, and SEO fields.
Each marketplace_product can have one translation per language.
"""
__tablename__ = "marketplace_product_translations"
id = Column(Integer, primary_key=True, index=True)
marketplace_product_id = Column(
Integer,
ForeignKey("marketplace_products.id", ondelete="CASCADE"),
nullable=False,
)
language = Column(String(5), nullable=False) # 'en', 'fr', 'de', 'lb'
# === LOCALIZED CONTENT ===
title = Column(String, nullable=False)
description = Column(Text)
short_description = Column(String(500))
# === SEO FIELDS ===
meta_title = Column(String(70))
meta_description = Column(String(160))
url_slug = Column(String(255))
# === SOURCE TRACKING ===
source_import_id = Column(Integer) # Which import job provided this
source_file = Column(String) # e.g., "letzshop_fr.csv"
# === RELATIONSHIPS ===
marketplace_product = relationship(
"MarketplaceProduct",
back_populates="translations",
)
__table_args__ = (
UniqueConstraint(
"marketplace_product_id",
"language",
name="uq_marketplace_product_translation",
),
Index("idx_mpt_marketplace_product_id", "marketplace_product_id"),
Index("idx_mpt_language", "language"),
)
def __repr__(self):
return (
f"<MarketplaceProductTranslation(id={self.id}, "
f"marketplace_product_id={self.marketplace_product_id}, "
f"language='{self.language}', "
f"title='{self.title[:30] if self.title else None}...')>"
)
__all__ = ["MarketplaceProductTranslation"]

View File

@@ -1,756 +1,53 @@
# models/database/subscription.py
"""
Subscription database models for tier-based access control.
Legacy location for subscription models.
Provides models for:
- SubscriptionTier: Database-driven tier definitions with Stripe integration
- VendorSubscription: Per-vendor subscription tracking
- AddOnProduct: Purchasable add-ons (domains, SSL, email packages)
- VendorAddOn: Add-ons purchased by each vendor
- StripeWebhookEvent: Idempotency tracking for webhook processing
- BillingHistory: Invoice and payment history
MIGRATED: All models have been moved to app.modules.billing.models.subscription.
Tier Structure:
- Essential (€49/mo): 100 orders/mo, 200 products, 1 user, LU invoicing
- Professional (€99/mo): 500 orders/mo, unlimited products, 3 users, EU VAT
- Business (€199/mo): 2000 orders/mo, unlimited products, 10 users, analytics, API
- Enterprise (€399+/mo): Unlimited, white-label, custom integrations
New location:
from app.modules.billing.models import (
VendorSubscription,
SubscriptionTier,
TierCode,
SubscriptionStatus,
)
This file re-exports from the new location for backward compatibility.
"""
import enum
from datetime import UTC, datetime
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
Index,
Integer,
Numeric,
String,
Text,
# Re-export everything from the new canonical location
from app.modules.billing.models.subscription import (
# Enums
TierCode,
SubscriptionStatus,
AddOnCategory,
BillingPeriod,
# Models
SubscriptionTier,
AddOnProduct,
VendorAddOn,
StripeWebhookEvent,
BillingHistory,
VendorSubscription,
CapacitySnapshot,
# Legacy constants
TIER_LIMITS,
)
from sqlalchemy.dialects.sqlite import JSON
from sqlalchemy.orm import relationship
from app.core.database import Base
from models.database.base import TimestampMixin
class TierCode(str, enum.Enum):
"""Subscription tier codes."""
ESSENTIAL = "essential"
PROFESSIONAL = "professional"
BUSINESS = "business"
ENTERPRISE = "enterprise"
class SubscriptionStatus(str, enum.Enum):
"""Subscription status."""
TRIAL = "trial" # Free trial period
ACTIVE = "active" # Paid and active
PAST_DUE = "past_due" # Payment failed, grace period
CANCELLED = "cancelled" # Cancelled, access until period end
EXPIRED = "expired" # No longer active
class AddOnCategory(str, enum.Enum):
"""Add-on product categories."""
DOMAIN = "domain"
SSL = "ssl"
EMAIL = "email"
STORAGE = "storage"
class BillingPeriod(str, enum.Enum):
"""Billing period for add-ons."""
MONTHLY = "monthly"
ANNUAL = "annual"
ONE_TIME = "one_time"
# ============================================================================
# SubscriptionTier - Database-driven tier definitions
# ============================================================================
class SubscriptionTier(Base, TimestampMixin):
"""
Database-driven tier definitions with Stripe integration.
Replaces the hardcoded TIER_LIMITS dict for dynamic tier management.
Can be:
- Global tier (platform_id=NULL): Available to all platforms
- Platform-specific tier (platform_id set): Only for that platform
"""
__tablename__ = "subscription_tiers"
id = Column(Integer, primary_key=True, index=True)
# Platform association (NULL = global tier available to all platforms)
platform_id = Column(
Integer,
ForeignKey("platforms.id", ondelete="CASCADE"),
nullable=True,
index=True,
comment="Platform this tier belongs to (NULL = global tier)",
)
code = Column(String(30), nullable=False, index=True)
name = Column(String(100), nullable=False)
description = Column(Text, nullable=True)
# Pricing (in cents for precision)
price_monthly_cents = Column(Integer, nullable=False)
price_annual_cents = Column(Integer, nullable=True) # Null for enterprise/custom
# Limits (null = unlimited)
orders_per_month = Column(Integer, nullable=True)
products_limit = Column(Integer, nullable=True)
team_members = Column(Integer, nullable=True)
order_history_months = Column(Integer, nullable=True)
# CMS Limits (null = unlimited)
cms_pages_limit = Column(
Integer,
nullable=True,
comment="Total CMS pages limit (NULL = unlimited)",
)
cms_custom_pages_limit = Column(
Integer,
nullable=True,
comment="Custom pages limit, excluding overrides (NULL = unlimited)",
)
# Features (JSON array of feature codes)
features = Column(JSON, default=list)
# Stripe Product/Price IDs
stripe_product_id = Column(String(100), nullable=True)
stripe_price_monthly_id = Column(String(100), nullable=True)
stripe_price_annual_id = Column(String(100), nullable=True)
# Display and visibility
display_order = Column(Integer, default=0)
is_active = Column(Boolean, default=True, nullable=False)
is_public = Column(Boolean, default=True, nullable=False) # False for enterprise
# Relationship to Platform
platform = relationship(
"Platform",
back_populates="subscription_tiers",
foreign_keys=[platform_id],
)
# Unique constraint: tier code must be unique per platform (or globally if NULL)
__table_args__ = (
Index("idx_tier_platform_active", "platform_id", "is_active"),
)
def __repr__(self):
platform_info = f", platform_id={self.platform_id}" if self.platform_id else ""
return f"<SubscriptionTier(code='{self.code}', name='{self.name}'{platform_info})>"
def to_dict(self) -> dict:
"""Convert tier to dictionary (compatible with TIER_LIMITS format)."""
return {
"name": self.name,
"price_monthly_cents": self.price_monthly_cents,
"price_annual_cents": self.price_annual_cents,
"orders_per_month": self.orders_per_month,
"products_limit": self.products_limit,
"team_members": self.team_members,
"order_history_months": self.order_history_months,
"cms_pages_limit": self.cms_pages_limit,
"cms_custom_pages_limit": self.cms_custom_pages_limit,
"features": self.features or [],
}
# ============================================================================
# AddOnProduct - Purchasable add-ons
# ============================================================================
class AddOnProduct(Base, TimestampMixin):
"""
Purchasable add-on products (domains, SSL, email packages).
These are separate from subscription tiers and can be added to any tier.
"""
__tablename__ = "addon_products"
id = Column(Integer, primary_key=True, index=True)
code = Column(String(50), unique=True, nullable=False, index=True)
name = Column(String(100), nullable=False)
description = Column(Text, nullable=True)
category = Column(String(50), nullable=False, index=True)
# Pricing
price_cents = Column(Integer, nullable=False)
billing_period = Column(
String(20), default=BillingPeriod.MONTHLY.value, nullable=False
)
# For tiered add-ons (e.g., email_5, email_10)
quantity_unit = Column(String(50), nullable=True) # emails, GB, etc.
quantity_value = Column(Integer, nullable=True) # 5, 10, 50, etc.
# Stripe
stripe_product_id = Column(String(100), nullable=True)
stripe_price_id = Column(String(100), nullable=True)
# Display
display_order = Column(Integer, default=0)
is_active = Column(Boolean, default=True, nullable=False)
def __repr__(self):
return f"<AddOnProduct(code='{self.code}', name='{self.name}')>"
# ============================================================================
# VendorAddOn - Add-ons purchased by vendor
# ============================================================================
class VendorAddOn(Base, TimestampMixin):
"""
Add-ons purchased by a vendor.
Tracks active add-on subscriptions and their billing status.
"""
__tablename__ = "vendor_addons"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
addon_product_id = Column(
Integer, ForeignKey("addon_products.id"), nullable=False, index=True
)
# Status
status = Column(String(20), default="active", nullable=False, index=True)
# For domains: store the actual domain name
domain_name = Column(String(255), nullable=True, index=True)
# Quantity (for tiered add-ons like email packages)
quantity = Column(Integer, default=1, nullable=False)
# Stripe billing
stripe_subscription_item_id = Column(String(100), nullable=True)
# Period tracking
period_start = Column(DateTime(timezone=True), nullable=True)
period_end = Column(DateTime(timezone=True), nullable=True)
# Cancellation
cancelled_at = Column(DateTime(timezone=True), nullable=True)
# Relationships
vendor = relationship("Vendor", back_populates="addons")
addon_product = relationship("AddOnProduct")
__table_args__ = (
Index("idx_vendor_addon_status", "vendor_id", "status"),
Index("idx_vendor_addon_product", "vendor_id", "addon_product_id"),
)
def __repr__(self):
return f"<VendorAddOn(vendor_id={self.vendor_id}, addon={self.addon_product_id}, status='{self.status}')>"
# ============================================================================
# StripeWebhookEvent - Webhook idempotency tracking
# ============================================================================
class StripeWebhookEvent(Base, TimestampMixin):
"""
Log of processed Stripe webhook events for idempotency.
Prevents duplicate processing of the same event.
"""
__tablename__ = "stripe_webhook_events"
id = Column(Integer, primary_key=True, index=True)
event_id = Column(String(100), unique=True, nullable=False, index=True)
event_type = Column(String(100), nullable=False, index=True)
# Processing status
status = Column(String(20), default="pending", nullable=False, index=True)
processed_at = Column(DateTime(timezone=True), nullable=True)
error_message = Column(Text, nullable=True)
# Raw event data (encrypted for security)
payload_encrypted = Column(Text, nullable=True)
# Related entities (for quick lookup)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=True, index=True)
subscription_id = Column(
Integer, ForeignKey("vendor_subscriptions.id"), nullable=True, index=True
)
__table_args__ = (Index("idx_webhook_event_type_status", "event_type", "status"),)
def __repr__(self):
return f"<StripeWebhookEvent(event_id='{self.event_id}', type='{self.event_type}', status='{self.status}')>"
# ============================================================================
# BillingHistory - Invoice and payment history
# ============================================================================
class BillingHistory(Base, TimestampMixin):
"""
Invoice and payment history for vendors.
Stores Stripe invoice data for display and reporting.
"""
__tablename__ = "billing_history"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(Integer, ForeignKey("vendors.id"), nullable=False, index=True)
# Stripe references
stripe_invoice_id = Column(String(100), unique=True, nullable=True, index=True)
stripe_payment_intent_id = Column(String(100), nullable=True)
# Invoice details
invoice_number = Column(String(50), nullable=True)
invoice_date = Column(DateTime(timezone=True), nullable=False)
due_date = Column(DateTime(timezone=True), nullable=True)
# Amounts (in cents for precision)
subtotal_cents = Column(Integer, nullable=False)
tax_cents = Column(Integer, default=0, nullable=False)
total_cents = Column(Integer, nullable=False)
amount_paid_cents = Column(Integer, default=0, nullable=False)
currency = Column(String(3), default="EUR", nullable=False)
# Status
status = Column(String(20), nullable=False, index=True)
# PDF URLs
invoice_pdf_url = Column(String(500), nullable=True)
hosted_invoice_url = Column(String(500), nullable=True)
# Description and line items
description = Column(Text, nullable=True)
line_items = Column(JSON, nullable=True)
# Relationships
vendor = relationship("Vendor", back_populates="billing_history")
__table_args__ = (
Index("idx_billing_vendor_date", "vendor_id", "invoice_date"),
Index("idx_billing_status", "vendor_id", "status"),
)
def __repr__(self):
return f"<BillingHistory(vendor_id={self.vendor_id}, invoice='{self.invoice_number}', status='{self.status}')>"
# ============================================================================
# Legacy TIER_LIMITS (kept for backward compatibility during migration)
# ============================================================================
# Tier limit definitions (hardcoded for now, could be moved to DB)
TIER_LIMITS = {
TierCode.ESSENTIAL: {
"name": "Essential",
"price_monthly_cents": 4900, # €49
"price_annual_cents": 49000, # €490 (2 months free)
"orders_per_month": 100,
"products_limit": 200,
"team_members": 1,
"order_history_months": 6,
"features": [
"letzshop_sync",
"inventory_basic",
"invoice_lu",
"customer_view",
],
},
TierCode.PROFESSIONAL: {
"name": "Professional",
"price_monthly_cents": 9900, # €99
"price_annual_cents": 99000, # €990
"orders_per_month": 500,
"products_limit": None, # Unlimited
"team_members": 3,
"order_history_months": 24,
"features": [
"letzshop_sync",
"inventory_locations",
"inventory_purchase_orders",
"invoice_lu",
"invoice_eu_vat",
"customer_view",
"customer_export",
],
},
TierCode.BUSINESS: {
"name": "Business",
"price_monthly_cents": 19900, # €199
"price_annual_cents": 199000, # €1990
"orders_per_month": 2000,
"products_limit": None, # Unlimited
"team_members": 10,
"order_history_months": None, # Unlimited
"features": [
"letzshop_sync",
"inventory_locations",
"inventory_purchase_orders",
"invoice_lu",
"invoice_eu_vat",
"invoice_bulk",
"customer_view",
"customer_export",
"analytics_dashboard",
"accounting_export",
"api_access",
"automation_rules",
"team_roles",
],
},
TierCode.ENTERPRISE: {
"name": "Enterprise",
"price_monthly_cents": 39900, # €399 starting
"price_annual_cents": None, # Custom
"orders_per_month": None, # Unlimited
"products_limit": None, # Unlimited
"team_members": None, # Unlimited
"order_history_months": None, # Unlimited
"features": [
"letzshop_sync",
"inventory_locations",
"inventory_purchase_orders",
"invoice_lu",
"invoice_eu_vat",
"invoice_bulk",
"customer_view",
"customer_export",
"analytics_dashboard",
"accounting_export",
"api_access",
"automation_rules",
"team_roles",
"white_label",
"multi_vendor",
"custom_integrations",
"sla_guarantee",
"dedicated_support",
],
},
}
class VendorSubscription(Base, TimestampMixin):
"""
Per-vendor subscription tracking.
Tracks the vendor's subscription tier, billing period,
and usage counters for limit enforcement.
"""
__tablename__ = "vendor_subscriptions"
id = Column(Integer, primary_key=True, index=True)
vendor_id = Column(
Integer, ForeignKey("vendors.id"), unique=True, nullable=False, index=True
)
# Tier - tier_id is the FK, tier (code) kept for backwards compatibility
tier_id = Column(
Integer, ForeignKey("subscription_tiers.id"), nullable=True, index=True
)
tier = Column(
String(20), default=TierCode.ESSENTIAL.value, nullable=False, index=True
)
# Status
status = Column(
String(20), default=SubscriptionStatus.TRIAL.value, nullable=False, index=True
)
# Billing period
period_start = Column(DateTime(timezone=True), nullable=False)
period_end = Column(DateTime(timezone=True), nullable=False)
is_annual = Column(Boolean, default=False, nullable=False)
# Trial info
trial_ends_at = Column(DateTime(timezone=True), nullable=True)
# Card collection tracking (for trials that require card upfront)
card_collected_at = Column(DateTime(timezone=True), nullable=True)
# Usage counters (reset each billing period)
orders_this_period = Column(Integer, default=0, nullable=False)
orders_limit_reached_at = Column(DateTime(timezone=True), nullable=True)
# Overrides (for custom enterprise deals)
custom_orders_limit = Column(Integer, nullable=True) # Override tier limit
custom_products_limit = Column(Integer, nullable=True)
custom_team_limit = Column(Integer, nullable=True)
# Payment info (Stripe integration)
stripe_customer_id = Column(String(100), nullable=True, index=True)
stripe_subscription_id = Column(String(100), nullable=True, index=True)
stripe_price_id = Column(String(100), nullable=True) # Current price being billed
stripe_payment_method_id = Column(String(100), nullable=True) # Default payment method
# Proration and upgrade/downgrade tracking
proration_behavior = Column(String(50), default="create_prorations")
scheduled_tier_change = Column(String(30), nullable=True) # Pending tier change
scheduled_change_at = Column(DateTime(timezone=True), nullable=True)
# Payment failure tracking
payment_retry_count = Column(Integer, default=0, nullable=False)
last_payment_error = Column(Text, nullable=True)
# Cancellation
cancelled_at = Column(DateTime(timezone=True), nullable=True)
cancellation_reason = Column(Text, nullable=True)
# Relationships
vendor = relationship("Vendor", back_populates="subscription")
tier_obj = relationship("SubscriptionTier", backref="subscriptions")
__table_args__ = (
Index("idx_subscription_vendor_status", "vendor_id", "status"),
Index("idx_subscription_period", "period_start", "period_end"),
)
def __repr__(self):
return f"<VendorSubscription(vendor_id={self.vendor_id}, tier='{self.tier}', status='{self.status}')>"
# =========================================================================
# Tier Limit Properties
# =========================================================================
@property
def tier_limits(self) -> dict:
"""Get the limit definitions for current tier.
Uses database tier (tier_obj) if available, otherwise falls back
to hardcoded TIER_LIMITS for backwards compatibility.
"""
# Use database tier if relationship is loaded
if self.tier_obj is not None:
return {
"orders_per_month": self.tier_obj.orders_per_month,
"products_limit": self.tier_obj.products_limit,
"team_members": self.tier_obj.team_members,
"features": self.tier_obj.features or [],
}
# Fall back to hardcoded limits
return TIER_LIMITS.get(TierCode(self.tier), TIER_LIMITS[TierCode.ESSENTIAL])
@property
def orders_limit(self) -> int | None:
"""Get effective orders limit (custom or tier default)."""
if self.custom_orders_limit is not None:
return self.custom_orders_limit
return self.tier_limits.get("orders_per_month")
@property
def products_limit(self) -> int | None:
"""Get effective products limit (custom or tier default)."""
if self.custom_products_limit is not None:
return self.custom_products_limit
return self.tier_limits.get("products_limit")
@property
def team_members_limit(self) -> int | None:
"""Get effective team members limit (custom or tier default)."""
if self.custom_team_limit is not None:
return self.custom_team_limit
return self.tier_limits.get("team_members")
@property
def features(self) -> list[str]:
"""Get list of enabled features for current tier."""
return self.tier_limits.get("features", [])
# =========================================================================
# Status Checks
# =========================================================================
@property
def is_active(self) -> bool:
"""Check if subscription allows access."""
return self.status in [
SubscriptionStatus.TRIAL.value,
SubscriptionStatus.ACTIVE.value,
SubscriptionStatus.PAST_DUE.value, # Grace period
SubscriptionStatus.CANCELLED.value, # Until period end
]
@property
def is_trial(self) -> bool:
"""Check if currently in trial."""
return self.status == SubscriptionStatus.TRIAL.value
@property
def trial_days_remaining(self) -> int | None:
"""Get remaining trial days."""
if not self.is_trial or not self.trial_ends_at:
return None
remaining = (self.trial_ends_at - datetime.now(UTC)).days
return max(0, remaining)
# =========================================================================
# Limit Checks
# =========================================================================
def can_create_order(self) -> tuple[bool, str | None]:
"""
Check if vendor can create/import another order.
Returns: (can_create, error_message)
"""
if not self.is_active:
return False, "Subscription is not active"
limit = self.orders_limit
if limit is None: # Unlimited
return True, None
if self.orders_this_period >= limit:
return False, f"Monthly order limit reached ({limit} orders). Upgrade to continue."
return True, None
def can_add_product(self, current_count: int) -> tuple[bool, str | None]:
"""
Check if vendor can add another product.
Args:
current_count: Current number of products
Returns: (can_add, error_message)
"""
if not self.is_active:
return False, "Subscription is not active"
limit = self.products_limit
if limit is None: # Unlimited
return True, None
if current_count >= limit:
return False, f"Product limit reached ({limit} products). Upgrade to add more."
return True, None
def can_add_team_member(self, current_count: int) -> tuple[bool, str | None]:
"""
Check if vendor can add another team member.
Args:
current_count: Current number of team members
Returns: (can_add, error_message)
"""
if not self.is_active:
return False, "Subscription is not active"
limit = self.team_members_limit
if limit is None: # Unlimited
return True, None
if current_count >= limit:
return False, f"Team member limit reached ({limit} members). Upgrade to add more."
return True, None
def has_feature(self, feature: str) -> bool:
"""Check if a feature is enabled for current tier."""
return feature in self.features
# =========================================================================
# Usage Tracking
# =========================================================================
def increment_order_count(self) -> None:
"""Increment the order counter for this period."""
self.orders_this_period += 1
# Track when limit was first reached
limit = self.orders_limit
if limit and self.orders_this_period >= limit and not self.orders_limit_reached_at:
self.orders_limit_reached_at = datetime.now(UTC)
def reset_period_counters(self) -> None:
"""Reset counters for new billing period."""
self.orders_this_period = 0
self.orders_limit_reached_at = None
# ============================================================================
# Capacity Planning
# ============================================================================
class CapacitySnapshot(Base, TimestampMixin):
"""
Daily snapshot of platform capacity metrics.
Used for growth trending and capacity forecasting.
Captured daily by background job.
"""
__tablename__ = "capacity_snapshots"
id = Column(Integer, primary_key=True, index=True)
snapshot_date = Column(DateTime(timezone=True), nullable=False, unique=True, index=True)
# Vendor metrics
total_vendors = Column(Integer, default=0, nullable=False)
active_vendors = Column(Integer, default=0, nullable=False)
trial_vendors = Column(Integer, default=0, nullable=False)
# Subscription metrics
total_subscriptions = Column(Integer, default=0, nullable=False)
active_subscriptions = Column(Integer, default=0, nullable=False)
# Resource metrics
total_products = Column(Integer, default=0, nullable=False)
total_orders_month = Column(Integer, default=0, nullable=False)
total_team_members = Column(Integer, default=0, nullable=False)
# Storage metrics
storage_used_gb = Column(Numeric(10, 2), default=0, nullable=False)
db_size_mb = Column(Numeric(10, 2), default=0, nullable=False)
# Capacity metrics (theoretical limits from subscriptions)
theoretical_products_limit = Column(Integer, nullable=True)
theoretical_orders_limit = Column(Integer, nullable=True)
theoretical_team_limit = Column(Integer, nullable=True)
# Tier distribution (JSON: {"essential": 10, "professional": 5, ...})
tier_distribution = Column(JSON, nullable=True)
# Performance metrics
avg_response_ms = Column(Integer, nullable=True)
peak_cpu_percent = Column(Numeric(5, 2), nullable=True)
peak_memory_percent = Column(Numeric(5, 2), nullable=True)
# Indexes
__table_args__ = (
Index("ix_capacity_snapshots_date", "snapshot_date"),
)
def __repr__(self) -> str:
return f"<CapacitySnapshot(date={self.snapshot_date}, vendors={self.total_vendors})>"
__all__ = [
# Enums
"TierCode",
"SubscriptionStatus",
"AddOnCategory",
"BillingPeriod",
# Models
"SubscriptionTier",
"AddOnProduct",
"VendorAddOn",
"StripeWebhookEvent",
"BillingHistory",
"VendorSubscription",
"CapacitySnapshot",
# Legacy constants
"TIER_LIMITS",
]

View File

@@ -1,147 +1,23 @@
# models/database/test_run.py
"""
Test Run Models
Database models for tracking pytest test runs and results
Test Run Models - LEGACY LOCATION
This file exists for backward compatibility.
The canonical location is now: app/modules/dev_tools/models/test_run.py
All imports should use the new location:
from app.modules.dev_tools.models import TestRun, TestResult, TestCollection
"""
from sqlalchemy import (
JSON,
Column,
DateTime,
Float,
ForeignKey,
Integer,
String,
Text,
# Re-export from canonical location for backward compatibility
from app.modules.dev_tools.models.test_run import (
TestRun,
TestResult,
TestCollection,
)
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
from app.core.database import Base
class TestRun(Base):
"""Represents a single pytest run"""
__tablename__ = "test_runs"
id = Column(Integer, primary_key=True, index=True)
timestamp = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False, index=True
)
# Test counts
total_tests = Column(Integer, default=0)
passed = Column(Integer, default=0)
failed = Column(Integer, default=0)
errors = Column(Integer, default=0)
skipped = Column(Integer, default=0)
xfailed = Column(Integer, default=0) # Expected failures
xpassed = Column(Integer, default=0) # Unexpected passes
# Coverage info (optional)
coverage_percent = Column(Float, nullable=True)
# Timing
duration_seconds = Column(Float, default=0.0)
# Run metadata
triggered_by = Column(String(100)) # 'manual', 'scheduled', 'ci/cd'
git_commit_hash = Column(String(40))
git_branch = Column(String(100))
test_path = Column(String(500)) # Which tests were run (e.g., 'tests/unit')
pytest_args = Column(String(500)) # Command line arguments used
# Status
status = Column(
String(20), default="running", index=True
) # 'running', 'passed', 'failed', 'error'
# Celery task tracking (optional - for USE_CELERY=true)
celery_task_id = Column(String(255), nullable=True, index=True)
# Relationship to test results
results = relationship(
"TestResult", back_populates="run", cascade="all, delete-orphan"
)
def __repr__(self):
return f"<TestRun(id={self.id}, total={self.total_tests}, passed={self.passed}, failed={self.failed})>"
@property
def pass_rate(self) -> float:
"""Calculate pass rate as percentage"""
if self.total_tests == 0:
return 0.0
return (self.passed / self.total_tests) * 100
class TestResult(Base):
"""Represents a single test result from a pytest run"""
__tablename__ = "test_results"
id = Column(Integer, primary_key=True, index=True)
run_id = Column(Integer, ForeignKey("test_runs.id"), nullable=False, index=True)
# Test identification
node_id = Column(
String(500), nullable=False, index=True
) # e.g., 'tests/unit/test_foo.py::test_bar'
test_name = Column(String(200), nullable=False) # e.g., 'test_bar'
test_file = Column(String(300), nullable=False) # e.g., 'tests/unit/test_foo.py'
test_class = Column(String(200)) # e.g., 'TestFooClass' (optional)
# Result
outcome = Column(
String(20), nullable=False, index=True
) # 'passed', 'failed', 'error', 'skipped', 'xfailed', 'xpassed'
duration_seconds = Column(Float, default=0.0)
# Failure details (if applicable)
error_message = Column(Text)
traceback = Column(Text)
# Test metadata
markers = Column(JSON) # List of pytest markers
parameters = Column(JSON) # Parametrized test params
# Timestamps
created_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
# Relationships
run = relationship("TestRun", back_populates="results")
def __repr__(self):
return f"<TestResult(id={self.id}, node_id={self.node_id}, outcome={self.outcome})>"
class TestCollection(Base):
"""Cached test collection info for quick stats"""
__tablename__ = "test_collections"
id = Column(Integer, primary_key=True, index=True)
# Collection stats
total_tests = Column(Integer, default=0)
total_files = Column(Integer, default=0)
total_classes = Column(Integer, default=0)
# By category
unit_tests = Column(Integer, default=0)
integration_tests = Column(Integer, default=0)
performance_tests = Column(Integer, default=0)
system_tests = Column(Integer, default=0)
# Collection data
test_files = Column(JSON) # List of test files with counts
# Timestamps
collected_at = Column(
DateTime(timezone=True), server_default=func.now(), nullable=False
)
def __repr__(self):
return f"<TestCollection(id={self.id}, total={self.total_tests})>"
__all__ = [
"TestRun",
"TestResult",
"TestCollection",
]