Files
orion/scripts/init_production.py
Samir Boulahtit 30a5c75e74 refactor: remove backward compatibility layer for permissions
- Delete app/core/permissions.py (VendorPermissions enum, PermissionGroups)
- Update all code to use permission_discovery_service directly:
  - app/api/deps.py: get_user_permissions() uses discovery service
  - app/modules/tenancy/models/vendor.py: get_all_permissions() uses discovery
  - app/modules/tenancy/routes/api/vendor_team.py: use string literals
  - app/modules/tenancy/services/vendor_team_service.py: use discovery service
  - scripts/init_production.py: use discovery service for presets

Permissions are now fully module-driven:
- Each module defines permissions in definition.py
- PermissionDiscoveryService aggregates all permissions
- Role presets reference permission IDs directly

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-01 21:49:11 +01:00

475 lines
14 KiB
Python

#!/usr/bin/env python3
"""
Production Database Initialization for Wizamart Platform
This script initializes ESSENTIAL data required for production:
- Platform admin user
- Default vendor roles and permissions
- Admin settings
- Platform configuration
This is SAFE to run in production and should be run after migrations.
Usage:
make init-prod
This script is idempotent - safe to run multiple times.
"""
import sys
from datetime import UTC, datetime
from pathlib import Path
# Add project root to path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
from sqlalchemy import select
from sqlalchemy.orm import Session
from app.core.config import (
print_environment_info,
settings,
validate_production_settings,
)
from app.core.database import SessionLocal
from app.core.environment import is_production
from app.modules.tenancy.services.permission_discovery_service import (
permission_discovery_service,
)
from middleware.auth import AuthManager
from app.modules.tenancy.models import AdminSetting
from app.modules.tenancy.models import User
# =============================================================================
# HELPER FUNCTIONS
# =============================================================================
def print_header(text: str):
    """Print a banner: a blank line, a 70-character '=' rule, the title, and a closing rule."""
    rule = "=" * 70
    print("\n" + rule)
    print(f" {text}")
    print(rule)
def print_step(step: int, text: str):
    """Print a numbered step line of the form ``[step] text`` after a blank line."""
    label = f"[{step}] {text}"
    print("\n" + label)
def print_success(text: str):
    """Print a success message on its own line of stdout."""
    message = f"{text}"
    print(message)
def print_warning(text: str):
    """Print a warning message on its own line of stdout."""
    print("{}".format(text))
def print_error(text: str):
    """Print an error message on its own line of stdout (not stderr)."""
    rendered = format(text)
    print(rendered)
# =============================================================================
# INITIALIZATION FUNCTIONS
# =============================================================================
def create_admin_user(db: Session, auth_manager: AuthManager) -> User:
    """Return the platform admin user, creating it if it does not exist yet.

    The lookup is by ``settings.admin_email``. When no matching user is found,
    a new one is built from the ``settings.admin_*`` values, added to the
    session and flushed. This function does not commit.

    Args:
        db: Session used for the lookup and the insert.
        auth_manager: Supplies ``hash_password`` for the configured password.

    Returns:
        The already-existing or newly created ``User``.
    """
    # Idempotency guard: reuse the admin if one with this email is present.
    lookup = select(User).where(User.email == settings.admin_email)
    existing_admin = db.execute(lookup).scalar_one_or_none()
    if existing_admin:
        print_warning(f"Admin user already exists: {existing_admin.email}")
        return existing_admin

    # No admin yet: build one from the configured credentials.
    new_admin = User(
        username=settings.admin_username,
        email=settings.admin_email,
        hashed_password=auth_manager.hash_password(settings.admin_password),
        role="admin",
        first_name=settings.admin_first_name,
        last_name=settings.admin_last_name,
        is_active=True,
        is_email_verified=True,
        created_at=datetime.now(UTC),
        updated_at=datetime.now(UTC),
    )
    db.add(new_admin)
    db.flush()
    print_success(f"Created admin user: {new_admin.email}")
    return new_admin
def create_default_role_templates(db: Session) -> dict:
    """Describe the role presets and return their permission lists.

    Nothing is written to the database here; ``db`` is accepted but not used.
    The presets are templates, not tied to any vendor. Per the original
    notes, actual roles are vendor-specific and are created when vendors are
    onboarded.

    Args:
        db: Unused.

    Returns:
        A dict mapping each preset name ("manager", "staff", "support",
        "viewer", "marketing") to a list built from
        ``permission_discovery_service.get_preset_permissions(name)``.
    """
    preset_blurbs = (
        " - Manager: Nearly full access (except team management)",
        " - Staff: Day-to-day operations",
        " - Support: Customer service focused",
        " - Viewer: Read-only access",
        " - Marketing: Marketing and customer communication",
    )
    print(" Available role presets:")
    for blurb in preset_blurbs:
        print(blurb)
    print_success("Role templates ready for vendor onboarding")

    preset_names = ("manager", "staff", "support", "viewer", "marketing")
    return {
        name: list(permission_discovery_service.get_preset_permissions(name))
        for name in preset_names
    }
def create_admin_settings(db: Session) -> int:
    """Insert the essential admin settings that are missing.

    Each default is looked up by key; rows that already exist are left
    untouched. New rows are added to the session and flushed once after the
    loop. This function does not commit.

    Args:
        db: Session used for the lookups and inserts.

    Returns:
        The number of settings added by this call.
    """
    # Platform settings: (key, value, value_type, description, is_public).
    # These carry no category.
    platform_defaults = [
        (
            "platform_name",
            settings.project_name,
            "string",
            "Platform name displayed in admin panel",
            True,
        ),
        (
            "platform_url",
            f"https://{settings.platform_domain}",
            "string",
            "Main platform URL",
            True,
        ),
        (
            "support_email",
            f"support@{settings.platform_domain}",
            "string",
            "Platform support email",
            True,
        ),
        (
            "max_vendors_per_user",
            str(settings.max_vendors_per_user),
            "integer",
            "Maximum vendors a user can own",
            False,
        ),
        (
            "max_team_members_per_vendor",
            str(settings.max_team_members_per_vendor),
            "integer",
            "Maximum team members per vendor",
            False,
        ),
        (
            "invitation_expiry_days",
            str(settings.invitation_expiry_days),
            "integer",
            "Days until team invitation expires",
            False,
        ),
        (
            "require_vendor_verification",
            "true",
            "boolean",
            "Require admin verification for new vendors",
            False,
        ),
        (
            "allow_custom_domains",
            str(settings.allow_custom_domains).lower(),
            "boolean",
            "Allow vendors to use custom domains",
            False,
        ),
        (
            "maintenance_mode",
            "false",
            "boolean",
            "Enable maintenance mode",
            True,
        ),
    ]

    # Logging settings: (key, value, value_type, description).
    # All are category "logging" and non-public.
    logging_defaults = [
        (
            "log_level",
            "INFO",
            "string",
            "Application log level (DEBUG, INFO, WARNING, ERROR, CRITICAL)",
        ),
        (
            "log_file_max_size_mb",
            "10",
            "integer",
            "Maximum log file size in MB before rotation",
        ),
        (
            "log_file_backup_count",
            "5",
            "integer",
            "Number of rotated log files to keep",
        ),
        (
            "db_log_retention_days",
            "30",
            "integer",
            "Number of days to retain logs in database",
        ),
        (
            "file_logging_enabled",
            "true",
            "boolean",
            "Enable file-based logging",
        ),
        (
            "db_logging_enabled",
            "true",
            "boolean",
            "Enable database logging for critical events",
        ),
    ]

    # Normalise both tables to (key, value, value_type, category, description, is_public).
    rows = [
        (key, value, value_type, None, description, is_public)
        for key, value, value_type, description, is_public in platform_defaults
    ]
    rows += [
        (key, value, value_type, "logging", description, False)
        for key, value, value_type, description in logging_defaults
    ]

    created = 0
    for key, value, value_type, category, description, is_public in rows:
        already_there = db.execute(
            select(AdminSetting).where(AdminSetting.key == key)
        ).scalar_one_or_none()
        if already_there:
            # Existing rows are never overwritten, which keeps reruns safe.
            continue
        db.add(
            AdminSetting(
                key=key,
                value=value,
                value_type=value_type,
                category=category,
                description=description,
                is_public=is_public,
                created_at=datetime.now(UTC),
                updated_at=datetime.now(UTC),
            )
        )
        created += 1
    db.flush()

    if created > 0:
        print_success(f"Created {created} admin settings")
    else:
        print_warning("Admin settings already exist")
    return created
def verify_rbac_schema(db: Session) -> bool:
    """Check that the tables and columns the RBAC setup relies on are present.

    Checks, in order:
      * if a ``users`` table exists, it has an ``is_email_verified`` column;
      * if a ``vendor_users`` table exists, it has the invitation/user-type
        columns listed below;
      * a ``roles`` table exists.

    Args:
        db: Session whose ``bind`` is inspected.

    Returns:
        True when every check passes. False when a check fails or when
        inspection itself raises; the error is printed, not re-raised.
    """
    try:
        from sqlalchemy import inspect

        inspector = inspect(db.bind)
        existing_tables = inspector.get_table_names()

        def column_names(table: str) -> set:
            """Return the set of column names of *table*."""
            return {column["name"] for column in inspector.get_columns(table)}

        # NOTE(review): a missing 'users' or 'vendor_users' table is not
        # reported; only their columns are checked when the table exists.
        # Confirm that this leniency is intended.
        if "users" in existing_tables:
            if "is_email_verified" not in column_names("users"):
                print_error("Missing 'is_email_verified' column in users table")
                return False

        if "vendor_users" in existing_tables:
            expected = {
                "user_type",
                "invitation_token",
                "invitation_sent_at",
                "invitation_accepted_at",
            }
            absent = expected - column_names("vendor_users")
            if absent:
                print_error(f"Missing columns in vendor_users: {absent}")
                return False

        if "roles" not in existing_tables:
            print_error("Missing 'roles' table")
            return False

        print_success("RBAC schema verified")
        return True
    except Exception as exc:
        # Any inspection failure is reported as "schema not verified".
        print_error(f"Schema verification failed: {exc}")
        return False
# =============================================================================
# MAIN INITIALIZATION
# =============================================================================
def initialize_production(db: Session, auth_manager: AuthManager):
    """Initialize production database with essential data.

    Runs, in order: RBAC schema verification, admin user creation, role
    template setup and admin settings creation, then commits the session
    once at the end.

    Args:
        db: Session shared by all steps; committed here after the last step.
        auth_manager: Passed to ``create_admin_user``, which uses it to hash
            the configured admin password.

    Raises:
        SystemExit: With status 1 when ``verify_rbac_schema`` returns False.
    """
    print_header("PRODUCTION INITIALIZATION")

    # Step 1: Verify RBAC schema
    print_step(1, "Verifying RBAC schema...")
    if not verify_rbac_schema(db):
        print_error("RBAC schema not ready. Run migrations first:")
        print(" make migrate-up")
        sys.exit(1)

    # Step 2: Create admin user (return value is not needed here)
    print_step(2, "Creating platform admin...")
    create_admin_user(db, auth_manager)

    # Step 3: Set up default role templates (return value is not needed here)
    print_step(3, "Setting up role templates...")
    create_default_role_templates(db)

    # Step 4: Create admin settings
    print_step(4, "Creating admin settings...")
    create_admin_settings(db)

    # Commit all changes in one transaction
    db.commit()
    print_success("All changes committed")
def print_summary(db: Session):
    """Print the post-initialization summary.

    Shows record counts, admin login details, security warnings and
    suggested next steps.

    The configured admin password is echoed only outside production. In
    production a placeholder is printed instead, so the secret does not end
    up in terminal scrollback or captured deployment logs.

    Args:
        db: Session used to count admin users and admin settings.
    """
    print_header("INITIALIZATION SUMMARY")

    # Count records
    user_count = db.query(User).filter(User.role == "admin").count()
    setting_count = db.query(AdminSetting).count()
    print("\n📊 Database Status:")
    print(f" Admin users: {user_count}")
    print(f" Admin settings: {setting_count}")

    # Evaluate the environment once; it drives the password masking and
    # both branches below.
    production = is_production()

    # NOTE(review): the empty string literals multiplied by 70 print blank
    # lines; they look like separator characters lost in transit. Confirm
    # against the repository copy.
    print("\n" + "" * 70)
    print("🔐 ADMIN CREDENTIALS")
    print("" * 70)
    print(" URL: /admin/login")
    print(f" Username: {settings.admin_username}")
    if production:
        # Never write the real production secret to stdout.
        print(" Password: (hidden in production - use the configured admin password)")
    else:
        print(f" Password: {settings.admin_password}")
    print("" * 70)

    # Show security warnings if in production
    if production:
        warnings = validate_production_settings()
        if warnings:
            print("\n⚠️ SECURITY WARNINGS:")
            for warning in warnings:
                print(f" {warning}")
            print("\n Please update your .env file with secure values!")
    else:
        print("\n⚠️ DEVELOPMENT MODE:")
        print(" Default credentials are OK for development")
        print(" Change them in production via .env file")

    print("\n🚀 NEXT STEPS:")
    print(" 1. Login to admin panel")
    if production:
        print(" 2. CHANGE DEFAULT PASSWORD IMMEDIATELY!")
        print(" 3. Configure admin settings")
        print(" 4. Create first vendor")
    else:
        print(" 2. Create demo data: make seed-demo")
        print(" 3. Start development: make dev")

    print("\n📝 FOR DEMO DATA (Development only):")
    print(" make seed-demo")
# =============================================================================
# MAIN ENTRY POINT
# =============================================================================
def main():
    """Run the production initialization end to end.

    Prints the banner and environment info. In production, if
    ``validate_production_settings`` reports warnings, asks for explicit
    confirmation. Then opens a session, runs ``initialize_production`` and
    ``print_summary``, and always closes the session.

    Exit statuses:
        0: the operator declined to continue.
        1: confirmation could not be read, the run was interrupted, or any
           step raised.
    """
    # NOTE(review): the empty string literals in the banner look like
    # decoration characters lost in transit. Confirm against the repository
    # copy.
    print("\n" + "" + "" * 68 + "")
    print("" + " " * 16 + "PRODUCTION INITIALIZATION" + " " * 27 + "")
    print("" + "" * 68 + "")

    # Show environment info
    print_environment_info()

    # Production safety check
    if is_production():
        warnings = validate_production_settings()
        if warnings:
            print("\n⚠️ PRODUCTION WARNINGS DETECTED:")
            for warning in warnings:
                print(f" {warning}")
            print("\nUpdate your .env file before continuing!")
            try:
                response = input("\nContinue anyway? (yes/no): ")
            except EOFError:
                # stdin is closed or non-interactive (e.g. CI). Confirmation
                # cannot be obtained, so fail with a message, not a traceback.
                print("\nNo interactive input available to confirm. Aborting.")
                sys.exit(1)
            # Tolerate stray whitespace around the answer.
            if response.strip().lower() != "yes":
                print("Initialization cancelled.")
                sys.exit(0)

    db = SessionLocal()
    try:
        # Built inside the try so the session is closed even if this raises.
        auth_manager = AuthManager()
        initialize_production(db, auth_manager)
        print_summary(db)
        print_header("✅ INITIALIZATION COMPLETED")
    except KeyboardInterrupt:
        db.rollback()
        print("\n\n⚠️ Initialization interrupted")
        sys.exit(1)
    except Exception as e:
        # Top-level boundary: report, show the traceback, and exit non-zero.
        db.rollback()
        print_header("❌ INITIALIZATION FAILED")
        print(f"\nError: {e}\n")
        import traceback

        traceback.print_exc()
        sys.exit(1)
    finally:
        db.close()
# Run the initialization only when this file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()