#!/usr/bin/env python3 """ Production Database Initialization for Wizamart Platform This script initializes ESSENTIAL data required for production: - Platform admin user - Default vendor roles and permissions - Admin settings - Platform configuration This is SAFE to run in production and should be run after migrations. Usage: make init-prod This script is idempotent - safe to run multiple times. """ import sys from datetime import UTC, datetime from pathlib import Path # Add project root to path project_root = Path(__file__).parent.parent sys.path.insert(0, str(project_root)) from sqlalchemy import select from sqlalchemy.orm import Session from app.core.config import ( print_environment_info, settings, validate_production_settings, ) from app.core.database import SessionLocal from app.core.environment import is_production from app.core.permissions import PermissionGroups from middleware.auth import AuthManager from app.modules.tenancy.models import AdminSetting from app.modules.tenancy.models import User # ============================================================================= # HELPER FUNCTIONS # ============================================================================= def print_header(text: str): """Print formatted header.""" print("\n" + "=" * 70) print(f" {text}") print("=" * 70) def print_step(step: int, text: str): """Print step indicator.""" print(f"\n[{step}] {text}") def print_success(text: str): """Print success message.""" print(f" āœ“ {text}") def print_warning(text: str): """Print warning message.""" print(f" ⚠ {text}") def print_error(text: str): """Print error message.""" print(f" āœ— {text}") # ============================================================================= # INITIALIZATION FUNCTIONS # ============================================================================= def create_admin_user(db: Session, auth_manager: AuthManager) -> User: """Create or get the platform admin user.""" # Check if admin already exists admin = db.execute( select(User).where(User.email == settings.admin_email) ).scalar_one_or_none() if admin: print_warning(f"Admin user already exists: {admin.email}") return admin # Create new admin hashed_password = auth_manager.hash_password(settings.admin_password) admin = User( username=settings.admin_username, email=settings.admin_email, hashed_password=hashed_password, role="admin", first_name=settings.admin_first_name, last_name=settings.admin_last_name, is_active=True, is_email_verified=True, created_at=datetime.now(UTC), updated_at=datetime.now(UTC), ) db.add(admin) db.flush() print_success(f"Created admin user: {admin.email}") return admin def create_default_role_templates(db: Session) -> dict: """Create default role templates (not tied to any vendor). These serve as templates that can be copied when creating vendor-specific roles. Note: Actual roles are vendor-specific and created when vendors are onboarded. """ print(" Available role presets:") print(" - Manager: Nearly full access (except team management)") print(" - Staff: Day-to-day operations") print(" - Support: Customer service focused") print(" - Viewer: Read-only access") print(" - Marketing: Marketing and customer communication") print_success("Role templates ready for vendor onboarding") return { "manager": list(PermissionGroups.MANAGER), "staff": list(PermissionGroups.STAFF), "support": list(PermissionGroups.SUPPORT), "viewer": list(PermissionGroups.VIEWER), "marketing": list(PermissionGroups.MARKETING), } def create_admin_settings(db: Session) -> int: """Create essential admin settings.""" settings_created = 0 # Essential platform settings default_settings = [ { "key": "platform_name", "value": settings.project_name, "value_type": "string", "description": "Platform name displayed in admin panel", "is_public": True, }, { "key": "platform_url", "value": f"https://{settings.platform_domain}", "value_type": "string", "description": "Main platform URL", "is_public": True, }, { "key": "support_email", "value": f"support@{settings.platform_domain}", "value_type": "string", "description": "Platform support email", "is_public": True, }, { "key": "max_vendors_per_user", "value": str(settings.max_vendors_per_user), "value_type": "integer", "description": "Maximum vendors a user can own", "is_public": False, }, { "key": "max_team_members_per_vendor", "value": str(settings.max_team_members_per_vendor), "value_type": "integer", "description": "Maximum team members per vendor", "is_public": False, }, { "key": "invitation_expiry_days", "value": str(settings.invitation_expiry_days), "value_type": "integer", "description": "Days until team invitation expires", "is_public": False, }, { "key": "require_vendor_verification", "value": "true", "value_type": "boolean", "description": "Require admin verification for new vendors", "is_public": False, }, { "key": "allow_custom_domains", "value": str(settings.allow_custom_domains).lower(), "value_type": "boolean", "description": "Allow vendors to use custom domains", "is_public": False, }, { "key": "maintenance_mode", "value": "false", "value_type": "boolean", "description": "Enable maintenance mode", "is_public": True, }, # Logging settings { "key": "log_level", "value": "INFO", "value_type": "string", "category": "logging", "description": "Application log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)", "is_public": False, }, { "key": "log_file_max_size_mb", "value": "10", "value_type": "integer", "category": "logging", "description": "Maximum log file size in MB before rotation", "is_public": False, }, { "key": "log_file_backup_count", "value": "5", "value_type": "integer", "category": "logging", "description": "Number of rotated log files to keep", "is_public": False, }, { "key": "db_log_retention_days", "value": "30", "value_type": "integer", "category": "logging", "description": "Number of days to retain logs in database", "is_public": False, }, { "key": "file_logging_enabled", "value": "true", "value_type": "boolean", "category": "logging", "description": "Enable file-based logging", "is_public": False, }, { "key": "db_logging_enabled", "value": "true", "value_type": "boolean", "category": "logging", "description": "Enable database logging for critical events", "is_public": False, }, ] for setting_data in default_settings: # Check if setting already exists existing = db.execute( select(AdminSetting).where(AdminSetting.key == setting_data["key"]) ).scalar_one_or_none() if not existing: setting = AdminSetting( key=setting_data["key"], value=setting_data["value"], value_type=setting_data["value_type"], category=setting_data.get("category"), description=setting_data.get("description"), is_public=setting_data.get("is_public", False), created_at=datetime.now(UTC), updated_at=datetime.now(UTC), ) db.add(setting) settings_created += 1 db.flush() if settings_created > 0: print_success(f"Created {settings_created} admin settings") else: print_warning("Admin settings already exist") return settings_created def verify_rbac_schema(db: Session) -> bool: """Verify that RBAC schema is in place.""" try: from sqlalchemy import inspect inspector = inspect(db.bind) tables = inspector.get_table_names() # Check users table has is_email_verified if "users" in tables: user_cols = {col["name"] for col in inspector.get_columns("users")} if "is_email_verified" not in user_cols: print_error("Missing 'is_email_verified' column in users table") return False # Check vendor_users has RBAC columns if "vendor_users" in tables: vu_cols = {col["name"] for col in inspector.get_columns("vendor_users")} required_cols = { "user_type", "invitation_token", "invitation_sent_at", "invitation_accepted_at", } missing = required_cols - vu_cols if missing: print_error(f"Missing columns in vendor_users: {missing}") return False # Check roles table exists if "roles" not in tables: print_error("Missing 'roles' table") return False print_success("RBAC schema verified") return True except Exception as e: print_error(f"Schema verification failed: {e}") return False # ============================================================================= # MAIN INITIALIZATION # ============================================================================= def initialize_production(db: Session, auth_manager: AuthManager): """Initialize production database with essential data.""" print_header("PRODUCTION INITIALIZATION") # Step 1: Verify RBAC schema print_step(1, "Verifying RBAC schema...") if not verify_rbac_schema(db): print_error("RBAC schema not ready. Run migrations first:") print(" make migrate-up") sys.exit(1) # Step 2: Create admin user print_step(2, "Creating platform admin...") admin = create_admin_user(db, auth_manager) # Step 3: Set up default role templates print_step(3, "Setting up role templates...") role_templates = create_default_role_templates(db) # Step 4: Create admin settings print_step(4, "Creating admin settings...") create_admin_settings(db) # Commit all changes db.commit() print_success("All changes committed") def print_summary(db: Session): """Print initialization summary.""" print_header("INITIALIZATION SUMMARY") # Count records user_count = db.query(User).filter(User.role == "admin").count() setting_count = db.query(AdminSetting).count() print("\nšŸ“Š Database Status:") print(f" Admin users: {user_count}") print(f" Admin settings: {setting_count}") print("\n" + "─" * 70) print("šŸ” ADMIN CREDENTIALS") print("─" * 70) print(" URL: /admin/login") print(f" Username: {settings.admin_username}") print(f" Password: {settings.admin_password}") print("─" * 70) # Show security warnings if in production if is_production(): warnings = validate_production_settings() if warnings: print("\nāš ļø SECURITY WARNINGS:") for warning in warnings: print(f" {warning}") print("\n Please update your .env file with secure values!") else: print("\nāš ļø DEVELOPMENT MODE:") print(" Default credentials are OK for development") print(" Change them in production via .env file") print("\nšŸš€ NEXT STEPS:") print(" 1. Login to admin panel") if is_production(): print(" 2. CHANGE DEFAULT PASSWORD IMMEDIATELY!") print(" 3. Configure admin settings") print(" 4. Create first vendor") else: print(" 2. Create demo data: make seed-demo") print(" 3. Start development: make dev") print("\nšŸ“ FOR DEMO DATA (Development only):") print(" make seed-demo") # ============================================================================= # MAIN ENTRY POINT # ============================================================================= def main(): """Main entry point.""" print("\n" + "ā•”" + "═" * 68 + "ā•—") print("ā•‘" + " " * 16 + "PRODUCTION INITIALIZATION" + " " * 27 + "ā•‘") print("ā•š" + "═" * 68 + "ā•") # Show environment info print_environment_info() # Production safety check if is_production(): warnings = validate_production_settings() if warnings: print("\nāš ļø PRODUCTION WARNINGS DETECTED:") for warning in warnings: print(f" {warning}") print("\nUpdate your .env file before continuing!") response = input("\nContinue anyway? (yes/no): ") if response.lower() != "yes": print("Initialization cancelled.") sys.exit(0) db = SessionLocal() auth_manager = AuthManager() try: initialize_production(db, auth_manager) print_summary(db) print_header("āœ… INITIALIZATION COMPLETED") except KeyboardInterrupt: db.rollback() print("\n\nāš ļø Initialization interrupted") sys.exit(1) except Exception as e: db.rollback() print_header("āŒ INITIALIZATION FAILED") print(f"\nError: {e}\n") import traceback traceback.print_exc() sys.exit(1) finally: db.close() if __name__ == "__main__": main()