feat: implement super admin and platform admin roles

Add multi-platform admin authorization system with:
- AdminPlatform junction table for admin-platform assignments
- is_super_admin flag on User model for global admin access
- Platform selection flow for platform admins after login
- JWT token updates to include platform context
- New API endpoints for admin user management (super admin only)
- Auth dependencies for super admin and platform access checks

Includes comprehensive test coverage:
- Unit tests for AdminPlatform model and User admin methods
- Unit tests for AdminPlatformService operations
- Integration tests for admin users API endpoints

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-24 18:44:49 +01:00
parent 7e39bb0564
commit 53e05dd497
18 changed files with 2792 additions and 6 deletions

View File

@@ -11,8 +11,8 @@ else
endif endif
# Set Python based on OS # Set Python based on OS
PYTHON := python PYTHON := python3
PIP := pip PIP := pip3
# Set PYTHONPATH for scripts # Set PYTHONPATH for scripts
export PYTHONPATH := $(shell pwd) export PYTHONPATH := $(shell pwd)

View File

@@ -0,0 +1,148 @@
"""Add admin platform roles (super admin + platform admin)
Revision ID: z9j0k1l2m3n4
Revises: z8i9j0k1l2m3
Create Date: 2026-01-24
Adds support for super admin and platform admin roles:
- is_super_admin column on users table
- admin_platforms junction table for platform admin assignments
Super admins have access to all platforms.
Platform admins are assigned to specific platforms via admin_platforms.
Existing admins are migrated to super admins for backward compatibility.
"""
from datetime import UTC, datetime
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = "z9j0k1l2m3n4"
down_revision = "z8i9j0k1l2m3"
branch_labels = None
depends_on = None
def upgrade() -> None:
# 1. Add is_super_admin column to users table
op.add_column(
"users",
sa.Column(
"is_super_admin",
sa.Boolean(),
nullable=False,
server_default="false",
comment="Whether this admin has access to all platforms (super admin)",
),
)
# 2. Create admin_platforms junction table
op.create_table(
"admin_platforms",
sa.Column("id", sa.Integer(), nullable=False),
sa.Column(
"user_id",
sa.Integer(),
nullable=False,
comment="Reference to the admin user",
),
sa.Column(
"platform_id",
sa.Integer(),
nullable=False,
comment="Reference to the platform",
),
sa.Column(
"is_active",
sa.Boolean(),
nullable=False,
server_default="true",
comment="Whether the admin assignment is active",
),
sa.Column(
"assigned_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
comment="When the admin was assigned to this platform",
),
sa.Column(
"assigned_by_user_id",
sa.Integer(),
nullable=True,
comment="Super admin who made this assignment",
),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
nullable=False,
server_default=sa.func.now(),
onupdate=sa.func.now(),
),
sa.ForeignKeyConstraint(
["user_id"],
["users.id"],
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["platform_id"],
["platforms.id"],
ondelete="CASCADE",
),
sa.ForeignKeyConstraint(
["assigned_by_user_id"],
["users.id"],
ondelete="SET NULL",
),
sa.PrimaryKeyConstraint("id"),
sa.UniqueConstraint("user_id", "platform_id", name="uq_admin_platform"),
)
# Create indexes for performance
op.create_index(
"idx_admin_platforms_user_id",
"admin_platforms",
["user_id"],
)
op.create_index(
"idx_admin_platforms_platform_id",
"admin_platforms",
["platform_id"],
)
op.create_index(
"idx_admin_platform_active",
"admin_platforms",
["user_id", "platform_id", "is_active"],
)
op.create_index(
"idx_admin_platform_user_active",
"admin_platforms",
["user_id", "is_active"],
)
# 3. Migrate existing admins to super admins for backward compatibility
# All current admins get super admin access to maintain their existing permissions
op.execute("UPDATE users SET is_super_admin = TRUE WHERE role = 'admin'")
def downgrade() -> None:
# Drop indexes
op.drop_index("idx_admin_platform_user_active", table_name="admin_platforms")
op.drop_index("idx_admin_platform_active", table_name="admin_platforms")
op.drop_index("idx_admin_platforms_platform_id", table_name="admin_platforms")
op.drop_index("idx_admin_platforms_user_id", table_name="admin_platforms")
# Drop admin_platforms table
op.drop_table("admin_platforms")
# Drop is_super_admin column
op.drop_column("users", "is_super_admin")

View File

@@ -203,6 +203,174 @@ def get_current_admin_api(
return user return user
# ============================================================================
# SUPER ADMIN AUTHENTICATION
# ============================================================================
def get_current_super_admin(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
"""
Require super admin role.
Used for: Global settings, user management, platform creation/deletion,
admin-platform assignment management.
Args:
request: FastAPI request
credentials: Optional Bearer token from header
admin_token: Optional token from admin_token cookie
db: Database session
Returns:
User: Authenticated super admin user
Raises:
InvalidTokenException: If no token or invalid token
AdminRequiredException: If user is not admin or not super admin
"""
user = get_current_admin_from_cookie_or_header(request, credentials, admin_token, db)
if not user.is_super_admin:
logger.warning(
f"Platform admin {user.username} attempted super admin route: {request.url.path}"
)
raise AdminRequiredException("Super admin privileges required")
return user
def get_current_super_admin_api(
credentials: HTTPAuthorizationCredentials = Depends(security),
db: Session = Depends(get_db),
) -> User:
"""
Require super admin role (API header only).
Used for super admin API endpoints that should not accept cookies.
Args:
credentials: Bearer token from Authorization header
db: Database session
Returns:
User: Authenticated super admin user
Raises:
InvalidTokenException: If no token or invalid token
AdminRequiredException: If user is not admin or not super admin
"""
user = get_current_admin_api(credentials, db)
if not user.is_super_admin:
logger.warning(f"Platform admin {user.username} attempted super admin API")
raise AdminRequiredException("Super admin privileges required")
return user
def require_platform_access(platform_id: int):
"""
Dependency factory to require admin access to a specific platform.
Super admins can access all platforms.
Platform admins can only access their assigned platforms.
Usage:
@router.get("/platforms/{platform_id}/vendors")
def list_vendors(
platform_id: int,
admin: User = Depends(require_platform_access(platform_id))
):
...
"""
def _check_platform_access(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
user = get_current_admin_from_cookie_or_header(
request, credentials, admin_token, db
)
if not user.can_access_platform(platform_id):
logger.warning(
f"Admin {user.username} denied access to platform_id={platform_id}"
)
raise InsufficientPermissionsException(
f"Access denied to platform {platform_id}"
)
return user
return _check_platform_access
def get_admin_with_platform_context(
request: Request,
credentials: HTTPAuthorizationCredentials | None = Depends(security),
admin_token: str | None = Cookie(None),
db: Session = Depends(get_db),
) -> User:
"""
Get admin user and verify platform context from token.
For platform admins, extracts platform_id from JWT token and verifies access.
Stores platform in request.state.admin_platform for endpoint use.
Super admins bypass platform context check (they can access all platforms).
Args:
request: FastAPI request
credentials: Optional Bearer token from header
admin_token: Optional token from admin_token cookie
db: Database session
Returns:
User: Authenticated admin with platform context
Raises:
InvalidTokenException: If platform admin token missing platform info
InsufficientPermissionsException: If platform access revoked
"""
from models.database.platform import Platform
user = get_current_admin_from_cookie_or_header(request, credentials, admin_token, db)
# Super admins bypass platform context
if user.is_super_admin:
return user
# Platform admins need platform_id in token
if not hasattr(user, "token_platform_id"):
raise InvalidTokenException(
"Token missing platform information. Please select a platform."
)
platform_id = user.token_platform_id
# Verify admin still has access to this platform
if not user.can_access_platform(platform_id):
logger.warning(
f"Admin {user.username} lost access to platform_id={platform_id}"
)
raise InsufficientPermissionsException(
"Access to this platform has been revoked. Please login again."
)
# Load platform and store in request state
platform = db.query(Platform).filter(Platform.id == platform_id).first()
request.state.admin_platform = platform
return user
# ============================================================================ # ============================================================================
# VENDOR AUTHENTICATION # VENDOR AUTHENTICATION
# ============================================================================ # ============================================================================

View File

@@ -25,6 +25,7 @@ from fastapi import APIRouter
# Import all admin routers # Import all admin routers
from . import ( from . import (
admin_users,
audit, audit,
auth, auth,
background_tasks, background_tasks,
@@ -103,6 +104,9 @@ router.include_router(platforms.router, tags=["admin-platforms"])
# Include user management endpoints # Include user management endpoints
router.include_router(users.router, tags=["admin-users"]) router.include_router(users.router, tags=["admin-users"])
# Include admin user management endpoints (super admin only)
router.include_router(admin_users.router, tags=["admin-admin-users"])
# Include customer management endpoints # Include customer management endpoints
router.include_router(customers.router, tags=["admin-customers"]) router.include_router(customers.router, tags=["admin-customers"])

View File

@@ -0,0 +1,373 @@
# app/api/v1/admin/admin_users.py
"""
Admin user management endpoints (Super Admin only).
This module provides endpoints for:
- Listing all admin users with their platform assignments
- Creating platform admins
- Assigning/removing platform access
- Promoting/demoting super admin status
"""
import logging
from typing import Optional
from fastapi import APIRouter, Body, Depends, Path, Query
from pydantic import BaseModel, EmailStr
from sqlalchemy.orm import Session
from app.api.deps import get_current_super_admin, get_current_super_admin_api
from app.core.database import get_db
from app.services.admin_platform_service import admin_platform_service
from models.database.user import User
router = APIRouter(prefix="/admin-users")
logger = logging.getLogger(__name__)
# ============================================================================
# SCHEMAS
# ============================================================================
class PlatformAssignmentResponse(BaseModel):
"""Response for a platform assignment."""
platform_id: int
platform_code: str
platform_name: str
is_active: bool
class Config:
from_attributes = True
class AdminUserResponse(BaseModel):
"""Response for an admin user."""
id: int
email: str
username: str
first_name: Optional[str] = None
last_name: Optional[str] = None
is_active: bool
is_super_admin: bool
platform_assignments: list[PlatformAssignmentResponse] = []
class Config:
from_attributes = True
class AdminUserListResponse(BaseModel):
"""Response for listing admin users."""
admins: list[AdminUserResponse]
total: int
class CreatePlatformAdminRequest(BaseModel):
"""Request to create a new platform admin."""
email: EmailStr
username: str
password: str
first_name: Optional[str] = None
last_name: Optional[str] = None
platform_ids: list[int]
class AssignPlatformRequest(BaseModel):
"""Request to assign admin to platform."""
platform_id: int
class ToggleSuperAdminRequest(BaseModel):
"""Request to toggle super admin status."""
is_super_admin: bool
# ============================================================================
# ENDPOINTS
# ============================================================================
@router.get("", response_model=AdminUserListResponse)
def list_admin_users(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=500),
include_super_admins: bool = Query(True),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_super_admin),
):
"""
List all admin users with their platform assignments.
Super admin only.
"""
from sqlalchemy.orm import joinedload
query = db.query(User).filter(User.role == "admin")
if not include_super_admins:
query = query.filter(User.is_super_admin == False)
total = query.count()
admins = (
query.options(joinedload(User.admin_platforms))
.offset(skip)
.limit(limit)
.all()
)
admin_responses = []
for admin in admins:
assignments = []
if not admin.is_super_admin:
for ap in admin.admin_platforms:
if ap.is_active and ap.platform:
assignments.append(
PlatformAssignmentResponse(
platform_id=ap.platform_id,
platform_code=ap.platform.code,
platform_name=ap.platform.name,
is_active=ap.is_active,
)
)
admin_responses.append(
AdminUserResponse(
id=admin.id,
email=admin.email,
username=admin.username,
first_name=admin.first_name,
last_name=admin.last_name,
is_active=admin.is_active,
is_super_admin=admin.is_super_admin,
platform_assignments=assignments,
)
)
return AdminUserListResponse(admins=admin_responses, total=total)
@router.post("", response_model=AdminUserResponse)
def create_platform_admin(
request: CreatePlatformAdminRequest,
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_super_admin_api),
):
"""
Create a new platform admin with platform assignments.
Super admin only.
"""
user, assignments = admin_platform_service.create_platform_admin(
db=db,
email=request.email,
username=request.username,
password=request.password,
platform_ids=request.platform_ids,
created_by_user_id=current_admin.id,
first_name=request.first_name,
last_name=request.last_name,
)
db.commit()
# Refresh to get relationships
db.refresh(user)
assignment_responses = [
PlatformAssignmentResponse(
platform_id=ap.platform_id,
platform_code=ap.platform.code if ap.platform else "",
platform_name=ap.platform.name if ap.platform else "",
is_active=ap.is_active,
)
for ap in user.admin_platforms
if ap.is_active
]
logger.info(f"Created platform admin {user.username} by {current_admin.username}")
return AdminUserResponse(
id=user.id,
email=user.email,
username=user.username,
first_name=user.first_name,
last_name=user.last_name,
is_active=user.is_active,
is_super_admin=user.is_super_admin,
platform_assignments=assignment_responses,
)
@router.get("/{user_id}", response_model=AdminUserResponse)
def get_admin_user(
user_id: int = Path(...),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_super_admin),
):
"""
Get admin user details with platform assignments.
Super admin only.
"""
from sqlalchemy.orm import joinedload
from app.exceptions import ValidationException
admin = (
db.query(User)
.options(joinedload(User.admin_platforms))
.filter(User.id == user_id, User.role == "admin")
.first()
)
if not admin:
raise ValidationException("Admin user not found", field="user_id")
assignments = []
if not admin.is_super_admin:
for ap in admin.admin_platforms:
if ap.is_active and ap.platform:
assignments.append(
PlatformAssignmentResponse(
platform_id=ap.platform_id,
platform_code=ap.platform.code,
platform_name=ap.platform.name,
is_active=ap.is_active,
)
)
return AdminUserResponse(
id=admin.id,
email=admin.email,
username=admin.username,
first_name=admin.first_name,
last_name=admin.last_name,
is_active=admin.is_active,
is_super_admin=admin.is_super_admin,
platform_assignments=assignments,
)
@router.post("/{user_id}/platforms/{platform_id}")
def assign_admin_to_platform(
user_id: int = Path(...),
platform_id: int = Path(...),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_super_admin_api),
):
"""
Assign an admin to a platform.
Super admin only.
"""
assignment = admin_platform_service.assign_admin_to_platform(
db=db,
admin_user_id=user_id,
platform_id=platform_id,
assigned_by_user_id=current_admin.id,
)
db.commit()
logger.info(
f"Assigned admin {user_id} to platform {platform_id} by {current_admin.username}"
)
return {
"message": "Admin assigned to platform successfully",
"platform_id": platform_id,
"user_id": user_id,
}
@router.delete("/{user_id}/platforms/{platform_id}")
def remove_admin_from_platform(
user_id: int = Path(...),
platform_id: int = Path(...),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_super_admin_api),
):
"""
Remove an admin's access to a platform.
Super admin only.
"""
admin_platform_service.remove_admin_from_platform(
db=db,
admin_user_id=user_id,
platform_id=platform_id,
removed_by_user_id=current_admin.id,
)
db.commit()
logger.info(
f"Removed admin {user_id} from platform {platform_id} by {current_admin.username}"
)
return {
"message": "Admin removed from platform successfully",
"platform_id": platform_id,
"user_id": user_id,
}
@router.put("/{user_id}/super-admin")
def toggle_super_admin_status(
user_id: int = Path(...),
request: ToggleSuperAdminRequest = Body(...),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_super_admin_api),
):
"""
Promote or demote an admin to/from super admin.
Super admin only.
"""
user = admin_platform_service.toggle_super_admin(
db=db,
user_id=user_id,
is_super_admin=request.is_super_admin,
current_admin_id=current_admin.id,
)
db.commit()
action = "promoted to" if request.is_super_admin else "demoted from"
logger.info(f"Admin {user.username} {action} super admin by {current_admin.username}")
return {
"message": f"Admin {action} super admin successfully",
"user_id": user_id,
"is_super_admin": user.is_super_admin,
}
@router.get("/{user_id}/platforms")
def get_admin_platforms(
user_id: int = Path(...),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_super_admin),
):
"""
Get all platforms assigned to an admin.
Super admin only.
"""
platforms = admin_platform_service.get_platforms_for_admin(db, user_id)
return {
"platforms": [
{
"id": p.id,
"code": p.code,
"name": p.name,
}
for p in platforms
],
"user_id": user_id,
}

View File

@@ -14,11 +14,14 @@ import logging
from fastapi import APIRouter, Depends, Response from fastapi import APIRouter, Depends, Response
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api from app.api.deps import get_current_admin_api, get_current_admin_from_cookie_or_header
from app.core.database import get_db from app.core.database import get_db
from app.core.environment import should_use_secure_cookies from app.core.environment import should_use_secure_cookies
from app.exceptions import InvalidCredentialsException from app.exceptions import InsufficientPermissionsException, InvalidCredentialsException
from app.services.admin_platform_service import admin_platform_service
from app.services.auth_service import auth_service from app.services.auth_service import auth_service
from middleware.auth import AuthManager
from models.database.platform import Platform
from models.database.user import User from models.database.user import User
from models.schema.auth import LoginResponse, LogoutResponse, UserLogin, UserResponse from models.schema.auth import LoginResponse, LogoutResponse, UserLogin, UserResponse
@@ -123,3 +126,99 @@ def admin_logout(response: Response):
logger.debug("Deleted admin_token cookies (both /admin and / paths)") logger.debug("Deleted admin_token cookies (both /admin and / paths)")
return LogoutResponse(message="Logged out successfully") return LogoutResponse(message="Logged out successfully")
@router.get("/accessible-platforms")
def get_accessible_platforms(
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_from_cookie_or_header),
):
"""
Get list of platforms this admin can access.
Returns:
- For super admins: All active platforms
- For platform admins: Only assigned platforms
"""
if current_user.is_super_admin:
platforms = db.query(Platform).filter(Platform.is_active == True).all()
else:
platforms = admin_platform_service.get_platforms_for_admin(db, current_user.id)
return {
"platforms": [
{
"id": p.id,
"code": p.code,
"name": p.name,
"logo": p.logo,
}
for p in platforms
],
"is_super_admin": current_user.is_super_admin,
"requires_platform_selection": not current_user.is_super_admin and len(platforms) > 0,
}
@router.post("/select-platform")
def select_platform(
platform_id: int,
response: Response,
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_from_cookie_or_header),
):
"""
Select platform context for platform admin.
Issues a new JWT token with platform context.
Super admins skip this step (they have global access).
Args:
platform_id: Platform ID to select
Returns:
LoginResponse with new token containing platform context
"""
if current_user.is_super_admin:
raise InvalidCredentialsException(
"Super admins don't need platform selection - they have global access"
)
# Verify admin has access to this platform
if not current_user.can_access_platform(platform_id):
raise InsufficientPermissionsException(
f"You don't have access to this platform"
)
# Load platform
platform = db.query(Platform).filter(Platform.id == platform_id).first()
if not platform:
raise InvalidCredentialsException("Platform not found")
# Issue new token with platform context
auth_manager = AuthManager()
token_data = auth_manager.create_access_token(
user=current_user,
platform_id=platform.id,
platform_code=platform.code,
)
# Set cookie with new token
response.set_cookie(
key="admin_token",
value=token_data["access_token"],
httponly=True,
secure=should_use_secure_cookies(),
samesite="lax",
max_age=token_data["expires_in"],
path="/admin",
)
logger.info(f"Admin {current_user.username} selected platform {platform.code}")
return LoginResponse(
access_token=token_data["access_token"],
token_type=token_data["token_type"],
expires_in=token_data["expires_in"],
user=current_user,
)

View File

@@ -0,0 +1,389 @@
# app/services/admin_platform_service.py
"""
Admin Platform service for managing admin-platform assignments.
This module provides:
- Assigning platform admins to platforms
- Removing platform admin access
- Listing platforms for an admin
- Listing admins for a platform
- Promoting/demoting super admin status
"""
import logging
from datetime import UTC, datetime
from sqlalchemy.orm import Session, joinedload
from app.exceptions import (
AdminOperationException,
CannotModifySelfException,
ValidationException,
)
from models.database.admin_platform import AdminPlatform
from models.database.platform import Platform
from models.database.user import User
logger = logging.getLogger(__name__)
class AdminPlatformService:
"""Service class for admin-platform assignment operations."""
# ============================================================================
# ADMIN-PLATFORM ASSIGNMENTS
# ============================================================================
def assign_admin_to_platform(
self,
db: Session,
admin_user_id: int,
platform_id: int,
assigned_by_user_id: int,
) -> AdminPlatform:
"""
Assign a platform admin to a platform.
Args:
db: Database session
admin_user_id: User ID of the admin to assign
platform_id: Platform ID to assign to
assigned_by_user_id: Super admin making the assignment
Returns:
AdminPlatform: The created assignment
Raises:
ValidationException: If user is not an admin or is a super admin
AdminOperationException: If assignment already exists
"""
# Verify target user exists and is an admin
user = db.query(User).filter(User.id == admin_user_id).first()
if not user:
raise ValidationException("User not found", field="admin_user_id")
if not user.is_admin:
raise ValidationException(
"User must be an admin to be assigned to platforms",
field="admin_user_id",
)
if user.is_super_admin:
raise ValidationException(
"Super admins don't need platform assignments - they have access to all platforms",
field="admin_user_id",
)
# Verify platform exists
platform = db.query(Platform).filter(Platform.id == platform_id).first()
if not platform:
raise ValidationException("Platform not found", field="platform_id")
# Check if assignment already exists
existing = (
db.query(AdminPlatform)
.filter(
AdminPlatform.user_id == admin_user_id,
AdminPlatform.platform_id == platform_id,
)
.first()
)
if existing:
if existing.is_active:
raise AdminOperationException(
operation="assign_admin_to_platform",
reason=f"Admin already assigned to platform '{platform.code}'",
)
# Reactivate existing assignment
existing.is_active = True
existing.assigned_at = datetime.now(UTC)
existing.assigned_by_user_id = assigned_by_user_id
existing.updated_at = datetime.now(UTC)
db.flush()
db.refresh(existing)
logger.info(
f"Reactivated admin {admin_user_id} access to platform {platform.code} "
f"by admin {assigned_by_user_id}"
)
return existing
# Create new assignment
assignment = AdminPlatform(
user_id=admin_user_id,
platform_id=platform_id,
assigned_by_user_id=assigned_by_user_id,
is_active=True,
)
db.add(assignment)
db.flush()
db.refresh(assignment)
logger.info(
f"Assigned admin {admin_user_id} to platform {platform.code} "
f"by admin {assigned_by_user_id}"
)
return assignment
def remove_admin_from_platform(
self,
db: Session,
admin_user_id: int,
platform_id: int,
removed_by_user_id: int,
) -> None:
"""
Remove admin's access to a platform.
This soft-deletes by setting is_active=False for audit purposes.
Args:
db: Database session
admin_user_id: User ID of the admin to remove
platform_id: Platform ID to remove from
removed_by_user_id: Super admin making the removal
Raises:
ValidationException: If assignment doesn't exist
"""
assignment = (
db.query(AdminPlatform)
.filter(
AdminPlatform.user_id == admin_user_id,
AdminPlatform.platform_id == platform_id,
)
.first()
)
if not assignment:
raise ValidationException(
"Admin is not assigned to this platform",
field="platform_id",
)
assignment.is_active = False
assignment.updated_at = datetime.now(UTC)
db.flush()
logger.info(
f"Removed admin {admin_user_id} from platform {platform_id} "
f"by admin {removed_by_user_id}"
)
def get_platforms_for_admin(
self,
db: Session,
admin_user_id: int,
include_inactive: bool = False,
) -> list[Platform]:
"""
Get all platforms an admin can access.
Args:
db: Database session
admin_user_id: User ID of the admin
include_inactive: Whether to include inactive assignments
Returns:
List of Platform objects the admin can access
"""
query = (
db.query(Platform)
.join(AdminPlatform)
.filter(AdminPlatform.user_id == admin_user_id)
)
if not include_inactive:
query = query.filter(AdminPlatform.is_active == True)
return query.all()
def get_admins_for_platform(
self,
db: Session,
platform_id: int,
include_inactive: bool = False,
) -> list[User]:
"""
Get all admins assigned to a platform.
Args:
db: Database session
platform_id: Platform ID
include_inactive: Whether to include inactive assignments
Returns:
List of User objects assigned to the platform
"""
# Explicit join condition needed because AdminPlatform has two FKs to User
# (user_id and assigned_by_user_id)
query = (
db.query(User)
.join(AdminPlatform, AdminPlatform.user_id == User.id)
.filter(AdminPlatform.platform_id == platform_id)
)
if not include_inactive:
query = query.filter(AdminPlatform.is_active == True)
return query.all()
def get_admin_assignments(
self,
db: Session,
admin_user_id: int,
) -> list[AdminPlatform]:
"""
Get all platform assignments for an admin with platform details.
Args:
db: Database session
admin_user_id: User ID of the admin
Returns:
List of AdminPlatform objects with platform relationship loaded
"""
return (
db.query(AdminPlatform)
.options(joinedload(AdminPlatform.platform))
.filter(
AdminPlatform.user_id == admin_user_id,
AdminPlatform.is_active == True,
)
.all()
)
# ============================================================================
# SUPER ADMIN MANAGEMENT
# ============================================================================
def toggle_super_admin(
self,
db: Session,
user_id: int,
is_super_admin: bool,
current_admin_id: int,
) -> User:
"""
Promote or demote a user to/from super admin.
When demoting from super admin, the admin will have no platform access
until explicitly assigned via assign_admin_to_platform.
Args:
db: Database session
user_id: User ID to modify
is_super_admin: True to promote, False to demote
current_admin_id: Super admin making the change
Returns:
Updated User object
Raises:
CannotModifySelfException: If trying to demote self
ValidationException: If user is not an admin
"""
if user_id == current_admin_id and not is_super_admin:
raise CannotModifySelfException(
user_id=user_id,
operation="demote from super admin",
)
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise ValidationException("User not found", field="user_id")
if not user.is_admin:
raise ValidationException(
"User must be an admin to be promoted to super admin",
field="user_id",
)
old_status = user.is_super_admin
user.is_super_admin = is_super_admin
user.updated_at = datetime.now(UTC)
db.flush()
db.refresh(user)
action = "promoted to" if is_super_admin else "demoted from"
logger.info(
f"User {user.username} {action} super admin by admin {current_admin_id}"
)
return user
def create_platform_admin(
self,
db: Session,
email: str,
username: str,
password: str,
platform_ids: list[int],
created_by_user_id: int,
first_name: str | None = None,
last_name: str | None = None,
) -> tuple[User, list[AdminPlatform]]:
"""
Create a new platform admin with platform assignments.
Args:
db: Database session
email: Admin email
username: Admin username
password: Admin password
platform_ids: List of platform IDs to assign
created_by_user_id: Super admin creating the account
first_name: Optional first name
last_name: Optional last name
Returns:
Tuple of (User, list of AdminPlatform assignments)
"""
from middleware.auth import AuthManager
auth_manager = AuthManager()
# Check for existing user
existing = db.query(User).filter(
(User.email == email) | (User.username == username)
).first()
if existing:
field = "email" if existing.email == email else "username"
raise ValidationException(f"{field.capitalize()} already exists", field=field)
# Create admin user
user = User(
email=email,
username=username,
hashed_password=auth_manager.hash_password(password),
first_name=first_name,
last_name=last_name,
role="admin",
is_active=True,
is_super_admin=False, # Platform admin, not super admin
)
db.add(user)
db.flush()
# Create platform assignments
assignments = []
for platform_id in platform_ids:
assignment = AdminPlatform(
user_id=user.id,
platform_id=platform_id,
assigned_by_user_id=created_by_user_id,
is_active=True,
)
db.add(assignment)
assignments.append(assignment)
db.flush()
db.refresh(user)
logger.info(
f"Created platform admin {username} with access to platforms "
f"{platform_ids} by admin {created_by_user_id}"
)
return user, assignments
# Singleton instance
admin_platform_service = AdminPlatformService()

View File

@@ -140,6 +140,8 @@ class AuthManager:
vendor_id: int | None = None, vendor_id: int | None = None,
vendor_code: str | None = None, vendor_code: str | None = None,
vendor_role: str | None = None, vendor_role: str | None = None,
platform_id: int | None = None,
platform_code: str | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Create a JWT access token for an authenticated user. """Create a JWT access token for an authenticated user.
@@ -151,6 +153,8 @@ class AuthManager:
vendor_id (int, optional): Vendor ID if logging into vendor context vendor_id (int, optional): Vendor ID if logging into vendor context
vendor_code (str, optional): Vendor code if logging into vendor context vendor_code (str, optional): Vendor code if logging into vendor context
vendor_role (str, optional): User's role in this vendor (owner, manager, etc.) vendor_role (str, optional): User's role in this vendor (owner, manager, etc.)
platform_id (int, optional): Platform ID for platform admin context
platform_code (str, optional): Platform code for platform admin context
Returns: Returns:
Dict[str, Any]: Dictionary containing: Dict[str, Any]: Dictionary containing:
@@ -172,6 +176,21 @@ class AuthManager:
"iat": datetime.now(UTC), # Issued at time (JWT standard claim) "iat": datetime.now(UTC), # Issued at time (JWT standard claim)
} }
# Include admin-specific information for admin users
if user.is_admin:
payload["is_super_admin"] = user.is_super_admin
# For platform admins, include their accessible platform IDs
if not user.is_super_admin:
accessible = user.get_accessible_platform_ids()
if accessible is not None:
payload["accessible_platforms"] = accessible
# Include platform context for platform admins
if platform_id is not None:
payload["platform_id"] = platform_id
if platform_code is not None:
payload["platform_code"] = platform_code
# Include vendor information in token if provided (vendor-specific login) # Include vendor information in token if provided (vendor-specific login)
if vendor_id is not None: if vendor_id is not None:
payload["vendor_id"] = vendor_id payload["vendor_id"] = vendor_id
@@ -242,6 +261,18 @@ class AuthManager:
), # Default to "user" role if not specified ), # Default to "user" role if not specified
} }
# Include admin-specific information if present
if "is_super_admin" in payload:
user_data["is_super_admin"] = payload["is_super_admin"]
if "accessible_platforms" in payload:
user_data["accessible_platforms"] = payload["accessible_platforms"]
# Include platform context for platform admins
if "platform_id" in payload:
user_data["platform_id"] = payload["platform_id"]
if "platform_code" in payload:
user_data["platform_code"] = payload["platform_code"]
# Include vendor information if present in token # Include vendor information if present in token
if "vendor_id" in payload: if "vendor_id" in payload:
user_data["vendor_id"] = payload["vendor_id"] user_data["vendor_id"] = payload["vendor_id"]
@@ -302,8 +333,20 @@ class AuthManager:
if not user.is_active: if not user.is_active:
raise UserNotActiveException() raise UserNotActiveException()
# Attach vendor information to user object if present in token # Attach admin-specific information to user object if present in token
# These become dynamic attributes on the user object for this request # These become dynamic attributes on the user object for this request
if "is_super_admin" in user_data:
user.token_is_super_admin = user_data["is_super_admin"]
if "accessible_platforms" in user_data:
user.token_accessible_platforms = user_data["accessible_platforms"]
# Attach platform context to user object if present in token
if "platform_id" in user_data:
user.token_platform_id = user_data["platform_id"]
if "platform_code" in user_data:
user.token_platform_code = user_data["platform_code"]
# Attach vendor information to user object if present in token
if "vendor_id" in user_data: if "vendor_id" in user_data:
user.token_vendor_id = user_data["vendor_id"] user.token_vendor_id = user_data["vendor_id"]
if "vendor_code" in user_data: if "vendor_code" in user_data:
@@ -443,12 +486,14 @@ class AuthManager:
hashed_password = self.hash_password("admin123") hashed_password = self.hash_password("admin123")
# Create new admin user with default credentials # Create new admin user with default credentials
# Default admin is a super admin with access to all platforms
admin_user = User( admin_user = User(
email="admin@example.com", email="admin@example.com",
username="admin", username="admin",
hashed_password=hashed_password, hashed_password=hashed_password,
role="admin", role="admin",
is_active=True, is_active=True,
is_super_admin=True,
) )
# Save to database # Save to database

View File

@@ -8,6 +8,7 @@ from .admin import (
AdminSetting, AdminSetting,
PlatformAlert, PlatformAlert,
) )
from .admin_platform import AdminPlatform
from .architecture_scan import ( from .architecture_scan import (
ArchitectureScan, ArchitectureScan,
ArchitectureViolation, ArchitectureViolation,
@@ -83,6 +84,7 @@ __all__ = [
# Admin-specific models # Admin-specific models
"AdminAuditLog", "AdminAuditLog",
"AdminNotification", "AdminNotification",
"AdminPlatform",
"AdminSetting", "AdminSetting",
"PlatformAlert", "PlatformAlert",
"AdminSession", "AdminSession",

View File

@@ -0,0 +1,161 @@
# models/database/admin_platform.py
"""
AdminPlatform junction table for many-to-many relationship between Admin Users and Platforms.
This enables platform-scoped admin access:
- Super Admins: Have is_super_admin=True on User model, bypass this table
- Platform Admins: Assigned to specific platforms via this junction table
A platform admin CAN be assigned to multiple platforms (e.g., both OMS and Loyalty).
"""
from datetime import UTC, datetime
from sqlalchemy import (
Boolean,
Column,
DateTime,
ForeignKey,
Index,
Integer,
UniqueConstraint,
)
from sqlalchemy.orm import relationship
from app.core.database import Base
from models.database.base import TimestampMixin
class AdminPlatform(Base, TimestampMixin):
"""
Junction table linking admin users to platforms they can manage.
Allows a platform admin to:
- Manage specific platforms only (not all)
- Be assigned to multiple platforms
- Have assignment tracked for audit purposes
Example:
- User "john@example.com" (admin) can manage OMS platform only
- User "jane@example.com" (admin) can manage both OMS and Loyalty platforms
"""
__tablename__ = "admin_platforms"
id = Column(Integer, primary_key=True, index=True)
# ========================================================================
# Foreign Keys
# ========================================================================
user_id = Column(
Integer,
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
index=True,
comment="Reference to the admin user",
)
platform_id = Column(
Integer,
ForeignKey("platforms.id", ondelete="CASCADE"),
nullable=False,
index=True,
comment="Reference to the platform",
)
# ========================================================================
# Assignment Status
# ========================================================================
is_active = Column(
Boolean,
default=True,
nullable=False,
comment="Whether the admin assignment is active",
)
# ========================================================================
# Audit Fields
# ========================================================================
assigned_at = Column(
DateTime(timezone=True),
default=lambda: datetime.now(UTC),
nullable=False,
comment="When the admin was assigned to this platform",
)
assigned_by_user_id = Column(
Integer,
ForeignKey("users.id", ondelete="SET NULL"),
nullable=True,
comment="Super admin who made this assignment",
)
# ========================================================================
# Relationships
# ========================================================================
user = relationship(
"User",
foreign_keys=[user_id],
back_populates="admin_platforms",
)
platform = relationship(
"Platform",
back_populates="admin_platforms",
)
assigned_by = relationship(
"User",
foreign_keys=[assigned_by_user_id],
)
# ========================================================================
# Constraints & Indexes
# ========================================================================
__table_args__ = (
# Each admin can only be assigned to a platform once
UniqueConstraint(
"user_id",
"platform_id",
name="uq_admin_platform",
),
# Performance indexes
Index(
"idx_admin_platform_active",
"user_id",
"platform_id",
"is_active",
),
Index(
"idx_admin_platform_user_active",
"user_id",
"is_active",
),
)
# ========================================================================
# Properties
# ========================================================================
@property
def platform_code(self) -> str | None:
"""Get the platform code for this assignment."""
return self.platform.code if self.platform else None
@property
def platform_name(self) -> str | None:
"""Get the platform name for this assignment."""
return self.platform.name if self.platform else None
def __repr__(self) -> str:
return (
f"<AdminPlatform("
f"user_id={self.user_id}, "
f"platform_id={self.platform_id}, "
f"is_active={self.is_active})>"
)

View File

@@ -192,6 +192,13 @@ class Platform(Base, TimestampMixin):
foreign_keys="SubscriptionTier.platform_id", foreign_keys="SubscriptionTier.platform_id",
) )
# Admin assignments for this platform
admin_platforms = relationship(
"AdminPlatform",
back_populates="platform",
cascade="all, delete-orphan",
)
# ======================================================================== # ========================================================================
# Indexes # Indexes
# ======================================================================== # ========================================================================

View File

@@ -46,6 +46,11 @@ class User(Base, TimestampMixin):
is_email_verified = Column(Boolean, default=False, nullable=False) is_email_verified = Column(Boolean, default=False, nullable=False)
last_login = Column(DateTime, nullable=True) last_login = Column(DateTime, nullable=True)
# Super admin flag (only meaningful when role='admin')
# Super admins have access to ALL platforms and global settings
# Platform admins (is_super_admin=False) are assigned to specific platforms
is_super_admin = Column(Boolean, default=False, nullable=False)
# Language preference (NULL = use context default: vendor dashboard_language or system default) # Language preference (NULL = use context default: vendor dashboard_language or system default)
# Supported: en, fr, de, lb # Supported: en, fr, de, lb
preferred_language = Column(String(5), nullable=True) preferred_language = Column(String(5), nullable=True)
@@ -59,6 +64,15 @@ class User(Base, TimestampMixin):
"VendorUser", foreign_keys="[VendorUser.user_id]", back_populates="user" "VendorUser", foreign_keys="[VendorUser.user_id]", back_populates="user"
) )
# Admin-platform assignments (for platform admins only)
# Super admins don't need assignments - they have access to all platforms
admin_platforms = relationship(
"AdminPlatform",
foreign_keys="AdminPlatform.user_id",
back_populates="user",
cascade="all, delete-orphan",
)
def __repr__(self): def __repr__(self):
"""String representation of the User object.""" """String representation of the User object."""
return f"<User(id={self.id}, username='{self.username}', email='{self.email}', role='{self.role}')>" return f"<User(id={self.id}, username='{self.username}', email='{self.email}', role='{self.role}')>"
@@ -128,3 +142,49 @@ class User(Base, TimestampMixin):
return True return True
return False return False
# =========================================================================
# Admin Platform Access Methods
# =========================================================================
@property
def is_super_admin_user(self) -> bool:
"""Check if user is a super admin (can access all platforms)."""
return self.role == UserRole.ADMIN.value and self.is_super_admin
@property
def is_platform_admin(self) -> bool:
"""Check if user is a platform admin (access to assigned platforms only)."""
return self.role == UserRole.ADMIN.value and not self.is_super_admin
def can_access_platform(self, platform_id: int) -> bool:
"""
Check if admin can access a specific platform.
- Super admins can access all platforms
- Platform admins can only access assigned platforms
- Non-admins return False
"""
if not self.is_admin:
return False
if self.is_super_admin:
return True
return any(
ap.platform_id == platform_id and ap.is_active
for ap in self.admin_platforms
)
def get_accessible_platform_ids(self) -> list[int] | None:
"""
Get list of platform IDs this admin can access.
Returns:
- None for super admins (means ALL platforms)
- List of platform IDs for platform admins
- Empty list for non-admins
"""
if not self.is_admin:
return []
if self.is_super_admin:
return None # None means ALL platforms
return [ap.platform_id for ap in self.admin_platforms if ap.is_active]

View File

@@ -158,6 +158,7 @@ def cleanup():
# Import fixtures from fixture modules # Import fixtures from fixture modules
pytest_plugins = [ pytest_plugins = [
"tests.fixtures.admin_platform_fixtures",
"tests.fixtures.auth_fixtures", "tests.fixtures.auth_fixtures",
"tests.fixtures.marketplace_product_fixtures", "tests.fixtures.marketplace_product_fixtures",
"tests.fixtures.vendor_fixtures", "tests.fixtures.vendor_fixtures",

View File

@@ -0,0 +1,150 @@
# tests/fixtures/admin_platform_fixtures.py
"""
Admin platform assignment test fixtures.
Provides fixtures for testing super admin and platform admin functionality.
"""
import uuid
import pytest
from models.database.admin_platform import AdminPlatform
from models.database.platform import Platform
from models.database.user import User
@pytest.fixture
def test_platform(db):
"""Create a test platform."""
unique_id = str(uuid.uuid4())[:8]
platform = Platform(
code=f"test_{unique_id}",
name=f"Test Platform {unique_id}",
description="A test platform for unit tests",
path_prefix=f"test{unique_id}",
is_active=True,
is_public=True,
default_language="en",
supported_languages=["en", "fr", "de"],
)
db.add(platform)
db.commit()
db.refresh(platform)
return platform
@pytest.fixture
def another_platform(db):
"""Create another test platform."""
unique_id = str(uuid.uuid4())[:8]
platform = Platform(
code=f"another_{unique_id}",
name=f"Another Platform {unique_id}",
description="Another test platform",
path_prefix=f"another{unique_id}",
is_active=True,
is_public=True,
default_language="fr",
supported_languages=["fr", "en"],
)
db.add(platform)
db.commit()
db.refresh(platform)
return platform
@pytest.fixture
def test_admin_platform_assignment(db, test_platform_admin, test_platform, test_super_admin):
"""Create an admin platform assignment."""
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
db.refresh(assignment)
return assignment
@pytest.fixture
def platform_admin_with_platform(db, auth_manager, test_platform, test_super_admin):
"""Create a platform admin with an assigned platform."""
unique_id = str(uuid.uuid4())[:8]
hashed_password = auth_manager.hash_password("platformadminpass123")
# Create platform admin
admin = User(
email=f"padmin_{unique_id}@example.com",
username=f"padmin_{unique_id}",
hashed_password=hashed_password,
role="admin",
is_active=True,
is_super_admin=False,
)
db.add(admin)
db.flush()
# Assign to platform
assignment = AdminPlatform(
user_id=admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
db.refresh(admin)
return admin
@pytest.fixture
def platform_admin_with_platform_headers(client, platform_admin_with_platform, test_platform):
"""Get authentication headers for platform admin with platform context."""
# First login
response = client.post(
"/api/v1/admin/auth/login",
json={
"email_or_username": platform_admin_with_platform.username,
"password": "platformadminpass123",
},
)
assert response.status_code == 200, f"Platform admin login failed: {response.text}"
token = response.json()["access_token"]
# Select platform
response = client.post(
"/api/v1/admin/auth/select-platform",
params={"platform_id": test_platform.id},
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 200, f"Platform selection failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def platform_factory(db):
"""Factory fixture for creating platforms."""
def _create_platform(**kwargs):
unique_id = str(uuid.uuid4())[:8]
defaults = {
"code": f"factory_{unique_id}",
"name": f"Factory Platform {unique_id}",
"path_prefix": f"factory{unique_id}",
"is_active": True,
"is_public": True,
"default_language": "en",
"supported_languages": ["en"],
}
defaults.update(kwargs)
platform = Platform(**defaults)
db.add(platform)
db.commit()
db.refresh(platform)
return platform
return _create_platform

View File

@@ -40,7 +40,7 @@ def test_user(db, auth_manager):
@pytest.fixture @pytest.fixture
def test_admin(db, auth_manager): def test_admin(db, auth_manager):
"""Create a test admin user with unique username.""" """Create a test admin user with unique username (super admin by default)."""
unique_id = str(uuid.uuid4())[:8] unique_id = str(uuid.uuid4())[:8]
hashed_password = auth_manager.hash_password("adminpass123") hashed_password = auth_manager.hash_password("adminpass123")
admin = User( admin = User(
@@ -49,6 +49,7 @@ def test_admin(db, auth_manager):
hashed_password=hashed_password, hashed_password=hashed_password,
role="admin", role="admin",
is_active=True, is_active=True,
is_super_admin=True, # Default to super admin for backward compatibility
) )
db.add(admin) db.add(admin)
db.commit() db.commit()
@@ -56,6 +57,68 @@ def test_admin(db, auth_manager):
return admin return admin
@pytest.fixture
def test_super_admin(db, auth_manager):
"""Create a test super admin user with unique username."""
unique_id = str(uuid.uuid4())[:8]
hashed_password = auth_manager.hash_password("superadminpass123")
admin = User(
email=f"superadmin_{unique_id}@example.com",
username=f"superadmin_{unique_id}",
hashed_password=hashed_password,
role="admin",
is_active=True,
is_super_admin=True,
)
db.add(admin)
db.commit()
db.refresh(admin)
return admin
@pytest.fixture
def test_platform_admin(db, auth_manager):
"""Create a test platform admin user (not super admin)."""
unique_id = str(uuid.uuid4())[:8]
hashed_password = auth_manager.hash_password("platformadminpass123")
admin = User(
email=f"platformadmin_{unique_id}@example.com",
username=f"platformadmin_{unique_id}",
hashed_password=hashed_password,
role="admin",
is_active=True,
is_super_admin=False, # Platform admin, not super admin
)
db.add(admin)
db.commit()
db.refresh(admin)
return admin
@pytest.fixture
def super_admin_headers(client, test_super_admin):
"""Get authentication headers for super admin user."""
response = client.post(
"/api/v1/admin/auth/login",
json={"email_or_username": test_super_admin.username, "password": "superadminpass123"},
)
assert response.status_code == 200, f"Super admin login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@pytest.fixture
def platform_admin_headers(client, test_platform_admin):
"""Get authentication headers for platform admin user (no platform context yet)."""
response = client.post(
"/api/v1/admin/auth/login",
json={"email_or_username": test_platform_admin.username, "password": "platformadminpass123"},
)
assert response.status_code == 200, f"Platform admin login failed: {response.text}"
token = response.json()["access_token"]
return {"Authorization": f"Bearer {token}"}
@pytest.fixture @pytest.fixture
def another_admin(db, auth_manager): def another_admin(db, auth_manager):
"""Create another test admin user for testing admin-to-admin interactions.""" """Create another test admin user for testing admin-to-admin interactions."""
@@ -67,6 +130,7 @@ def another_admin(db, auth_manager):
hashed_password=hashed_password, hashed_password=hashed_password,
role="admin", role="admin",
is_active=True, is_active=True,
is_super_admin=True, # Super admin for backward compatibility
) )
db.add(admin) db.add(admin)
db.commit() db.commit()

View File

@@ -0,0 +1,381 @@
# tests/integration/api/v1/admin/test_admin_users.py
"""
Integration tests for admin user management API endpoints.
Tests the /api/v1/admin/admin-users/* endpoints.
"""
import pytest
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.admin
class TestAdminUsersListAPI:
"""Test listing admin users."""
def test_list_admin_users_as_super_admin(
self, client, super_admin_headers, test_platform_admin
):
"""Test listing admin users as super admin."""
response = client.get(
"/api/v1/admin/admin-users",
headers=super_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert "admins" in data
assert "total" in data
assert data["total"] >= 1
def test_list_admin_users_as_platform_admin_forbidden(
self, client, platform_admin_headers
):
"""Test that platform admin cannot list admin users."""
response = client.get(
"/api/v1/admin/admin-users",
headers=platform_admin_headers,
)
assert response.status_code == 403 # Super admin required
def test_list_admin_users_excludes_super_admins(
self, client, super_admin_headers, test_platform_admin
):
"""Test listing admin users excluding super admins."""
response = client.get(
"/api/v1/admin/admin-users",
params={"include_super_admins": False},
headers=super_admin_headers,
)
assert response.status_code == 200
data = response.json()
# All returned admins should not be super admins
for admin in data["admins"]:
assert admin["is_super_admin"] is False
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.admin
class TestAdminUsersCreateAPI:
"""Test creating platform admins."""
def test_create_platform_admin_success(
self, client, super_admin_headers, test_platform, another_platform
):
"""Test creating a new platform admin."""
response = client.post(
"/api/v1/admin/admin-users",
json={
"email": "new_platform_admin@example.com",
"username": "new_platform_admin",
"password": "securepass123",
"first_name": "New",
"last_name": "Admin",
"platform_ids": [test_platform.id, another_platform.id],
},
headers=super_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert data["email"] == "new_platform_admin@example.com"
assert data["username"] == "new_platform_admin"
assert data["is_super_admin"] is False
assert len(data["platform_assignments"]) == 2
def test_create_platform_admin_duplicate_email(
self, client, super_admin_headers, test_platform, test_platform_admin
):
"""Test creating platform admin with duplicate email fails."""
response = client.post(
"/api/v1/admin/admin-users",
json={
"email": test_platform_admin.email,
"username": "unique_username",
"password": "securepass123",
"platform_ids": [test_platform.id],
},
headers=super_admin_headers,
)
assert response.status_code == 422 # Validation error
def test_create_platform_admin_as_platform_admin_forbidden(
self, client, platform_admin_headers, test_platform
):
"""Test that platform admin cannot create other admins."""
response = client.post(
"/api/v1/admin/admin-users",
json={
"email": "another@example.com",
"username": "another",
"password": "securepass123",
"platform_ids": [test_platform.id],
},
headers=platform_admin_headers,
)
assert response.status_code == 403 # Forbidden for non-super admin
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.admin
class TestAdminUsersPlatformAssignmentAPI:
"""Test admin platform assignment endpoints."""
def test_assign_admin_to_platform(
self, client, super_admin_headers, test_platform_admin, test_platform
):
"""Test assigning an admin to a platform."""
response = client.post(
f"/api/v1/admin/admin-users/{test_platform_admin.id}/platforms/{test_platform.id}",
headers=super_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert "message" in data
assert data["user_id"] == test_platform_admin.id
assert data["platform_id"] == test_platform.id
def test_remove_admin_from_platform(
self, client, super_admin_headers, db, test_platform_admin, test_platform, test_super_admin
):
"""Test removing an admin from a platform."""
from models.database.admin_platform import AdminPlatform
# First create an assignment
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
response = client.delete(
f"/api/v1/admin/admin-users/{test_platform_admin.id}/platforms/{test_platform.id}",
headers=super_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert "message" in data
def test_get_admin_platforms(
self, client, super_admin_headers, db, test_platform_admin, test_platform, test_super_admin
):
"""Test getting platforms for an admin."""
from models.database.admin_platform import AdminPlatform
# Create assignment
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
response = client.get(
f"/api/v1/admin/admin-users/{test_platform_admin.id}/platforms",
headers=super_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert "platforms" in data
assert len(data["platforms"]) == 1
assert data["platforms"][0]["id"] == test_platform.id
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.admin
class TestAdminUsersSuperAdminToggleAPI:
"""Test super admin promotion/demotion."""
def test_promote_to_super_admin(
self, client, super_admin_headers, test_platform_admin
):
"""Test promoting a platform admin to super admin."""
response = client.put(
f"/api/v1/admin/admin-users/{test_platform_admin.id}/super-admin",
json={"is_super_admin": True},
headers=super_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert data["is_super_admin"] is True
def test_demote_from_super_admin(
self, client, super_admin_headers, db, auth_manager
):
"""Test demoting a super admin to platform admin."""
from models.database.user import User
# Create another super admin to demote
another_super = User(
email="demote_test@example.com",
username="demote_test",
hashed_password=auth_manager.hash_password("pass"),
role="admin",
is_active=True,
is_super_admin=True,
)
db.add(another_super)
db.commit()
response = client.put(
f"/api/v1/admin/admin-users/{another_super.id}/super-admin",
json={"is_super_admin": False},
headers=super_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert data["is_super_admin"] is False
@pytest.mark.integration
@pytest.mark.api
@pytest.mark.admin
class TestAdminAuthPlatformSelectionAPI:
"""Test platform selection for platform admins."""
def test_get_accessible_platforms_super_admin(
self, client, super_admin_headers, test_platform
):
"""Test getting accessible platforms as super admin."""
response = client.get(
"/api/v1/admin/auth/accessible-platforms",
headers=super_admin_headers,
)
assert response.status_code == 200
data = response.json()
assert data["is_super_admin"] is True
assert "platforms" in data
assert data["requires_platform_selection"] is False
def test_get_accessible_platforms_platform_admin(
self, client, db, test_platform_admin, test_platform, test_super_admin, auth_manager
):
"""Test getting accessible platforms as platform admin."""
from models.database.admin_platform import AdminPlatform
# Create assignment
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
# Login as platform admin
response = client.post(
"/api/v1/admin/auth/login",
json={
"email_or_username": test_platform_admin.username,
"password": "platformadminpass123",
},
)
assert response.status_code == 200
token = response.json()["access_token"]
# Get accessible platforms
response = client.get(
"/api/v1/admin/auth/accessible-platforms",
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 200
data = response.json()
assert data["is_super_admin"] is False
assert len(data["platforms"]) == 1
assert data["platforms"][0]["id"] == test_platform.id
assert data["requires_platform_selection"] is True
def test_select_platform_success(
self, client, db, test_platform_admin, test_platform, test_super_admin
):
"""Test selecting a platform as platform admin."""
from models.database.admin_platform import AdminPlatform
# Create assignment
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
# Login
response = client.post(
"/api/v1/admin/auth/login",
json={
"email_or_username": test_platform_admin.username,
"password": "platformadminpass123",
},
)
token = response.json()["access_token"]
# Select platform
response = client.post(
"/api/v1/admin/auth/select-platform",
params={"platform_id": test_platform.id},
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
# New token should be different
assert data["access_token"] != token
def test_select_platform_not_assigned(
self, client, db, test_platform_admin, test_platform
):
"""Test selecting a platform that admin is not assigned to."""
# Login (no platform assignment exists)
response = client.post(
"/api/v1/admin/auth/login",
json={
"email_or_username": test_platform_admin.username,
"password": "platformadminpass123",
},
)
token = response.json()["access_token"]
# Try to select platform
response = client.post(
"/api/v1/admin/auth/select-platform",
params={"platform_id": test_platform.id},
headers={"Authorization": f"Bearer {token}"},
)
assert response.status_code == 403 # Access denied
def test_select_platform_as_super_admin_fails(
self, client, super_admin_headers, test_platform
):
"""Test that super admin cannot select platform (they don't need to)."""
response = client.post(
"/api/v1/admin/auth/select-platform",
params={"platform_id": test_platform.id},
headers=super_admin_headers,
)
assert response.status_code == 401 # Super admins don't need this

View File

@@ -0,0 +1,271 @@
# tests/unit/models/database/test_admin_platform.py
"""
Unit tests for AdminPlatform model.
Tests the admin-platform junction table model and its relationships.
"""
import pytest
from sqlalchemy.exc import IntegrityError
from models.database.admin_platform import AdminPlatform
@pytest.mark.unit
@pytest.mark.database
@pytest.mark.admin
class TestAdminPlatformModel:
"""Test AdminPlatform model creation and constraints."""
def test_create_admin_platform_assignment(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test creating an admin platform assignment."""
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
db.refresh(assignment)
assert assignment.id is not None
assert assignment.user_id == test_platform_admin.id
assert assignment.platform_id == test_platform.id
assert assignment.is_active is True
assert assignment.assigned_by_user_id == test_super_admin.id
assert assignment.assigned_at is not None
def test_admin_platform_unique_constraint(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test that an admin can only be assigned to a platform once."""
# Create first assignment
assignment1 = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment1)
db.commit()
# Try to create duplicate assignment
assignment2 = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment2)
with pytest.raises(IntegrityError):
db.commit()
def test_admin_platform_cascade_delete_user(
self, db, auth_manager, test_platform, test_super_admin
):
"""Test that deleting user cascades to admin platform assignments."""
from models.database.user import User
# Create a temporary admin
temp_admin = User(
email="temp_admin@example.com",
username="temp_admin",
hashed_password=auth_manager.hash_password("temppass"),
role="admin",
is_active=True,
is_super_admin=False,
)
db.add(temp_admin)
db.flush()
# Create assignment
assignment = AdminPlatform(
user_id=temp_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
assignment_id = assignment.id
# Delete user - should cascade to assignment
db.delete(temp_admin)
db.commit()
# Verify assignment is gone
remaining = db.query(AdminPlatform).filter(AdminPlatform.id == assignment_id).first()
assert remaining is None
def test_admin_platform_relationships(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test AdminPlatform relationships are loaded correctly."""
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
db.refresh(assignment)
# Test relationships
assert assignment.user is not None
assert assignment.user.id == test_platform_admin.id
assert assignment.platform is not None
assert assignment.platform.id == test_platform.id
assert assignment.assigned_by is not None
assert assignment.assigned_by.id == test_super_admin.id
def test_admin_platform_properties(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test AdminPlatform computed properties."""
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
db.refresh(assignment)
# Test properties
assert assignment.platform_code == test_platform.code
assert assignment.platform_name == test_platform.name
def test_admin_platform_repr(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test AdminPlatform string representation."""
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
db.refresh(assignment)
repr_str = repr(assignment)
assert "AdminPlatform" in repr_str
assert str(test_platform_admin.id) in repr_str
assert str(test_platform.id) in repr_str
@pytest.mark.unit
@pytest.mark.database
@pytest.mark.admin
class TestUserAdminMethods:
"""Test User model admin-related methods."""
def test_is_super_admin_user_true(self, db, test_super_admin):
"""Test is_super_admin_user property for super admin."""
assert test_super_admin.is_super_admin_user is True
def test_is_super_admin_user_false_for_platform_admin(self, db, test_platform_admin):
"""Test is_super_admin_user property for platform admin."""
assert test_platform_admin.is_super_admin_user is False
def test_is_platform_admin_true(self, db, test_platform_admin):
"""Test is_platform_admin property for platform admin."""
assert test_platform_admin.is_platform_admin is True
def test_is_platform_admin_false_for_super_admin(self, db, test_super_admin):
"""Test is_platform_admin property for super admin."""
assert test_super_admin.is_platform_admin is False
def test_can_access_platform_super_admin(self, db, test_super_admin, test_platform):
"""Test that super admin can access any platform."""
assert test_super_admin.can_access_platform(test_platform.id) is True
def test_can_access_platform_assigned(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test that platform admin can access assigned platform."""
# Create assignment
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
db.refresh(test_platform_admin)
assert test_platform_admin.can_access_platform(test_platform.id) is True
def test_can_access_platform_not_assigned(
self, db, test_platform_admin, test_platform
):
"""Test that platform admin cannot access unassigned platform."""
# No assignment created
assert test_platform_admin.can_access_platform(test_platform.id) is False
def test_can_access_platform_inactive_assignment(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test that platform admin cannot access platform with inactive assignment."""
# Create inactive assignment
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=False, # Inactive
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
db.refresh(test_platform_admin)
assert test_platform_admin.can_access_platform(test_platform.id) is False
def test_get_accessible_platform_ids_super_admin(self, db, test_super_admin):
"""Test get_accessible_platform_ids returns None for super admin."""
result = test_super_admin.get_accessible_platform_ids()
assert result is None # None means all platforms
def test_get_accessible_platform_ids_platform_admin(
self, db, test_platform_admin, test_platform, another_platform, test_super_admin
):
"""Test get_accessible_platform_ids returns correct list for platform admin."""
# Create assignments for both platforms
assignment1 = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
assignment2 = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=another_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add_all([assignment1, assignment2])
db.commit()
db.refresh(test_platform_admin)
result = test_platform_admin.get_accessible_platform_ids()
assert len(result) == 2
assert test_platform.id in result
assert another_platform.id in result
def test_get_accessible_platform_ids_no_assignments(self, db, test_platform_admin):
"""Test get_accessible_platform_ids returns empty list when no assignments."""
result = test_platform_admin.get_accessible_platform_ids()
assert result == []
def test_get_accessible_platform_ids_vendor_user(self, db, test_vendor_user):
"""Test get_accessible_platform_ids returns empty list for non-admin."""
result = test_vendor_user.get_accessible_platform_ids()
assert result == []

View File

@@ -0,0 +1,463 @@
# tests/unit/services/test_admin_platform_service.py
"""
Unit tests for AdminPlatformService.
Tests the admin platform assignment service operations.
"""
import pytest
from app.exceptions import AdminOperationException, CannotModifySelfException, ValidationException
from app.services.admin_platform_service import AdminPlatformService
@pytest.mark.unit
@pytest.mark.admin
class TestAdminPlatformServiceAssign:
"""Test AdminPlatformService.assign_admin_to_platform."""
def test_assign_admin_to_platform_success(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test successfully assigning an admin to a platform."""
service = AdminPlatformService()
assignment = service.assign_admin_to_platform(
db=db,
admin_user_id=test_platform_admin.id,
platform_id=test_platform.id,
assigned_by_user_id=test_super_admin.id,
)
assert assignment is not None
assert assignment.user_id == test_platform_admin.id
assert assignment.platform_id == test_platform.id
assert assignment.is_active is True
assert assignment.assigned_by_user_id == test_super_admin.id
def test_assign_admin_user_not_found(self, db, test_platform, test_super_admin):
"""Test assigning non-existent user raises error."""
service = AdminPlatformService()
with pytest.raises(ValidationException) as exc:
service.assign_admin_to_platform(
db=db,
admin_user_id=99999,
platform_id=test_platform.id,
assigned_by_user_id=test_super_admin.id,
)
assert "User not found" in str(exc.value)
def test_assign_admin_not_admin_role(
self, db, test_vendor_user, test_platform, test_super_admin
):
"""Test assigning non-admin user raises error."""
service = AdminPlatformService()
with pytest.raises(ValidationException) as exc:
service.assign_admin_to_platform(
db=db,
admin_user_id=test_vendor_user.id,
platform_id=test_platform.id,
assigned_by_user_id=test_super_admin.id,
)
assert "must be an admin" in str(exc.value)
def test_assign_super_admin_raises_error(
self, db, test_super_admin, test_platform
):
"""Test assigning super admin raises error."""
service = AdminPlatformService()
with pytest.raises(ValidationException) as exc:
service.assign_admin_to_platform(
db=db,
admin_user_id=test_super_admin.id,
platform_id=test_platform.id,
assigned_by_user_id=test_super_admin.id,
)
assert "Super admins don't need platform assignments" in str(exc.value)
def test_assign_platform_not_found(
self, db, test_platform_admin, test_super_admin
):
"""Test assigning to non-existent platform raises error."""
service = AdminPlatformService()
with pytest.raises(ValidationException) as exc:
service.assign_admin_to_platform(
db=db,
admin_user_id=test_platform_admin.id,
platform_id=99999,
assigned_by_user_id=test_super_admin.id,
)
assert "Platform not found" in str(exc.value)
def test_assign_admin_already_assigned(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test assigning already assigned admin raises error."""
service = AdminPlatformService()
# First assignment
service.assign_admin_to_platform(
db=db,
admin_user_id=test_platform_admin.id,
platform_id=test_platform.id,
assigned_by_user_id=test_super_admin.id,
)
db.commit()
# Try to assign again
with pytest.raises(AdminOperationException) as exc:
service.assign_admin_to_platform(
db=db,
admin_user_id=test_platform_admin.id,
platform_id=test_platform.id,
assigned_by_user_id=test_super_admin.id,
)
assert "already assigned" in str(exc.value)
def test_reactivate_inactive_assignment(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test reactivating an inactive assignment."""
from models.database.admin_platform import AdminPlatform
service = AdminPlatformService()
# Create inactive assignment directly
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=False,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
# Assign again - should reactivate
result = service.assign_admin_to_platform(
db=db,
admin_user_id=test_platform_admin.id,
platform_id=test_platform.id,
assigned_by_user_id=test_super_admin.id,
)
assert result.is_active is True
@pytest.mark.unit
@pytest.mark.admin
class TestAdminPlatformServiceRemove:
"""Test AdminPlatformService.remove_admin_from_platform."""
def test_remove_admin_from_platform_success(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test successfully removing an admin from a platform."""
from models.database.admin_platform import AdminPlatform
service = AdminPlatformService()
# Create assignment first
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
# Remove
service.remove_admin_from_platform(
db=db,
admin_user_id=test_platform_admin.id,
platform_id=test_platform.id,
removed_by_user_id=test_super_admin.id,
)
db.commit()
db.refresh(assignment)
assert assignment.is_active is False
def test_remove_admin_not_assigned(
self, db, test_platform_admin, test_platform, test_super_admin
):
"""Test removing non-existent assignment raises error."""
service = AdminPlatformService()
with pytest.raises(ValidationException) as exc:
service.remove_admin_from_platform(
db=db,
admin_user_id=test_platform_admin.id,
platform_id=test_platform.id,
removed_by_user_id=test_super_admin.id,
)
assert "not assigned" in str(exc.value)
@pytest.mark.unit
@pytest.mark.admin
class TestAdminPlatformServiceQueries:
"""Test AdminPlatformService query methods."""
def test_get_platforms_for_admin(
self, db, test_platform_admin, test_platform, another_platform, test_super_admin
):
"""Test getting platforms for an admin."""
from models.database.admin_platform import AdminPlatform
service = AdminPlatformService()
# Create assignments
for platform in [test_platform, another_platform]:
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
platforms = service.get_platforms_for_admin(db, test_platform_admin.id)
assert len(platforms) == 2
platform_ids = [p.id for p in platforms]
assert test_platform.id in platform_ids
assert another_platform.id in platform_ids
def test_get_platforms_for_admin_no_assignments(self, db, test_platform_admin):
"""Test getting platforms when no assignments exist."""
service = AdminPlatformService()
platforms = service.get_platforms_for_admin(db, test_platform_admin.id)
assert platforms == []
def test_get_admins_for_platform(
self, db, test_platform_admin, test_platform, test_super_admin, auth_manager
):
"""Test getting admins for a platform."""
from models.database.admin_platform import AdminPlatform
from models.database.user import User
service = AdminPlatformService()
# Create another platform admin
another_admin = User(
email="another_padmin@example.com",
username="another_padmin",
hashed_password=auth_manager.hash_password("pass"),
role="admin",
is_active=True,
is_super_admin=False,
)
db.add(another_admin)
db.flush()
# Create assignments for both admins
for admin in [test_platform_admin, another_admin]:
assignment = AdminPlatform(
user_id=admin.id,
platform_id=test_platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
admins = service.get_admins_for_platform(db, test_platform.id)
assert len(admins) == 2
admin_ids = [a.id for a in admins]
assert test_platform_admin.id in admin_ids
assert another_admin.id in admin_ids
def test_get_admin_assignments(
self, db, test_platform_admin, test_platform, another_platform, test_super_admin
):
"""Test getting admin assignments with platform details."""
from models.database.admin_platform import AdminPlatform
service = AdminPlatformService()
# Create assignments
for platform in [test_platform, another_platform]:
assignment = AdminPlatform(
user_id=test_platform_admin.id,
platform_id=platform.id,
is_active=True,
assigned_by_user_id=test_super_admin.id,
)
db.add(assignment)
db.commit()
assignments = service.get_admin_assignments(db, test_platform_admin.id)
assert len(assignments) == 2
# Verify platform relationship is loaded
for assignment in assignments:
assert assignment.platform is not None
assert assignment.platform.code is not None
@pytest.mark.unit
@pytest.mark.admin
class TestAdminPlatformServiceSuperAdmin:
"""Test AdminPlatformService super admin operations."""
def test_toggle_super_admin_promote(
self, db, test_platform_admin, test_super_admin
):
"""Test promoting admin to super admin."""
service = AdminPlatformService()
result = service.toggle_super_admin(
db=db,
user_id=test_platform_admin.id,
is_super_admin=True,
current_admin_id=test_super_admin.id,
)
db.commit()
assert result.is_super_admin is True
def test_toggle_super_admin_demote(
self, db, test_super_admin, auth_manager
):
"""Test demoting super admin to platform admin."""
from models.database.user import User
service = AdminPlatformService()
# Create another super admin to demote
another_super = User(
email="another_super@example.com",
username="another_super",
hashed_password=auth_manager.hash_password("pass"),
role="admin",
is_active=True,
is_super_admin=True,
)
db.add(another_super)
db.commit()
result = service.toggle_super_admin(
db=db,
user_id=another_super.id,
is_super_admin=False,
current_admin_id=test_super_admin.id,
)
db.commit()
assert result.is_super_admin is False
def test_toggle_super_admin_cannot_demote_self(self, db, test_super_admin):
"""Test that super admin cannot demote themselves."""
service = AdminPlatformService()
with pytest.raises(CannotModifySelfException):
service.toggle_super_admin(
db=db,
user_id=test_super_admin.id,
is_super_admin=False,
current_admin_id=test_super_admin.id,
)
def test_toggle_super_admin_user_not_found(self, db, test_super_admin):
"""Test toggling non-existent user raises error."""
service = AdminPlatformService()
with pytest.raises(ValidationException) as exc:
service.toggle_super_admin(
db=db,
user_id=99999,
is_super_admin=True,
current_admin_id=test_super_admin.id,
)
assert "User not found" in str(exc.value)
def test_toggle_super_admin_not_admin(
self, db, test_vendor_user, test_super_admin
):
"""Test toggling non-admin user raises error."""
service = AdminPlatformService()
with pytest.raises(ValidationException) as exc:
service.toggle_super_admin(
db=db,
user_id=test_vendor_user.id,
is_super_admin=True,
current_admin_id=test_super_admin.id,
)
assert "must be an admin" in str(exc.value)
@pytest.mark.unit
@pytest.mark.admin
class TestAdminPlatformServiceCreatePlatformAdmin:
"""Test AdminPlatformService.create_platform_admin."""
def test_create_platform_admin_success(
self, db, test_platform, another_platform, test_super_admin
):
"""Test creating a new platform admin with assignments."""
service = AdminPlatformService()
user, assignments = service.create_platform_admin(
db=db,
email="new_padmin@example.com",
username="new_padmin",
password="securepass123",
platform_ids=[test_platform.id, another_platform.id],
created_by_user_id=test_super_admin.id,
first_name="New",
last_name="Admin",
)
db.commit()
assert user is not None
assert user.email == "new_padmin@example.com"
assert user.username == "new_padmin"
assert user.role == "admin"
assert user.is_super_admin is False
assert user.first_name == "New"
assert user.last_name == "Admin"
assert len(assignments) == 2
def test_create_platform_admin_duplicate_email(
self, db, test_platform, test_super_admin, test_platform_admin
):
"""Test creating platform admin with duplicate email fails."""
service = AdminPlatformService()
with pytest.raises(ValidationException) as exc:
service.create_platform_admin(
db=db,
email=test_platform_admin.email, # Duplicate
username="unique_username",
password="securepass123",
platform_ids=[test_platform.id],
created_by_user_id=test_super_admin.id,
)
assert "Email already exists" in str(exc.value)
def test_create_platform_admin_duplicate_username(
self, db, test_platform, test_super_admin, test_platform_admin
):
"""Test creating platform admin with duplicate username fails."""
service = AdminPlatformService()
with pytest.raises(ValidationException) as exc:
service.create_platform_admin(
db=db,
email="unique@example.com",
username=test_platform_admin.username, # Duplicate
password="securepass123",
platform_ids=[test_platform.id],
created_by_user_id=test_super_admin.id,
)
assert "Username already exists" in str(exc.value)