Files
orion/scripts/seed_database.py

1350 lines
41 KiB
Python
Raw Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Enhanced Database Seeder for Wizamart Platform
Creates comprehensive test data including:
- Admin and regular users
- Multiple vendors with themes and domains
- Products and marketplace products
- Customers and addresses
- Orders and order items
- Inventory records
- Import jobs
- Admin settings and platform alerts
Usage:
    python scripts/seed_database.py [--reset] [--minimal]

    --reset    Drop all data before seeding (destructive!)
    --minimal  Create only essential data (admin + 1 vendor)

Or via Makefile:
    make seed          # Normal seeding
    make seed-reset    # Reset and seed
    make seed-minimal  # Minimal seeding

This script is idempotent when run without --reset.
"""
import sys
import argparse
from pathlib import Path
from typing import Dict, List, Tuple
from datetime import datetime, timezone, timedelta
from decimal import Decimal
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy.orm import Session
from sqlalchemy import select, delete
from app.core.database import SessionLocal, engine
from models.database.user import User
from models.database.vendor import Vendor, VendorUser, Role
from models.database.vendor_domain import VendorDomain
from models.database.vendor_theme import VendorTheme
from models.database.customer import Customer, CustomerAddress
from models.database.product import Product
from models.database.marketplace_product import MarketplaceProduct
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.order import Order, OrderItem
from models.database.admin import AdminSetting, PlatformAlert
from middleware.auth import AuthManager
# =============================================================================
# CONFIGURATION
# =============================================================================
# Credentials of the bootstrap admin account created by seed_minimal() and
# seed_full(). print_summary() echoes the username and password to the console,
# so these values are meant for local/test databases only.
DEFAULT_ADMIN_EMAIL = "admin@wizamart.com"
DEFAULT_ADMIN_USERNAME = "admin"
DEFAULT_ADMIN_PASSWORD = "admin123" # Change in production!
# Vendors created by seed_full(). ``theme_preset`` names a key of THEME_PRESETS;
# ``custom_domain`` is optional and None means no custom domain is registered.
VENDOR_CONFIGS = [
    dict(
        vendor_code="WIZAMART",
        name="WizaMart",
        subdomain="wizamart",
        description="Premium electronics and gadgets marketplace",
        theme_preset="modern",
        custom_domain="wizamart.shop",
    ),
    dict(
        vendor_code="FASHIONHUB",
        name="Fashion Hub",
        subdomain="fashionhub",
        description="Trendy clothing and accessories",
        theme_preset="vibrant",
        custom_domain="fashionhub.store",
    ),
    dict(
        vendor_code="BOOKSTORE",
        name="The Book Store",
        subdomain="bookstore",
        description="Books, magazines, and educational materials",
        theme_preset="classic",
        custom_domain=None,
    ),
]
# Non-admin accounts created by seed_full() via create_user(); every entry is
# passed through as username/email/password/role/first_name/last_name.
TEST_USERS = [
    dict(
        username="vendor1",
        email="vendor1@example.com",
        password="password123",
        role="vendor",
        first_name="John",
        last_name="Vendor",
    ),
    dict(
        username="vendor2",
        email="vendor2@example.com",
        password="password123",
        role="vendor",
        first_name="Jane",
        last_name="Merchant",
    ),
    dict(
        username="customer1",
        email="customer1@example.com",
        password="password123",
        role="customer",
        first_name="Alice",
        last_name="Customer",
    ),
]
# Theme presets keyed by preset name. create_vendor_theme() looks a preset up by
# name and falls back to "default" when the requested name is not present.
THEME_PRESETS = {
    # Indigo/violet palette, fixed header.
    "default": dict(
        theme_name="default",
        colors=dict(
            primary="#6366f1",
            secondary="#8b5cf6",
            accent="#ec4899",
            background="#ffffff",
            text="#1f2937",
            border="#e5e7eb",
        ),
        font_family_heading="Inter, sans-serif",
        font_family_body="Inter, sans-serif",
        layout_style="grid",
        header_style="fixed",
        product_card_style="modern",
    ),
    # Blue/cyan palette, transparent header.
    "modern": dict(
        theme_name="modern",
        colors=dict(
            primary="#3b82f6",
            secondary="#06b6d4",
            accent="#f59e0b",
            background="#f9fafb",
            text="#111827",
            border="#d1d5db",
        ),
        font_family_heading="Poppins, sans-serif",
        font_family_body="Inter, sans-serif",
        layout_style="grid",
        header_style="transparent",
        product_card_style="modern",
    ),
    # Serif fonts, list layout.
    "classic": dict(
        theme_name="classic",
        colors=dict(
            primary="#1e40af",
            secondary="#7c3aed",
            accent="#dc2626",
            background="#ffffff",
            text="#374151",
            border="#e5e7eb",
        ),
        font_family_heading="Georgia, serif",
        font_family_body="Georgia, serif",
        layout_style="list",
        header_style="fixed",
        product_card_style="classic",
    ),
    # Pink/amber palette, masonry layout.
    "vibrant": dict(
        theme_name="vibrant",
        colors=dict(
            primary="#ec4899",
            secondary="#f59e0b",
            accent="#8b5cf6",
            background="#fef3c7",
            text="#78350f",
            border="#fbbf24",
        ),
        font_family_heading="Montserrat, sans-serif",
        font_family_body="Open Sans, sans-serif",
        layout_style="masonry",
        header_style="static",
        product_card_style="modern",
    ),
    # Greyscale palette.
    "minimal": dict(
        theme_name="minimal",
        colors=dict(
            primary="#000000",
            secondary="#404040",
            accent="#737373",
            background="#ffffff",
            text="#171717",
            border="#e5e5e5",
        ),
        font_family_heading="Helvetica, Arial, sans-serif",
        font_family_body="Helvetica, Arial, sans-serif",
        layout_style="grid",
        header_style="static",
        product_card_style="minimal",
    ),
}
# Catalogue entries fed to create_marketplace_product() by seed_full().
# Prices are kept as strings here; seed_full() converts them with float() when
# it links a product to a vendor.
SAMPLE_PRODUCTS = [
    dict(
        marketplace_product_id="PROD-001",
        title="Wireless Bluetooth Headphones",
        description="Premium noise-cancelling wireless headphones with 30-hour battery life",
        price="149.99",
        brand="AudioTech",
        gtin="1234567890123",
        availability="in stock",
        condition="new",
        google_product_category="Electronics > Audio > Headphones",
    ),
    dict(
        marketplace_product_id="PROD-002",
        title="Smart Watch Pro",
        description="Advanced fitness tracking with heart rate monitor and GPS",
        price="299.99",
        brand="TechWear",
        gtin="1234567890124",
        availability="in stock",
        condition="new",
        google_product_category="Electronics > Wearables > Smart Watches",
    ),
    dict(
        marketplace_product_id="PROD-003",
        title="Portable Bluetooth Speaker",
        description="Waterproof speaker with 360° sound and 12-hour battery",
        price="79.99",
        brand="AudioTech",
        gtin="1234567890125",
        availability="in stock",
        condition="new",
        google_product_category="Electronics > Audio > Speakers",
    ),
    dict(
        marketplace_product_id="PROD-004",
        title="Wireless Charging Pad",
        description="Fast wireless charging for all Qi-enabled devices",
        price="39.99",
        brand="ChargeMax",
        gtin="1234567890126",
        availability="in stock",
        condition="new",
        google_product_category="Electronics > Accessories > Chargers",
    ),
    dict(
        marketplace_product_id="PROD-005",
        title="4K Webcam",
        description="Ultra HD webcam with auto-focus and built-in microphone",
        price="129.99",
        brand="VisionTech",
        gtin="1234567890127",
        availability="preorder",
        condition="new",
        google_product_category="Electronics > Computers > Webcams",
    ),
]
# =============================================================================
# UTILITY FUNCTIONS
# =============================================================================
def print_section(title: str, char: str = "="):
    """Print ``title`` centred between two 70-column rules made of ``char``.

    A blank line is emitted before the first rule and after the second one.
    """
    width = 70
    rule = char * width
    # Joining with newlines yields exactly: blank, rule, title, rule, blank.
    print("", rule, title.center(width), rule, "", sep="\n")
def print_step(step_num: int, description: str):
    """Print a numbered step header of the form ``STEP <n>: <description>``."""
    # print() joins its arguments with a single space, giving "STEP n: text".
    print(f"STEP {step_num}:", description)
def print_success(message: str):
    """Report a successful operation by printing ``message`` on its own line."""
    line = f"{message}"
    print(line)
def print_info(message: str):
    """Print an informational ``message``, indented by one space."""
    indented = f" {message}"
    print(indented)
def print_error(message: str):
    """Report a failure by printing ``message`` on its own line (to stdout)."""
    line = f"{message}"
    print(line)
def verify_database_ready() -> bool:
    """Check that the tables the seeder depends on exist.

    Table names are read through SQLAlchemy's inspector rather than by querying
    ``sqlite_master`` with hand-built SQL, so the check works on whichever
    backend ``engine`` is bound to, not only SQLite.

    Returns:
        True when every required table is present. False when a table is
        missing or the database cannot be inspected; the reason is reported
        through print_error().
    """
    # Local import keeps the module's import block untouched; aliased so it is
    # not mistaken for the standard-library ``inspect`` module.
    from sqlalchemy import inspect as sa_inspect

    required_tables = ("users", "vendors", "products", "marketplace_products")
    try:
        existing_tables = set(sa_inspect(engine).get_table_names())
    except Exception as e:
        # Connection and driver errors are reported as "not ready" rather than
        # raised; main() turns a False result into a friendly exit message.
        print_error(f"Error checking database: {e}")
        return False

    for table in required_tables:
        if table not in existing_tables:
            print_error(f"Required table '{table}' not found")
            return False
    return True
def reset_database(db: Session):
    """Delete every row from the seeded tables (destructive!).

    All deletes run in one transaction: they are committed together on success,
    and on any error the session is rolled back and the exception re-raised.
    """
    print_section("RESETTING DATABASE", "!")
    print("⚠️ WARNING: This will delete ALL data!")

    # Order matters due to foreign key constraints.
    purge_order = (
        OrderItem,
        Order,
        CustomerAddress,
        Customer,
        Product,
        MarketplaceProduct,
        MarketplaceImportJob,
        VendorUser,
        Role,
        VendorDomain,
        VendorTheme,
        Vendor,
        AdminSetting,
        PlatformAlert,
        User,
    )

    try:
        for model in purge_order:
            row_count = db.query(model).count()
            if row_count <= 0:
                # Nothing to delete; skip the statement and the log line.
                continue
            db.execute(delete(model))
            print_info(f"Deleted {row_count} records from {model.__tablename__}")
        db.commit()
        print_success("Database reset complete")
    except Exception as exc:
        db.rollback()
        print_error(f"Failed to reset database: {exc}")
        raise
# =============================================================================
# SEEDING FUNCTIONS
# =============================================================================
def create_user(
    db: Session,
    auth_manager: AuthManager,
    username: str,
    email: str,
    password: str,
    role: str = "user",
    first_name: str = None,
    last_name: str = None,
) -> Tuple[User, bool]:
    """Return the user named ``username``, creating it when it does not exist.

    The lookup is by username only. A new user gets its password hashed through
    ``auth_manager.hash_password`` and is added and flushed, but not committed.

    Returns:
        ``(user, created)`` where ``created`` is False if the user already
        existed (in which case the other arguments are ignored).
    """
    lookup = select(User).where(User.username == username)
    found = db.execute(lookup).scalar_one_or_none()
    if found:
        return found, False

    new_user = User(
        email=email,
        username=username,
        hashed_password=auth_manager.hash_password(password),
        role=role,
        first_name=first_name,
        last_name=last_name,
        is_active=True,
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    db.add(new_user)
    db.flush()
    return new_user, True
def create_vendor(
    db: Session,
    vendor_code: str,
    name: str,
    subdomain: str,
    owner_user_id: int,
    description: str = None,
) -> Tuple[Vendor, bool]:
    """Return the vendor with ``vendor_code``, creating it when it does not exist.

    New vendors are created active and verified, with contact e-mail and website
    derived from ``subdomain``. When ``description`` is empty a default of
    ``"<name> - Online marketplace"`` is used. The row is flushed, not committed.

    Returns:
        ``(vendor, created)``; ``created`` is False for a pre-existing vendor.
    """
    lookup = select(Vendor).where(Vendor.vendor_code == vendor_code)
    found = db.execute(lookup).scalar_one_or_none()
    if found:
        return found, False

    if description:
        vendor_description = description
    else:
        vendor_description = f"{name} - Online marketplace"

    new_vendor = Vendor(
        vendor_code=vendor_code,
        subdomain=subdomain,
        name=name,
        description=vendor_description,
        owner_user_id=owner_user_id,
        contact_email=f"contact@{subdomain}.com",
        contact_phone="+352 123 456 789",
        website=f"https://{subdomain}.com",
        is_active=True,
        is_verified=True,
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    db.add(new_vendor)
    db.flush()
    return new_vendor, True
def create_vendor_theme(
    db: Session,
    vendor_id: int,
    theme_preset: str = "default",
) -> Tuple[VendorTheme, bool]:
    """Return the theme of ``vendor_id``, creating it from a preset when absent.

    ``theme_preset`` names an entry of THEME_PRESETS; an unknown name falls back
    to the "default" preset. The new theme is flushed, not committed.

    Returns:
        ``(theme, created)``; ``created`` is False when the vendor already had
        a theme.
    """
    lookup = select(VendorTheme).where(VendorTheme.vendor_id == vendor_id)
    found = db.execute(lookup).scalar_one_or_none()
    if found:
        return found, False

    if theme_preset in THEME_PRESETS:
        preset = THEME_PRESETS[theme_preset]
    else:
        preset = THEME_PRESETS["default"]

    # Placeholder profile URLs, one per network, all keyed on the vendor id.
    social_links = {
        network: f"https://{network}.com/vendor{vendor_id}"
        for network in ("facebook", "instagram", "twitter")
    }

    new_theme = VendorTheme(
        vendor_id=vendor_id,
        theme_name=preset["theme_name"],
        is_active=True,
        colors=preset["colors"],
        font_family_heading=preset["font_family_heading"],
        font_family_body=preset["font_family_body"],
        layout_style=preset["layout_style"],
        header_style=preset["header_style"],
        product_card_style=preset["product_card_style"],
        social_links=social_links,
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    db.add(new_theme)
    db.flush()
    return new_theme, True
def create_vendor_domain(
    db: Session,
    vendor_id: int,
    domain: str,
    is_primary: bool = False,
) -> Tuple[VendorDomain, bool]:
    """Return the custom-domain row for ``domain``, creating it when absent.

    The domain is first passed through ``VendorDomain.normalize_domain`` and the
    lookup is done on the normalised value across all vendors. New rows start
    active but unverified with ``ssl_status="pending"``; they are flushed, not
    committed.

    Returns:
        ``(vendor_domain, created)``; ``created`` is False when the domain was
        already registered.
    """
    normalized = VendorDomain.normalize_domain(domain)
    lookup = select(VendorDomain).where(VendorDomain.domain == normalized)
    found = db.execute(lookup).scalar_one_or_none()
    if found:
        return found, False

    # Deterministic token built from the vendor id and the domain with dots
    # replaced by underscores.
    token_suffix = normalized.replace('.', '_')
    new_domain = VendorDomain(
        vendor_id=vendor_id,
        domain=normalized,
        is_primary=is_primary,
        is_active=True,
        ssl_status="pending",
        is_verified=False,
        verification_token=f"verify_{vendor_id}_{token_suffix}",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    db.add(new_domain)
    db.flush()
    return new_domain, True
def create_marketplace_product(
    db: Session,
    product_data: Dict,
    marketplace: str = "Wizamart",
    vendor_name: str = None,
) -> Tuple[MarketplaceProduct, bool]:
    """Return the marketplace product described by ``product_data``, creating it if needed.

    ``product_data`` must contain "marketplace_product_id" (used for the lookup)
    and "title"; the remaining keys are optional, with "availability" defaulting
    to "in stock" and "condition" to "new". Currency is always "EUR". The new
    row is flushed, not committed.

    Returns:
        ``(product, created)``; ``created`` is False when a product with the
        same marketplace_product_id already existed.
    """
    external_id = product_data["marketplace_product_id"]
    lookup = select(MarketplaceProduct).where(
        MarketplaceProduct.marketplace_product_id == external_id
    )
    found = db.execute(lookup).scalar_one_or_none()
    if found:
        return found, False

    read = product_data.get
    new_product = MarketplaceProduct(
        marketplace_product_id=product_data["marketplace_product_id"],
        title=product_data["title"],
        description=read("description"),
        price=read("price"),
        brand=read("brand"),
        gtin=read("gtin"),
        availability=read("availability", "in stock"),
        condition=read("condition", "new"),
        google_product_category=read("google_product_category"),
        marketplace=marketplace,
        vendor_name=vendor_name,
        currency="EUR",
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    db.add(new_product)
    db.flush()
    return new_product, True
def create_vendor_product(
    db: Session,
    vendor_id: int,
    marketplace_product_id: int,
    product_id: str = None,
    price: float = None,
    is_featured: bool = False,
) -> Tuple[Product, bool]:
    """Link a vendor to a marketplace product, unless the link already exists.

    A link is identified by the ``(vendor_id, marketplace_product_id)`` pair.
    New links are created active, in stock, condition "new", priced in EUR, and
    are flushed but not committed.

    Returns:
        ``(product, created)``; ``created`` is False when the link already
        existed (``product_id``, ``price`` and ``is_featured`` are then ignored).
    """
    lookup = select(Product).where(
        Product.vendor_id == vendor_id,
        Product.marketplace_product_id == marketplace_product_id,
    )
    found = db.execute(lookup).scalar_one_or_none()
    if found:
        return found, False

    link = Product(
        vendor_id=vendor_id,
        marketplace_product_id=marketplace_product_id,
        product_id=product_id,
        price=price,
        currency="EUR",
        availability="in stock",
        condition="new",
        is_featured=is_featured,
        is_active=True,
        display_order=0,
        min_quantity=1,
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    db.add(link)
    db.flush()
    return link, True
def create_customer(
    db: Session,
    auth_manager: AuthManager,
    vendor_id: int,
    email: str,
    password: str,
    first_name: str,
    last_name: str,
    customer_number: str,
) -> Tuple[Customer, bool]:
    """Return the vendor's customer with ``email``, creating it when absent.

    Customers are looked up by ``(vendor_id, email)``. A new customer gets a
    hashed password, a fixed sample phone number, zeroed order statistics and
    marketing consent enabled; it is flushed, not committed.

    Returns:
        ``(customer, created)``; ``created`` is False for an existing customer.
    """
    lookup = select(Customer).where(
        Customer.vendor_id == vendor_id,
        Customer.email == email,
    )
    found = db.execute(lookup).scalar_one_or_none()
    if found:
        return found, False

    new_customer = Customer(
        vendor_id=vendor_id,
        email=email,
        hashed_password=auth_manager.hash_password(password),
        first_name=first_name,
        last_name=last_name,
        phone="+352 99 123 456",
        customer_number=customer_number,
        is_active=True,
        total_orders=0,
        total_spent=Decimal("0.00"),
        marketing_consent=True,
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    db.add(new_customer)
    db.flush()
    return new_customer, True
def create_customer_address(
    db: Session,
    vendor_id: int,
    customer_id: int,
    address_type: str,
    is_default: bool = False,
) -> Tuple[CustomerAddress, bool]:
    """Return the customer's address of ``address_type``, creating it when absent.

    Addresses are looked up by ``(customer_id, address_type)``. New addresses
    use fixed sample data (John Doe, 123 Main Street, Luxembourg); only billing
    addresses carry a company name. The row is flushed, not committed.

    Returns:
        ``(address, created)``; ``created`` is False for an existing address.
    """
    lookup = select(CustomerAddress).where(
        CustomerAddress.customer_id == customer_id,
        CustomerAddress.address_type == address_type,
    )
    found = db.execute(lookup).scalar_one_or_none()
    if found:
        return found, False

    if address_type == "billing":
        company_name = "ACME Corp"
    else:
        company_name = None

    new_address = CustomerAddress(
        vendor_id=vendor_id,
        customer_id=customer_id,
        address_type=address_type,
        first_name="John",
        last_name="Doe",
        company=company_name,
        address_line_1="123 Main Street",
        address_line_2="Apt 4B",
        city="Luxembourg",
        postal_code="L-1234",
        country="Luxembourg",
        is_default=is_default,
        created_at=datetime.now(timezone.utc),
        updated_at=datetime.now(timezone.utc),
    )
    db.add(new_address)
    db.flush()
    return new_address, True
def create_order(
    db: Session,
    vendor_id: int,
    customer_id: int,
    order_number: str,
    products: List[Tuple[int, int, float]],  # (product_id, quantity, price)
    shipping_address_id: int,
    billing_address_id: int,
    status: str = "pending",
) -> Tuple[Order, bool]:
    """Return the order numbered ``order_number``, creating it with its items if absent.

    Totals are computed with Decimal arithmetic and rounded to whole cents
    (half-up) before being stored, so amounts such as the 17% VAT no longer
    carry binary floating-point noise or sub-cent digits. The values handed to
    the models are still floats, as before.

    Args:
        products: ``(product_id, quantity, unit_price)`` triples, one per line.
        status: Initial order status.

    Returns:
        ``(order, created)``; ``created`` is False when an order with that
        number already existed, in which case nothing is modified.

    Pricing rules: 17% VAT on the subtotal; shipping costs 9.99 when the
    subtotal is below 50 and is free otherwise; no discount is applied.
    """
    existing_order = db.execute(
        select(Order).where(Order.order_number == order_number)
    ).scalar_one_or_none()
    if existing_order:
        return existing_order, False

    # Calculate totals in exact decimal arithmetic, one rounded total per line.
    line_totals = [
        _to_cents(Decimal(str(price)) * qty) for _, qty, price in products
    ]
    subtotal = sum(line_totals, Decimal("0.00"))
    tax_amount = _to_cents(subtotal * Decimal("0.17"))  # 17% VAT
    shipping_amount = Decimal("9.99") if subtotal < 50 else Decimal("0.00")
    total_amount = subtotal + tax_amount + shipping_amount

    now = datetime.now(timezone.utc)
    order = Order(
        vendor_id=vendor_id,
        customer_id=customer_id,
        order_number=order_number,
        status=status,
        subtotal=float(subtotal),
        tax_amount=float(tax_amount),
        shipping_amount=float(shipping_amount),
        discount_amount=0.0,
        total_amount=float(total_amount),
        currency="EUR",
        shipping_address_id=shipping_address_id,
        billing_address_id=billing_address_id,
        shipping_method="Standard Shipping",
        created_at=now,
        updated_at=now,
    )
    db.add(order)
    # Flush so order.id is populated before the items reference it.
    db.flush()

    # Create order items
    for (product_id, quantity, unit_price), line_total in zip(products, line_totals):
        db.add(
            OrderItem(
                order_id=order.id,
                product_id=product_id,
                product_name=f"Product {product_id}",
                product_sku=f"SKU-{product_id}",
                quantity=quantity,
                unit_price=unit_price,
                total_price=float(line_total),
                inventory_reserved=True,
                inventory_fulfilled=False,
                created_at=now,
                updated_at=now,
            )
        )
    db.flush()
    return order, True


def _to_cents(amount) -> Decimal:
    """Return ``amount`` as a Decimal rounded half-up to two decimal places."""
    from decimal import ROUND_HALF_UP

    # Going through str() avoids importing a float's binary representation
    # error (Decimal(0.1) is not Decimal("0.1")).
    return Decimal(str(amount)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
def create_import_job(
    db: Session,
    vendor_id: int,
    user_id: int,
    marketplace: str,
    status: str = "completed",
    imported_count: int = 5,
) -> Tuple[MarketplaceImportJob, bool]:
    """Return a sample import job for the vendor, creating it only when absent.

    A job is considered to exist already when one with the same vendor,
    marketplace, source URL and status is found. Without this check every run
    of the seeder added another copy of the job, which contradicted the
    script's promise of being idempotent.

    Returns:
        ``(job, created)``; ``created`` is False when a matching job existed.
    """
    source_url = f"https://{marketplace.lower()}.com/feed.xml"

    # first() rather than scalar_one_or_none(): databases seeded by earlier
    # versions of this script may already hold several identical jobs.
    existing_job = db.execute(
        select(MarketplaceImportJob).where(
            MarketplaceImportJob.vendor_id == vendor_id,
            MarketplaceImportJob.marketplace == marketplace,
            MarketplaceImportJob.source_url == source_url,
            MarketplaceImportJob.status == status,
        )
    ).scalars().first()
    if existing_job:
        return existing_job, False

    now = datetime.now(timezone.utc)
    job = MarketplaceImportJob(
        vendor_id=vendor_id,
        user_id=user_id,
        marketplace=marketplace,
        source_url=source_url,
        status=status,
        imported_count=imported_count,
        updated_count=0,
        error_count=0,
        total_processed=imported_count,
        # The sample job is recorded as having run for one hour, ending now.
        started_at=now - timedelta(hours=1),
        completed_at=now,
        created_at=now,
        updated_at=now,
    )
    db.add(job)
    db.flush()
    return job, True
def create_admin_settings(db: Session) -> List[AdminSetting]:
    """Create the default platform settings that do not exist yet.

    Settings are matched by key; existing ones are left untouched. New settings
    are stored unencrypted and non-public, and are flushed (not committed) in
    one go at the end.

    Returns:
        The settings created by this call (empty when all were present).
    """
    # (key, value, value_type, category, description)
    defaults = (
        (
            "max_vendors_allowed",
            "1000",
            "integer",
            "system",
            "Maximum number of vendors allowed on the platform",
        ),
        (
            "maintenance_mode",
            "false",
            "boolean",
            "system",
            "Enable maintenance mode to prevent access",
        ),
        (
            "default_vendor_trial_days",
            "30",
            "integer",
            "marketplace",
            "Default trial period for new vendors in days",
        ),
        (
            "platform_commission_rate",
            "0.05",
            "float",
            "marketplace",
            "Platform commission rate (5%)",
        ),
    )

    newly_created = []
    for key, value, value_type, category, description in defaults:
        lookup = select(AdminSetting).where(AdminSetting.key == key)
        if db.execute(lookup).scalar_one_or_none():
            continue
        setting = AdminSetting(
            key=key,
            value=value,
            value_type=value_type,
            category=category,
            description=description,
            is_encrypted=False,
            is_public=False,
            created_at=datetime.now(timezone.utc),
            updated_at=datetime.now(timezone.utc),
        )
        db.add(setting)
        newly_created.append(setting)

    db.flush()
    return newly_created
def create_platform_alerts(db: Session) -> List[PlatformAlert]:
    """Create the sample platform alerts that do not exist yet.

    An alert is matched by its ``(alert_type, title)`` pair. Previously the
    alerts were inserted unconditionally, so every run of the seeder duplicated
    them; now existing alerts are left alone, in line with
    create_admin_settings().

    Returns:
        The alerts created by this call (empty when all were present).
    """
    now = datetime.now(timezone.utc)
    alerts = [
        {
            "alert_type": "performance",
            "severity": "info",
            "title": "High Traffic Detected",
            "description": "Platform experiencing higher than normal traffic",
            "is_resolved": True,
            "resolved_at": now - timedelta(hours=2),
        },
        {
            "alert_type": "security",
            "severity": "warning",
            "title": "Multiple Failed Login Attempts",
            "description": "Detected multiple failed login attempts from IP 192.168.1.100",
            "is_resolved": False,
        },
    ]

    created_alerts = []
    for alert_data in alerts:
        # first() tolerates duplicates left behind by earlier seeder runs.
        existing_alert = db.execute(
            select(PlatformAlert).where(
                PlatformAlert.alert_type == alert_data["alert_type"],
                PlatformAlert.title == alert_data["title"],
            )
        ).scalars().first()
        if existing_alert:
            continue

        alert = PlatformAlert(
            alert_type=alert_data["alert_type"],
            severity=alert_data["severity"],
            title=alert_data["title"],
            description=alert_data["description"],
            is_resolved=alert_data["is_resolved"],
            # Unresolved alerts have no "resolved_at" key and store None.
            resolved_at=alert_data.get("resolved_at"),
            auto_generated=True,
            occurrence_count=1,
            first_occurred_at=now - timedelta(hours=3),
            last_occurred_at=now,
            created_at=now,
            updated_at=now,
        )
        db.add(alert)
        created_alerts.append(alert)

    db.flush()
    return created_alerts
# =============================================================================
# MAIN SEEDING LOGIC
# =============================================================================
def seed_minimal(db: Session, auth_manager: AuthManager):
    """Seed the bare minimum: the admin user and the WizaMart vendor.

    Both records are created only when missing, and the vendor is owned by the
    admin. The session is committed at the end.
    """
    print_section("MINIMAL SEEDING")

    # Step 1: admin account.
    print_step(1, "Creating admin user...")
    admin, admin_is_new = create_user(
        db,
        auth_manager,
        username=DEFAULT_ADMIN_USERNAME,
        email=DEFAULT_ADMIN_EMAIL,
        password=DEFAULT_ADMIN_PASSWORD,
        role="admin",
    )
    if not admin_is_new:
        print_info(f"Admin user already exists (ID: {admin.id})")
    else:
        print_success(f"Admin user created (ID: {admin.id})")

    # Step 2: the single vendor.
    print_step(2, "Creating WizaMart vendor...")
    wizamart, vendor_is_new = create_vendor(
        db,
        vendor_code="WIZAMART",
        name="WizaMart",
        subdomain="wizamart",
        owner_user_id=admin.id,
        description="Premium electronics marketplace",
    )
    if not vendor_is_new:
        print_info(f"WizaMart already exists (ID: {wizamart.id})")
    else:
        print_success(f"WizaMart created (ID: {wizamart.id})")

    db.commit()
    print_success("Minimal seeding complete")
def seed_full(db: Session, auth_manager: AuthManager):
    """Seed comprehensive test data.

    Runs seven steps in order (users, vendors, products, customers, orders,
    import jobs, admin data) and commits once at the end, so a failure in any
    step leaves nothing half-committed. Every step only creates what is
    missing, which makes the function safe to re-run and able to complete a
    database that was previously seeded with ``--minimal``.
    """
    print_section("COMPREHENSIVE SEEDING")

    print_step(1, "Creating users...")
    admin = _seed_users(db, auth_manager)

    print_step(2, "Creating vendors...")
    vendors = _seed_vendors(db, owner_user_id=admin.id)

    print_step(3, "Creating products...")
    vendor_products = _seed_products(db, vendors)

    print_step(4, "Creating customers...")
    _seed_customers(db, auth_manager, vendors[:2])

    print_step(5, "Creating orders...")
    _seed_orders(db, vendors[0], vendor_products)

    print_step(6, "Creating import jobs...")
    _seed_import_jobs(db, vendors[:2], user_id=admin.id)

    print_step(7, "Creating admin settings and alerts...")
    settings = create_admin_settings(db)
    alerts = create_platform_alerts(db)
    print_success(f"Created {len(settings)} admin settings")
    print_success(f"Created {len(alerts)} platform alerts")

    # Commit all changes
    db.commit()
    print_success("All data committed successfully")


def _seed_users(db: Session, auth_manager: AuthManager) -> User:
    """Ensure the admin and every TEST_USERS account exist; return the admin."""
    admin, admin_created = create_user(
        db,
        auth_manager,
        username=DEFAULT_ADMIN_USERNAME,
        email=DEFAULT_ADMIN_EMAIL,
        password=DEFAULT_ADMIN_PASSWORD,
        role="admin",
    )
    if admin_created:
        print_success(f"Admin user created (ID: {admin.id})")
    else:
        print_info(f"Admin user already exists (ID: {admin.id})")

    any_created = False
    for user_data in TEST_USERS:
        user, created = create_user(
            db,
            auth_manager,
            username=user_data["username"],
            email=user_data["email"],
            password=user_data["password"],
            role=user_data["role"],
            first_name=user_data.get("first_name"),
            last_name=user_data.get("last_name"),
        )
        if created:
            any_created = True
            print_success(f"User '{user.username}' created (ID: {user.id})")
    if not any_created:
        print_info("All test users already exist")
    return admin


def _seed_vendors(db: Session, owner_user_id: int) -> List[Vendor]:
    """Ensure every VENDOR_CONFIGS vendor exists with its theme and custom domain."""
    vendors = []
    for config in VENDOR_CONFIGS:
        vendor, created = create_vendor(
            db,
            vendor_code=config["vendor_code"],
            name=config["name"],
            subdomain=config["subdomain"],
            owner_user_id=owner_user_id,
            description=config["description"],
        )
        vendors.append(vendor)
        if created:
            print_success(f"{config['vendor_code']} created (ID: {vendor.id})")
        else:
            print_info(f"{config['vendor_code']} already exists (ID: {vendor.id})")

        # Theme and domain are ensured even for a pre-existing vendor: a vendor
        # created by seed_minimal() has neither, and both helpers are no-ops
        # when the record is already there.
        theme, theme_created = create_vendor_theme(
            db,
            vendor_id=vendor.id,
            theme_preset=config.get("theme_preset", "default"),
        )
        if theme_created:
            print_success(f" Theme '{theme.theme_name}' applied")

        if config.get("custom_domain"):
            domain, domain_created = create_vendor_domain(
                db,
                vendor_id=vendor.id,
                domain=config["custom_domain"],
                is_primary=True,
            )
            if domain_created:
                print_success(f" Custom domain '{domain.domain}' added")
    return vendors


def _seed_products(db: Session, vendors: List[Vendor]) -> List[Product]:
    """Ensure the sample catalogue exists and is linked to the first two vendors.

    Returns every vendor product link, whether it was created now or already
    existed, so later steps work on re-runs too.
    """
    products_created = 0
    links_created = 0
    vendor_products = []
    for position, product_data in enumerate(SAMPLE_PRODUCTS):
        mp_product, created = create_marketplace_product(
            db,
            product_data=product_data,
            marketplace="Wizamart",
            vendor_name=vendors[0].name,
        )
        if created:
            products_created += 1

        for vendor in vendors[:2]:
            vendor_product, vp_created = create_vendor_product(
                db,
                vendor_id=vendor.id,
                marketplace_product_id=mp_product.id,
                product_id=f"{vendor.vendor_code}-{product_data['marketplace_product_id']}",
                price=float(product_data["price"]),
                # The first two catalogue entries are featured, independent of
                # how many products happened to be created in this run.
                is_featured=position < 2,
            )
            vendor_products.append(vendor_product)
            if vp_created:
                links_created += 1

    print_success(f"Created {products_created} marketplace products")
    print_success(f"Created {links_created} vendor product links")
    return vendor_products


def _seed_customers(db: Session, auth_manager: AuthManager, vendors: List[Vendor]) -> None:
    """Ensure two customers, each with a shipping and a billing address, per vendor."""
    customers_created = 0
    addresses_created = 0
    for i, vendor in enumerate(vendors):
        for j in range(1, 3):
            customer, created = create_customer(
                db,
                auth_manager,
                vendor_id=vendor.id,
                email=f"customer{i + 1}{j}@example.com",
                password="password123",
                first_name=f"Customer{i + 1}{j}",
                last_name="Buyer",
                customer_number=f"{vendor.vendor_code}-CUST-{j:04d}",
            )
            if created:
                customers_created += 1

            for address_type in ("shipping", "billing"):
                _, address_created = create_customer_address(
                    db, vendor.id, customer.id, address_type, is_default=True
                )
                # Count only addresses that were really inserted.
                if address_created:
                    addresses_created += 1

    print_success(f"Created {customers_created} customers")
    print_success(f"Created {addresses_created} addresses")


def _seed_orders(db: Session, vendor: Vendor, vendor_products: List[Product]) -> None:
    """Ensure one two-line order for each of the vendor's first two customers."""
    orders_created = 0

    # Two priced products of this vendor, bought in quantities 1 and 2. Unit
    # prices come from the products themselves so order lines match the
    # catalogue.
    priced_products = [
        vp for vp in vendor_products
        if vp.vendor_id == vendor.id and vp.price is not None
    ][:2]
    order_lines = [
        (vp.id, quantity, float(vp.price))
        for vp, quantity in zip(priced_products, (1, 2))
    ]

    # Ordered by id so order numbers map to the same customers on every run.
    vendor_customers = db.execute(
        select(Customer).where(Customer.vendor_id == vendor.id).order_by(Customer.id)
    ).scalars().all()

    if len(order_lines) == 2:
        for i, customer in enumerate(vendor_customers[:2]):
            addresses = db.execute(
                select(CustomerAddress).where(CustomerAddress.customer_id == customer.id)
            ).scalars().all()
            by_type = {address.address_type: address for address in addresses}
            shipping_addr = by_type.get("shipping")
            billing_addr = by_type.get("billing")
            if shipping_addr is None or billing_addr is None:
                # An order needs both addresses; skip customers lacking one.
                continue

            _, created = create_order(
                db,
                vendor_id=vendor.id,
                customer_id=customer.id,
                order_number=f"ORD-{vendor.vendor_code}-{i + 1:05d}",
                products=order_lines,
                shipping_address_id=shipping_addr.id,
                billing_address_id=billing_addr.id,
                status="completed" if i == 0 else "processing",
            )
            if created:
                orders_created += 1

    print_success(f"Created {orders_created} orders")


def _seed_import_jobs(db: Session, vendors: List[Vendor], user_id: int) -> None:
    """Record one completed sample import job per vendor."""
    jobs_created = 0
    for vendor in vendors:
        _, created = create_import_job(
            db,
            vendor_id=vendor.id,
            user_id=user_id,
            marketplace="Wizamart",
            status="completed",
            imported_count=len(SAMPLE_PRODUCTS),
        )
        if created:
            jobs_created += 1
    print_success(f"Created {jobs_created} import jobs")
def print_summary(db: Session):
    """Print row counts, admin credentials, a per-vendor overview and next steps.

    Read-only: the function only queries ``db`` and writes to stdout. Note that
    the default admin password is printed in clear text.
    """
    print_section("SEEDING SUMMARY")

    # Horizontal rule framing each sub-section. The separator literal had
    # degraded to an empty string, so ``"" * 70`` printed nothing at all.
    rule = "─" * 70

    # Count records
    table_counts = [
        ("Users", User),
        ("Vendors", Vendor),
        ("Vendor Themes", VendorTheme),
        ("Custom Domains", VendorDomain),
        ("Marketplace Products", MarketplaceProduct),
        ("Vendor Products", Product),
        ("Customers", Customer),
        ("Addresses", CustomerAddress),
        ("Orders", Order),
        ("Order Items", OrderItem),
        ("Import Jobs", MarketplaceImportJob),
        ("Admin Settings", AdminSetting),
        ("Platform Alerts", PlatformAlert),
    ]
    print("📊 Database Statistics:")
    for label, model in table_counts:
        print(f" {label}: {db.query(model).count()}")

    print("\n" + rule)
    print("🔐 ADMIN LOGIN CREDENTIALS")
    print(rule)
    print(" URL: http://localhost:8000/admin/login")
    print(f" Username: {DEFAULT_ADMIN_USERNAME}")
    print(f" Password: {DEFAULT_ADMIN_PASSWORD}")
    print(rule)

    print("\n" + rule)
    print("🏪 VENDORS")
    print(rule)
    vendors = db.execute(select(Vendor)).scalars().all()
    for vendor in vendors:
        print(f"\n {vendor.vendor_code}")
        print(f" Name: {vendor.name}")
        print(f" Subdomain: {vendor.subdomain}")
        if vendor.vendor_theme:
            print(f" Theme: {vendor.vendor_theme.theme_name}")
        active_domains = [d for d in vendor.domains if d.is_active]
        if active_domains:
            print(f" Domains: {', '.join(d.domain for d in active_domains)}")
        # Per-vendor counts get their own names so they cannot be confused
        # with the platform-wide totals printed above.
        vendor_product_count = db.query(Product).filter(Product.vendor_id == vendor.id).count()
        print(f" Products: {vendor_product_count}")
        vendor_customer_count = db.query(Customer).filter(Customer.vendor_id == vendor.id).count()
        print(f" Customers: {vendor_customer_count}")

    print("\n" + rule)
    print("🚀 NEXT STEPS")
    print(rule)
    print(" 1. Start the server:")
    print(" make dev")
    print()
    print(" 2. Login to admin panel:")
    print(" http://localhost:8000/admin/login")
    print()
    print(" 3. Test vendor shops:")
    for vendor in vendors[:3]:
        print(f" http://localhost:8000/shop/{vendor.vendor_code}")
    print()
    print("⚠️ SECURITY: Change default passwords in production!")
    print()
# =============================================================================
# MAIN ENTRY POINT
# =============================================================================
def main():
    """Main entry point for database seeding.

    Parses ``--reset`` and ``--minimal``, verifies that the required tables
    exist, optionally wipes the data after an interactive confirmation, runs
    the selected seeding routine and prints a summary.

    Exits with status 1 when the database is not ready, when seeding fails or
    when it is interrupted; the session is rolled back in the latter two cases
    and always closed. Declining the reset confirmation returns normally
    without seeding.
    """
    parser = argparse.ArgumentParser(
        description="Seed Wizamart database with test data"
    )
    parser.add_argument(
        "--reset",
        action="store_true",
        help="Reset database before seeding (destructive!)"
    )
    parser.add_argument(
        "--minimal",
        action="store_true",
        help="Create only minimal data (admin + 1 vendor)"
    )
    args = parser.parse_args()

    # Boxed banner, 70 columns wide. The border characters had been lost,
    # leaving concatenations of empty strings that printed an unframed title.
    print("\n" + "╔" + "═" * 68 + "╗")
    print("║" + " " * 18 + "WIZAMART DATABASE SEEDER" + " " * 26 + "║")
    print("╚" + "═" * 68 + "╝" + "\n")

    # Verify database
    print_step(1, "Verifying database...")
    if not verify_database_ready():
        print_error("Database not ready!")
        print("\nRequired tables don't exist. Run migrations first:")
        print(" make migrate-up\n")
        sys.exit(1)
    print_success("Database tables verified")

    # Seed data
    db = SessionLocal()
    auth_manager = AuthManager()
    try:
        # Reset database if requested
        if args.reset:
            confirm = input("\n⚠️ Are you sure you want to DELETE ALL DATA? (type 'yes' to confirm): ")
            # Surrounding whitespace should not turn a "yes" into a cancel.
            if confirm.strip().lower() == 'yes':
                reset_database(db)
            else:
                print("Reset cancelled.")
                return

        if args.minimal:
            seed_minimal(db, auth_manager)
        else:
            seed_full(db, auth_manager)

        print_summary(db)
        print_section("✅ SEEDING COMPLETED SUCCESSFULLY!", "=")
    except KeyboardInterrupt:
        db.rollback()
        print("\n\n⚠️ Seeding interrupted by user")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: undo partial work, show the full traceback for
        # debugging, and signal failure to the caller (e.g. make).
        db.rollback()
        print_section("❌ SEEDING FAILED", "!")
        print(f"Error: {e}\n")
        import traceback
        traceback.print_exc()
        sys.exit(1)
    finally:
        db.close()
# Run the seeder only when this file is executed as a script, not on import.
if __name__ == "__main__":
    main()