diff --git a/Makefile b/Makefile index 0de560cb..d385618c 100644 --- a/Makefile +++ b/Makefile @@ -11,8 +11,8 @@ else endif # Set Python based on OS -PYTHON := python -PIP := pip +PYTHON := python3 +PIP := pip3 # Set PYTHONPATH for scripts export PYTHONPATH := $(shell pwd) diff --git a/alembic/versions/z9j0k1l2m3n4_add_admin_platform_roles.py b/alembic/versions/z9j0k1l2m3n4_add_admin_platform_roles.py new file mode 100644 index 00000000..0f5ca272 --- /dev/null +++ b/alembic/versions/z9j0k1l2m3n4_add_admin_platform_roles.py @@ -0,0 +1,148 @@ +"""Add admin platform roles (super admin + platform admin) + +Revision ID: z9j0k1l2m3n4 +Revises: z8i9j0k1l2m3 +Create Date: 2026-01-24 + +Adds support for super admin and platform admin roles: +- is_super_admin column on users table +- admin_platforms junction table for platform admin assignments + +Super admins have access to all platforms. +Platform admins are assigned to specific platforms via admin_platforms. +Existing admins are migrated to super admins for backward compatibility. +""" + +from datetime import UTC, datetime + +from alembic import op +import sqlalchemy as sa + + +# revision identifiers, used by Alembic. +revision = "z9j0k1l2m3n4" +down_revision = "z8i9j0k1l2m3" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + # 1. Add is_super_admin column to users table + op.add_column( + "users", + sa.Column( + "is_super_admin", + sa.Boolean(), + nullable=False, + server_default="false", + comment="Whether this admin has access to all platforms (super admin)", + ), + ) + + # 2. Create admin_platforms junction table + op.create_table( + "admin_platforms", + sa.Column("id", sa.Integer(), nullable=False), + sa.Column( + "user_id", + sa.Integer(), + nullable=False, + comment="Reference to the admin user", + ), + sa.Column( + "platform_id", + sa.Integer(), + nullable=False, + comment="Reference to the platform", + ), + sa.Column( + "is_active", + sa.Boolean(), + nullable=False, + server_default="true", + comment="Whether the admin assignment is active", + ), + sa.Column( + "assigned_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.func.now(), + comment="When the admin was assigned to this platform", + ), + sa.Column( + "assigned_by_user_id", + sa.Integer(), + nullable=True, + comment="Super admin who made this assignment", + ), + sa.Column( + "created_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.func.now(), + ), + sa.Column( + "updated_at", + sa.DateTime(timezone=True), + nullable=False, + server_default=sa.func.now(), + onupdate=sa.func.now(), + ), + sa.ForeignKeyConstraint( + ["user_id"], + ["users.id"], + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["platform_id"], + ["platforms.id"], + ondelete="CASCADE", + ), + sa.ForeignKeyConstraint( + ["assigned_by_user_id"], + ["users.id"], + ondelete="SET NULL", + ), + sa.PrimaryKeyConstraint("id"), + sa.UniqueConstraint("user_id", "platform_id", name="uq_admin_platform"), + ) + + # Create indexes for performance + op.create_index( + "idx_admin_platforms_user_id", + "admin_platforms", + ["user_id"], + ) + op.create_index( + "idx_admin_platforms_platform_id", + "admin_platforms", + ["platform_id"], + ) + op.create_index( + "idx_admin_platform_active", + "admin_platforms", + ["user_id", "platform_id", "is_active"], + ) + op.create_index( + "idx_admin_platform_user_active", + "admin_platforms", + ["user_id", "is_active"], + ) + + # 3. Migrate existing admins to super admins for backward compatibility + # All current admins get super admin access to maintain their existing permissions + op.execute("UPDATE users SET is_super_admin = TRUE WHERE role = 'admin'") + + +def downgrade() -> None: + # Drop indexes + op.drop_index("idx_admin_platform_user_active", table_name="admin_platforms") + op.drop_index("idx_admin_platform_active", table_name="admin_platforms") + op.drop_index("idx_admin_platforms_platform_id", table_name="admin_platforms") + op.drop_index("idx_admin_platforms_user_id", table_name="admin_platforms") + + # Drop admin_platforms table + op.drop_table("admin_platforms") + + # Drop is_super_admin column + op.drop_column("users", "is_super_admin") diff --git a/app/api/deps.py b/app/api/deps.py index 60009e20..82a3ca75 100644 --- a/app/api/deps.py +++ b/app/api/deps.py @@ -203,6 +203,174 @@ def get_current_admin_api( return user +# ============================================================================ +# SUPER ADMIN AUTHENTICATION +# ============================================================================ + + +def get_current_super_admin( + request: Request, + credentials: HTTPAuthorizationCredentials | None = Depends(security), + admin_token: str | None = Cookie(None), + db: Session = Depends(get_db), +) -> User: + """ + Require super admin role. + + Used for: Global settings, user management, platform creation/deletion, + admin-platform assignment management. + + Args: + request: FastAPI request + credentials: Optional Bearer token from header + admin_token: Optional token from admin_token cookie + db: Database session + + Returns: + User: Authenticated super admin user + + Raises: + InvalidTokenException: If no token or invalid token + AdminRequiredException: If user is not admin or not super admin + """ + user = get_current_admin_from_cookie_or_header(request, credentials, admin_token, db) + + if not user.is_super_admin: + logger.warning( + f"Platform admin {user.username} attempted super admin route: {request.url.path}" + ) + raise AdminRequiredException("Super admin privileges required") + + return user + + +def get_current_super_admin_api( + credentials: HTTPAuthorizationCredentials = Depends(security), + db: Session = Depends(get_db), +) -> User: + """ + Require super admin role (API header only). + + Used for super admin API endpoints that should not accept cookies. + + Args: + credentials: Bearer token from Authorization header + db: Database session + + Returns: + User: Authenticated super admin user + + Raises: + InvalidTokenException: If no token or invalid token + AdminRequiredException: If user is not admin or not super admin + """ + user = get_current_admin_api(credentials, db) + + if not user.is_super_admin: + logger.warning(f"Platform admin {user.username} attempted super admin API") + raise AdminRequiredException("Super admin privileges required") + + return user + + +def require_platform_access(platform_id: int): + """ + Dependency factory to require admin access to a specific platform. + + Super admins can access all platforms. + Platform admins can only access their assigned platforms. + + Usage: + @router.get("/platforms/{platform_id}/vendors") + def list_vendors( + platform_id: int, + admin: User = Depends(require_platform_access(platform_id)) + ): + ... + """ + + def _check_platform_access( + request: Request, + credentials: HTTPAuthorizationCredentials | None = Depends(security), + admin_token: str | None = Cookie(None), + db: Session = Depends(get_db), + ) -> User: + user = get_current_admin_from_cookie_or_header( + request, credentials, admin_token, db + ) + + if not user.can_access_platform(platform_id): + logger.warning( + f"Admin {user.username} denied access to platform_id={platform_id}" + ) + raise InsufficientPermissionsException( + f"Access denied to platform {platform_id}" + ) + + return user + + return _check_platform_access + + +def get_admin_with_platform_context( + request: Request, + credentials: HTTPAuthorizationCredentials | None = Depends(security), + admin_token: str | None = Cookie(None), + db: Session = Depends(get_db), +) -> User: + """ + Get admin user and verify platform context from token. + + For platform admins, extracts platform_id from JWT token and verifies access. + Stores platform in request.state.admin_platform for endpoint use. + + Super admins bypass platform context check (they can access all platforms). + + Args: + request: FastAPI request + credentials: Optional Bearer token from header + admin_token: Optional token from admin_token cookie + db: Database session + + Returns: + User: Authenticated admin with platform context + + Raises: + InvalidTokenException: If platform admin token missing platform info + InsufficientPermissionsException: If platform access revoked + """ + from models.database.platform import Platform + + user = get_current_admin_from_cookie_or_header(request, credentials, admin_token, db) + + # Super admins bypass platform context + if user.is_super_admin: + return user + + # Platform admins need platform_id in token + if not hasattr(user, "token_platform_id"): + raise InvalidTokenException( + "Token missing platform information. Please select a platform." + ) + + platform_id = user.token_platform_id + + # Verify admin still has access to this platform + if not user.can_access_platform(platform_id): + logger.warning( + f"Admin {user.username} lost access to platform_id={platform_id}" + ) + raise InsufficientPermissionsException( + "Access to this platform has been revoked. Please login again." + ) + + # Load platform and store in request state + platform = db.query(Platform).filter(Platform.id == platform_id).first() + request.state.admin_platform = platform + + return user + + # ============================================================================ # VENDOR AUTHENTICATION # ============================================================================ diff --git a/app/api/v1/admin/__init__.py b/app/api/v1/admin/__init__.py index 9fa77be9..a4265938 100644 --- a/app/api/v1/admin/__init__.py +++ b/app/api/v1/admin/__init__.py @@ -25,6 +25,7 @@ from fastapi import APIRouter # Import all admin routers from . import ( + admin_users, audit, auth, background_tasks, @@ -103,6 +104,9 @@ router.include_router(platforms.router, tags=["admin-platforms"]) # Include user management endpoints router.include_router(users.router, tags=["admin-users"]) +# Include admin user management endpoints (super admin only) +router.include_router(admin_users.router, tags=["admin-admin-users"]) + # Include customer management endpoints router.include_router(customers.router, tags=["admin-customers"]) diff --git a/app/api/v1/admin/admin_users.py b/app/api/v1/admin/admin_users.py new file mode 100644 index 00000000..65048cb2 --- /dev/null +++ b/app/api/v1/admin/admin_users.py @@ -0,0 +1,373 @@ +# app/api/v1/admin/admin_users.py +""" +Admin user management endpoints (Super Admin only). + +This module provides endpoints for: +- Listing all admin users with their platform assignments +- Creating platform admins +- Assigning/removing platform access +- Promoting/demoting super admin status +""" + +import logging +from typing import Optional + +from fastapi import APIRouter, Body, Depends, Path, Query +from pydantic import BaseModel, EmailStr +from sqlalchemy.orm import Session + +from app.api.deps import get_current_super_admin, get_current_super_admin_api +from app.core.database import get_db +from app.services.admin_platform_service import admin_platform_service +from models.database.user import User + +router = APIRouter(prefix="/admin-users") +logger = logging.getLogger(__name__) + + +# ============================================================================ +# SCHEMAS +# ============================================================================ + + +class PlatformAssignmentResponse(BaseModel): + """Response for a platform assignment.""" + + platform_id: int + platform_code: str + platform_name: str + is_active: bool + + class Config: + from_attributes = True + + +class AdminUserResponse(BaseModel): + """Response for an admin user.""" + + id: int + email: str + username: str + first_name: Optional[str] = None + last_name: Optional[str] = None + is_active: bool + is_super_admin: bool + platform_assignments: list[PlatformAssignmentResponse] = [] + + class Config: + from_attributes = True + + +class AdminUserListResponse(BaseModel): + """Response for listing admin users.""" + + admins: list[AdminUserResponse] + total: int + + +class CreatePlatformAdminRequest(BaseModel): + """Request to create a new platform admin.""" + + email: EmailStr + username: str + password: str + first_name: Optional[str] = None + last_name: Optional[str] = None + platform_ids: list[int] + + +class AssignPlatformRequest(BaseModel): + """Request to assign admin to platform.""" + + platform_id: int + + +class ToggleSuperAdminRequest(BaseModel): + """Request to toggle super admin status.""" + + is_super_admin: bool + + +# ============================================================================ +# ENDPOINTS +# ============================================================================ + + +@router.get("", response_model=AdminUserListResponse) +def list_admin_users( + skip: int = Query(0, ge=0), + limit: int = Query(100, ge=1, le=500), + include_super_admins: bool = Query(True), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_super_admin), +): + """ + List all admin users with their platform assignments. + + Super admin only. + """ + from sqlalchemy.orm import joinedload + + query = db.query(User).filter(User.role == "admin") + + if not include_super_admins: + query = query.filter(User.is_super_admin == False) + + total = query.count() + + admins = ( + query.options(joinedload(User.admin_platforms)) + .offset(skip) + .limit(limit) + .all() + ) + + admin_responses = [] + for admin in admins: + assignments = [] + if not admin.is_super_admin: + for ap in admin.admin_platforms: + if ap.is_active and ap.platform: + assignments.append( + PlatformAssignmentResponse( + platform_id=ap.platform_id, + platform_code=ap.platform.code, + platform_name=ap.platform.name, + is_active=ap.is_active, + ) + ) + + admin_responses.append( + AdminUserResponse( + id=admin.id, + email=admin.email, + username=admin.username, + first_name=admin.first_name, + last_name=admin.last_name, + is_active=admin.is_active, + is_super_admin=admin.is_super_admin, + platform_assignments=assignments, + ) + ) + + return AdminUserListResponse(admins=admin_responses, total=total) + + +@router.post("", response_model=AdminUserResponse) +def create_platform_admin( + request: CreatePlatformAdminRequest, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_super_admin_api), +): + """ + Create a new platform admin with platform assignments. + + Super admin only. + """ + user, assignments = admin_platform_service.create_platform_admin( + db=db, + email=request.email, + username=request.username, + password=request.password, + platform_ids=request.platform_ids, + created_by_user_id=current_admin.id, + first_name=request.first_name, + last_name=request.last_name, + ) + + db.commit() + + # Refresh to get relationships + db.refresh(user) + + assignment_responses = [ + PlatformAssignmentResponse( + platform_id=ap.platform_id, + platform_code=ap.platform.code if ap.platform else "", + platform_name=ap.platform.name if ap.platform else "", + is_active=ap.is_active, + ) + for ap in user.admin_platforms + if ap.is_active + ] + + logger.info(f"Created platform admin {user.username} by {current_admin.username}") + + return AdminUserResponse( + id=user.id, + email=user.email, + username=user.username, + first_name=user.first_name, + last_name=user.last_name, + is_active=user.is_active, + is_super_admin=user.is_super_admin, + platform_assignments=assignment_responses, + ) + + +@router.get("/{user_id}", response_model=AdminUserResponse) +def get_admin_user( + user_id: int = Path(...), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_super_admin), +): + """ + Get admin user details with platform assignments. + + Super admin only. + """ + from sqlalchemy.orm import joinedload + + from app.exceptions import ValidationException + + admin = ( + db.query(User) + .options(joinedload(User.admin_platforms)) + .filter(User.id == user_id, User.role == "admin") + .first() + ) + + if not admin: + raise ValidationException("Admin user not found", field="user_id") + + assignments = [] + if not admin.is_super_admin: + for ap in admin.admin_platforms: + if ap.is_active and ap.platform: + assignments.append( + PlatformAssignmentResponse( + platform_id=ap.platform_id, + platform_code=ap.platform.code, + platform_name=ap.platform.name, + is_active=ap.is_active, + ) + ) + + return AdminUserResponse( + id=admin.id, + email=admin.email, + username=admin.username, + first_name=admin.first_name, + last_name=admin.last_name, + is_active=admin.is_active, + is_super_admin=admin.is_super_admin, + platform_assignments=assignments, + ) + + +@router.post("/{user_id}/platforms/{platform_id}") +def assign_admin_to_platform( + user_id: int = Path(...), + platform_id: int = Path(...), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_super_admin_api), +): + """ + Assign an admin to a platform. + + Super admin only. + """ + assignment = admin_platform_service.assign_admin_to_platform( + db=db, + admin_user_id=user_id, + platform_id=platform_id, + assigned_by_user_id=current_admin.id, + ) + db.commit() + + logger.info( + f"Assigned admin {user_id} to platform {platform_id} by {current_admin.username}" + ) + + return { + "message": "Admin assigned to platform successfully", + "platform_id": platform_id, + "user_id": user_id, + } + + +@router.delete("/{user_id}/platforms/{platform_id}") +def remove_admin_from_platform( + user_id: int = Path(...), + platform_id: int = Path(...), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_super_admin_api), +): + """ + Remove an admin's access to a platform. + + Super admin only. + """ + admin_platform_service.remove_admin_from_platform( + db=db, + admin_user_id=user_id, + platform_id=platform_id, + removed_by_user_id=current_admin.id, + ) + db.commit() + + logger.info( + f"Removed admin {user_id} from platform {platform_id} by {current_admin.username}" + ) + + return { + "message": "Admin removed from platform successfully", + "platform_id": platform_id, + "user_id": user_id, + } + + +@router.put("/{user_id}/super-admin") +def toggle_super_admin_status( + user_id: int = Path(...), + request: ToggleSuperAdminRequest = Body(...), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_super_admin_api), +): + """ + Promote or demote an admin to/from super admin. + + Super admin only. + """ + user = admin_platform_service.toggle_super_admin( + db=db, + user_id=user_id, + is_super_admin=request.is_super_admin, + current_admin_id=current_admin.id, + ) + db.commit() + + action = "promoted to" if request.is_super_admin else "demoted from" + logger.info(f"Admin {user.username} {action} super admin by {current_admin.username}") + + return { + "message": f"Admin {action} super admin successfully", + "user_id": user_id, + "is_super_admin": user.is_super_admin, + } + + +@router.get("/{user_id}/platforms") +def get_admin_platforms( + user_id: int = Path(...), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_super_admin), +): + """ + Get all platforms assigned to an admin. + + Super admin only. + """ + platforms = admin_platform_service.get_platforms_for_admin(db, user_id) + + return { + "platforms": [ + { + "id": p.id, + "code": p.code, + "name": p.name, + } + for p in platforms + ], + "user_id": user_id, + } diff --git a/app/api/v1/admin/auth.py b/app/api/v1/admin/auth.py index ad042f33..1b922dc7 100644 --- a/app/api/v1/admin/auth.py +++ b/app/api/v1/admin/auth.py @@ -14,11 +14,14 @@ import logging from fastapi import APIRouter, Depends, Response from sqlalchemy.orm import Session -from app.api.deps import get_current_admin_api +from app.api.deps import get_current_admin_api, get_current_admin_from_cookie_or_header from app.core.database import get_db from app.core.environment import should_use_secure_cookies -from app.exceptions import InvalidCredentialsException +from app.exceptions import InsufficientPermissionsException, InvalidCredentialsException +from app.services.admin_platform_service import admin_platform_service from app.services.auth_service import auth_service +from middleware.auth import AuthManager +from models.database.platform import Platform from models.database.user import User from models.schema.auth import LoginResponse, LogoutResponse, UserLogin, UserResponse @@ -123,3 +126,99 @@ def admin_logout(response: Response): logger.debug("Deleted admin_token cookies (both /admin and / paths)") return LogoutResponse(message="Logged out successfully") + + +@router.get("/accessible-platforms") +def get_accessible_platforms( + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_from_cookie_or_header), +): + """ + Get list of platforms this admin can access. + + Returns: + - For super admins: All active platforms + - For platform admins: Only assigned platforms + """ + if current_user.is_super_admin: + platforms = db.query(Platform).filter(Platform.is_active == True).all() + else: + platforms = admin_platform_service.get_platforms_for_admin(db, current_user.id) + + return { + "platforms": [ + { + "id": p.id, + "code": p.code, + "name": p.name, + "logo": p.logo, + } + for p in platforms + ], + "is_super_admin": current_user.is_super_admin, + "requires_platform_selection": not current_user.is_super_admin and len(platforms) > 0, + } + + +@router.post("/select-platform") +def select_platform( + platform_id: int, + response: Response, + db: Session = Depends(get_db), + current_user: User = Depends(get_current_admin_from_cookie_or_header), +): + """ + Select platform context for platform admin. + + Issues a new JWT token with platform context. + Super admins skip this step (they have global access). + + Args: + platform_id: Platform ID to select + + Returns: + LoginResponse with new token containing platform context + """ + if current_user.is_super_admin: + raise InvalidCredentialsException( + "Super admins don't need platform selection - they have global access" + ) + + # Verify admin has access to this platform + if not current_user.can_access_platform(platform_id): + raise InsufficientPermissionsException( + f"You don't have access to this platform" + ) + + # Load platform + platform = db.query(Platform).filter(Platform.id == platform_id).first() + if not platform: + raise InvalidCredentialsException("Platform not found") + + # Issue new token with platform context + auth_manager = AuthManager() + token_data = auth_manager.create_access_token( + user=current_user, + platform_id=platform.id, + platform_code=platform.code, + ) + + # Set cookie with new token + response.set_cookie( + key="admin_token", + value=token_data["access_token"], + httponly=True, + secure=should_use_secure_cookies(), + samesite="lax", + max_age=token_data["expires_in"], + path="/admin", + ) + + logger.info(f"Admin {current_user.username} selected platform {platform.code}") + + return LoginResponse( + access_token=token_data["access_token"], + token_type=token_data["token_type"], + expires_in=token_data["expires_in"], + user=current_user, + ) diff --git a/app/services/admin_platform_service.py b/app/services/admin_platform_service.py new file mode 100644 index 00000000..a3b6b7b1 --- /dev/null +++ b/app/services/admin_platform_service.py @@ -0,0 +1,389 @@ +# app/services/admin_platform_service.py +""" +Admin Platform service for managing admin-platform assignments. + +This module provides: +- Assigning platform admins to platforms +- Removing platform admin access +- Listing platforms for an admin +- Listing admins for a platform +- Promoting/demoting super admin status +""" + +import logging +from datetime import UTC, datetime + +from sqlalchemy.orm import Session, joinedload + +from app.exceptions import ( + AdminOperationException, + CannotModifySelfException, + ValidationException, +) +from models.database.admin_platform import AdminPlatform +from models.database.platform import Platform +from models.database.user import User + +logger = logging.getLogger(__name__) + + +class AdminPlatformService: + """Service class for admin-platform assignment operations.""" + + # ============================================================================ + # ADMIN-PLATFORM ASSIGNMENTS + # ============================================================================ + + def assign_admin_to_platform( + self, + db: Session, + admin_user_id: int, + platform_id: int, + assigned_by_user_id: int, + ) -> AdminPlatform: + """ + Assign a platform admin to a platform. + + Args: + db: Database session + admin_user_id: User ID of the admin to assign + platform_id: Platform ID to assign to + assigned_by_user_id: Super admin making the assignment + + Returns: + AdminPlatform: The created assignment + + Raises: + ValidationException: If user is not an admin or is a super admin + AdminOperationException: If assignment already exists + """ + # Verify target user exists and is an admin + user = db.query(User).filter(User.id == admin_user_id).first() + if not user: + raise ValidationException("User not found", field="admin_user_id") + if not user.is_admin: + raise ValidationException( + "User must be an admin to be assigned to platforms", + field="admin_user_id", + ) + if user.is_super_admin: + raise ValidationException( + "Super admins don't need platform assignments - they have access to all platforms", + field="admin_user_id", + ) + + # Verify platform exists + platform = db.query(Platform).filter(Platform.id == platform_id).first() + if not platform: + raise ValidationException("Platform not found", field="platform_id") + + # Check if assignment already exists + existing = ( + db.query(AdminPlatform) + .filter( + AdminPlatform.user_id == admin_user_id, + AdminPlatform.platform_id == platform_id, + ) + .first() + ) + if existing: + if existing.is_active: + raise AdminOperationException( + operation="assign_admin_to_platform", + reason=f"Admin already assigned to platform '{platform.code}'", + ) + # Reactivate existing assignment + existing.is_active = True + existing.assigned_at = datetime.now(UTC) + existing.assigned_by_user_id = assigned_by_user_id + existing.updated_at = datetime.now(UTC) + db.flush() + db.refresh(existing) + logger.info( + f"Reactivated admin {admin_user_id} access to platform {platform.code} " + f"by admin {assigned_by_user_id}" + ) + return existing + + # Create new assignment + assignment = AdminPlatform( + user_id=admin_user_id, + platform_id=platform_id, + assigned_by_user_id=assigned_by_user_id, + is_active=True, + ) + db.add(assignment) + db.flush() + db.refresh(assignment) + + logger.info( + f"Assigned admin {admin_user_id} to platform {platform.code} " + f"by admin {assigned_by_user_id}" + ) + + return assignment + + def remove_admin_from_platform( + self, + db: Session, + admin_user_id: int, + platform_id: int, + removed_by_user_id: int, + ) -> None: + """ + Remove admin's access to a platform. + + This soft-deletes by setting is_active=False for audit purposes. + + Args: + db: Database session + admin_user_id: User ID of the admin to remove + platform_id: Platform ID to remove from + removed_by_user_id: Super admin making the removal + + Raises: + ValidationException: If assignment doesn't exist + """ + assignment = ( + db.query(AdminPlatform) + .filter( + AdminPlatform.user_id == admin_user_id, + AdminPlatform.platform_id == platform_id, + ) + .first() + ) + + if not assignment: + raise ValidationException( + "Admin is not assigned to this platform", + field="platform_id", + ) + + assignment.is_active = False + assignment.updated_at = datetime.now(UTC) + db.flush() + + logger.info( + f"Removed admin {admin_user_id} from platform {platform_id} " + f"by admin {removed_by_user_id}" + ) + + def get_platforms_for_admin( + self, + db: Session, + admin_user_id: int, + include_inactive: bool = False, + ) -> list[Platform]: + """ + Get all platforms an admin can access. + + Args: + db: Database session + admin_user_id: User ID of the admin + include_inactive: Whether to include inactive assignments + + Returns: + List of Platform objects the admin can access + """ + query = ( + db.query(Platform) + .join(AdminPlatform) + .filter(AdminPlatform.user_id == admin_user_id) + ) + + if not include_inactive: + query = query.filter(AdminPlatform.is_active == True) + + return query.all() + + def get_admins_for_platform( + self, + db: Session, + platform_id: int, + include_inactive: bool = False, + ) -> list[User]: + """ + Get all admins assigned to a platform. + + Args: + db: Database session + platform_id: Platform ID + include_inactive: Whether to include inactive assignments + + Returns: + List of User objects assigned to the platform + """ + # Explicit join condition needed because AdminPlatform has two FKs to User + # (user_id and assigned_by_user_id) + query = ( + db.query(User) + .join(AdminPlatform, AdminPlatform.user_id == User.id) + .filter(AdminPlatform.platform_id == platform_id) + ) + + if not include_inactive: + query = query.filter(AdminPlatform.is_active == True) + + return query.all() + + def get_admin_assignments( + self, + db: Session, + admin_user_id: int, + ) -> list[AdminPlatform]: + """ + Get all platform assignments for an admin with platform details. + + Args: + db: Database session + admin_user_id: User ID of the admin + + Returns: + List of AdminPlatform objects with platform relationship loaded + """ + return ( + db.query(AdminPlatform) + .options(joinedload(AdminPlatform.platform)) + .filter( + AdminPlatform.user_id == admin_user_id, + AdminPlatform.is_active == True, + ) + .all() + ) + + # ============================================================================ + # SUPER ADMIN MANAGEMENT + # ============================================================================ + + def toggle_super_admin( + self, + db: Session, + user_id: int, + is_super_admin: bool, + current_admin_id: int, + ) -> User: + """ + Promote or demote a user to/from super admin. + + When demoting from super admin, the admin will have no platform access + until explicitly assigned via assign_admin_to_platform. + + Args: + db: Database session + user_id: User ID to modify + is_super_admin: True to promote, False to demote + current_admin_id: Super admin making the change + + Returns: + Updated User object + + Raises: + CannotModifySelfException: If trying to demote self + ValidationException: If user is not an admin + """ + if user_id == current_admin_id and not is_super_admin: + raise CannotModifySelfException( + user_id=user_id, + operation="demote from super admin", + ) + + user = db.query(User).filter(User.id == user_id).first() + if not user: + raise ValidationException("User not found", field="user_id") + if not user.is_admin: + raise ValidationException( + "User must be an admin to be promoted to super admin", + field="user_id", + ) + + old_status = user.is_super_admin + user.is_super_admin = is_super_admin + user.updated_at = datetime.now(UTC) + db.flush() + db.refresh(user) + + action = "promoted to" if is_super_admin else "demoted from" + logger.info( + f"User {user.username} {action} super admin by admin {current_admin_id}" + ) + + return user + + def create_platform_admin( + self, + db: Session, + email: str, + username: str, + password: str, + platform_ids: list[int], + created_by_user_id: int, + first_name: str | None = None, + last_name: str | None = None, + ) -> tuple[User, list[AdminPlatform]]: + """ + Create a new platform admin with platform assignments. + + Args: + db: Database session + email: Admin email + username: Admin username + password: Admin password + platform_ids: List of platform IDs to assign + created_by_user_id: Super admin creating the account + first_name: Optional first name + last_name: Optional last name + + Returns: + Tuple of (User, list of AdminPlatform assignments) + """ + from middleware.auth import AuthManager + + auth_manager = AuthManager() + + # Check for existing user + existing = db.query(User).filter( + (User.email == email) | (User.username == username) + ).first() + if existing: + field = "email" if existing.email == email else "username" + raise ValidationException(f"{field.capitalize()} already exists", field=field) + + # Create admin user + user = User( + email=email, + username=username, + hashed_password=auth_manager.hash_password(password), + first_name=first_name, + last_name=last_name, + role="admin", + is_active=True, + is_super_admin=False, # Platform admin, not super admin + ) + db.add(user) + db.flush() + + # Create platform assignments + assignments = [] + for platform_id in platform_ids: + assignment = AdminPlatform( + user_id=user.id, + platform_id=platform_id, + assigned_by_user_id=created_by_user_id, + is_active=True, + ) + db.add(assignment) + assignments.append(assignment) + + db.flush() + db.refresh(user) + + logger.info( + f"Created platform admin {username} with access to platforms " + f"{platform_ids} by admin {created_by_user_id}" + ) + + return user, assignments + + +# Singleton instance +admin_platform_service = AdminPlatformService() diff --git a/middleware/auth.py b/middleware/auth.py index c34426eb..77349443 100644 --- a/middleware/auth.py +++ b/middleware/auth.py @@ -140,6 +140,8 @@ class AuthManager: vendor_id: int | None = None, vendor_code: str | None = None, vendor_role: str | None = None, + platform_id: int | None = None, + platform_code: str | None = None, ) -> dict[str, Any]: """Create a JWT access token for an authenticated user. @@ -151,6 +153,8 @@ class AuthManager: vendor_id (int, optional): Vendor ID if logging into vendor context vendor_code (str, optional): Vendor code if logging into vendor context vendor_role (str, optional): User's role in this vendor (owner, manager, etc.) + platform_id (int, optional): Platform ID for platform admin context + platform_code (str, optional): Platform code for platform admin context Returns: Dict[str, Any]: Dictionary containing: @@ -172,6 +176,21 @@ class AuthManager: "iat": datetime.now(UTC), # Issued at time (JWT standard claim) } + # Include admin-specific information for admin users + if user.is_admin: + payload["is_super_admin"] = user.is_super_admin + # For platform admins, include their accessible platform IDs + if not user.is_super_admin: + accessible = user.get_accessible_platform_ids() + if accessible is not None: + payload["accessible_platforms"] = accessible + + # Include platform context for platform admins + if platform_id is not None: + payload["platform_id"] = platform_id + if platform_code is not None: + payload["platform_code"] = platform_code + # Include vendor information in token if provided (vendor-specific login) if vendor_id is not None: payload["vendor_id"] = vendor_id @@ -242,6 +261,18 @@ class AuthManager: ), # Default to "user" role if not specified } + # Include admin-specific information if present + if "is_super_admin" in payload: + user_data["is_super_admin"] = payload["is_super_admin"] + if "accessible_platforms" in payload: + user_data["accessible_platforms"] = payload["accessible_platforms"] + + # Include platform context for platform admins + if "platform_id" in payload: + user_data["platform_id"] = payload["platform_id"] + if "platform_code" in payload: + user_data["platform_code"] = payload["platform_code"] + # Include vendor information if present in token if "vendor_id" in payload: user_data["vendor_id"] = payload["vendor_id"] @@ -302,8 +333,20 @@ class AuthManager: if not user.is_active: raise UserNotActiveException() - # Attach vendor information to user object if present in token + # Attach admin-specific information to user object if present in token # These become dynamic attributes on the user object for this request + if "is_super_admin" in user_data: + user.token_is_super_admin = user_data["is_super_admin"] + if "accessible_platforms" in user_data: + user.token_accessible_platforms = user_data["accessible_platforms"] + + # Attach platform context to user object if present in token + if "platform_id" in user_data: + user.token_platform_id = user_data["platform_id"] + if "platform_code" in user_data: + user.token_platform_code = user_data["platform_code"] + + # Attach vendor information to user object if present in token if "vendor_id" in user_data: user.token_vendor_id = user_data["vendor_id"] if "vendor_code" in user_data: @@ -443,12 +486,14 @@ class AuthManager: hashed_password = self.hash_password("admin123") # Create new admin user with default credentials + # Default admin is a super admin with access to all platforms admin_user = User( email="admin@example.com", username="admin", hashed_password=hashed_password, role="admin", is_active=True, + is_super_admin=True, ) # Save to database diff --git a/models/database/__init__.py b/models/database/__init__.py index 0423cf04..fd2ba3ac 100644 --- a/models/database/__init__.py +++ b/models/database/__init__.py @@ -8,6 +8,7 @@ from .admin import ( AdminSetting, PlatformAlert, ) +from .admin_platform import AdminPlatform from .architecture_scan import ( ArchitectureScan, ArchitectureViolation, @@ -83,6 +84,7 @@ __all__ = [ # Admin-specific models "AdminAuditLog", "AdminNotification", + "AdminPlatform", "AdminSetting", "PlatformAlert", "AdminSession", diff --git a/models/database/admin_platform.py b/models/database/admin_platform.py new file mode 100644 index 00000000..c208f166 --- /dev/null +++ b/models/database/admin_platform.py @@ -0,0 +1,161 @@ +# models/database/admin_platform.py +""" +AdminPlatform junction table for many-to-many relationship between Admin Users and Platforms. + +This enables platform-scoped admin access: +- Super Admins: Have is_super_admin=True on User model, bypass this table +- Platform Admins: Assigned to specific platforms via this junction table + +A platform admin CAN be assigned to multiple platforms (e.g., both OMS and Loyalty). +""" + +from datetime import UTC, datetime + +from sqlalchemy import ( + Boolean, + Column, + DateTime, + ForeignKey, + Index, + Integer, + UniqueConstraint, +) +from sqlalchemy.orm import relationship + +from app.core.database import Base +from models.database.base import TimestampMixin + + +class AdminPlatform(Base, TimestampMixin): + """ + Junction table linking admin users to platforms they can manage. + + Allows a platform admin to: + - Manage specific platforms only (not all) + - Be assigned to multiple platforms + - Have assignment tracked for audit purposes + + Example: + - User "john@example.com" (admin) can manage OMS platform only + - User "jane@example.com" (admin) can manage both OMS and Loyalty platforms + """ + + __tablename__ = "admin_platforms" + + id = Column(Integer, primary_key=True, index=True) + + # ======================================================================== + # Foreign Keys + # ======================================================================== + + user_id = Column( + Integer, + ForeignKey("users.id", ondelete="CASCADE"), + nullable=False, + index=True, + comment="Reference to the admin user", + ) + + platform_id = Column( + Integer, + ForeignKey("platforms.id", ondelete="CASCADE"), + nullable=False, + index=True, + comment="Reference to the platform", + ) + + # ======================================================================== + # Assignment Status + # ======================================================================== + + is_active = Column( + Boolean, + default=True, + nullable=False, + comment="Whether the admin assignment is active", + ) + + # ======================================================================== + # Audit Fields + # ======================================================================== + + assigned_at = Column( + DateTime(timezone=True), + default=lambda: datetime.now(UTC), + nullable=False, + comment="When the admin was assigned to this platform", + ) + + assigned_by_user_id = Column( + Integer, + ForeignKey("users.id", ondelete="SET NULL"), + nullable=True, + comment="Super admin who made this assignment", + ) + + # ======================================================================== + # Relationships + # ======================================================================== + + user = relationship( + "User", + foreign_keys=[user_id], + back_populates="admin_platforms", + ) + + platform = relationship( + "Platform", + back_populates="admin_platforms", + ) + + assigned_by = relationship( + "User", + foreign_keys=[assigned_by_user_id], + ) + + # ======================================================================== + # Constraints & Indexes + # ======================================================================== + + __table_args__ = ( + # Each admin can only be assigned to a platform once + UniqueConstraint( + "user_id", + "platform_id", + name="uq_admin_platform", + ), + # Performance indexes + Index( + "idx_admin_platform_active", + "user_id", + "platform_id", + "is_active", + ), + Index( + "idx_admin_platform_user_active", + "user_id", + "is_active", + ), + ) + + # ======================================================================== + # Properties + # ======================================================================== + + @property + def platform_code(self) -> str | None: + """Get the platform code for this assignment.""" + return self.platform.code if self.platform else None + + @property + def platform_name(self) -> str | None: + """Get the platform name for this assignment.""" + return self.platform.name if self.platform else None + + def __repr__(self) -> str: + return ( + f"" + ) diff --git a/models/database/platform.py b/models/database/platform.py index 4b852a41..301c9c46 100644 --- a/models/database/platform.py +++ b/models/database/platform.py @@ -192,6 +192,13 @@ class Platform(Base, TimestampMixin): foreign_keys="SubscriptionTier.platform_id", ) + # Admin assignments for this platform + admin_platforms = relationship( + "AdminPlatform", + back_populates="platform", + cascade="all, delete-orphan", + ) + # ======================================================================== # Indexes # ======================================================================== diff --git a/models/database/user.py b/models/database/user.py index 0ff1cba0..dddd065d 100644 --- a/models/database/user.py +++ b/models/database/user.py @@ -46,6 +46,11 @@ class User(Base, TimestampMixin): is_email_verified = Column(Boolean, default=False, nullable=False) last_login = Column(DateTime, nullable=True) + # Super admin flag (only meaningful when role='admin') + # Super admins have access to ALL platforms and global settings + # Platform admins (is_super_admin=False) are assigned to specific platforms + is_super_admin = Column(Boolean, default=False, nullable=False) + # Language preference (NULL = use context default: vendor dashboard_language or system default) # Supported: en, fr, de, lb preferred_language = Column(String(5), nullable=True) @@ -59,6 +64,15 @@ class User(Base, TimestampMixin): "VendorUser", foreign_keys="[VendorUser.user_id]", back_populates="user" ) + # Admin-platform assignments (for platform admins only) + # Super admins don't need assignments - they have access to all platforms + admin_platforms = relationship( + "AdminPlatform", + foreign_keys="AdminPlatform.user_id", + back_populates="user", + cascade="all, delete-orphan", + ) + def __repr__(self): """String representation of the User object.""" return f"" @@ -128,3 +142,49 @@ class User(Base, TimestampMixin): return True return False + + # ========================================================================= + # Admin Platform Access Methods + # ========================================================================= + + @property + def is_super_admin_user(self) -> bool: + """Check if user is a super admin (can access all platforms).""" + return self.role == UserRole.ADMIN.value and self.is_super_admin + + @property + def is_platform_admin(self) -> bool: + """Check if user is a platform admin (access to assigned platforms only).""" + return self.role == UserRole.ADMIN.value and not self.is_super_admin + + def can_access_platform(self, platform_id: int) -> bool: + """ + Check if admin can access a specific platform. + + - Super admins can access all platforms + - Platform admins can only access assigned platforms + - Non-admins return False + """ + if not self.is_admin: + return False + if self.is_super_admin: + return True + return any( + ap.platform_id == platform_id and ap.is_active + for ap in self.admin_platforms + ) + + def get_accessible_platform_ids(self) -> list[int] | None: + """ + Get list of platform IDs this admin can access. + + Returns: + - None for super admins (means ALL platforms) + - List of platform IDs for platform admins + - Empty list for non-admins + """ + if not self.is_admin: + return [] + if self.is_super_admin: + return None # None means ALL platforms + return [ap.platform_id for ap in self.admin_platforms if ap.is_active] diff --git a/tests/conftest.py b/tests/conftest.py index 4a6777a4..35d272bf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -158,6 +158,7 @@ def cleanup(): # Import fixtures from fixture modules pytest_plugins = [ + "tests.fixtures.admin_platform_fixtures", "tests.fixtures.auth_fixtures", "tests.fixtures.marketplace_product_fixtures", "tests.fixtures.vendor_fixtures", diff --git a/tests/fixtures/admin_platform_fixtures.py b/tests/fixtures/admin_platform_fixtures.py new file mode 100644 index 00000000..d7077dcb --- /dev/null +++ b/tests/fixtures/admin_platform_fixtures.py @@ -0,0 +1,150 @@ +# tests/fixtures/admin_platform_fixtures.py +""" +Admin platform assignment test fixtures. + +Provides fixtures for testing super admin and platform admin functionality. +""" + +import uuid + +import pytest + +from models.database.admin_platform import AdminPlatform +from models.database.platform import Platform +from models.database.user import User + + +@pytest.fixture +def test_platform(db): + """Create a test platform.""" + unique_id = str(uuid.uuid4())[:8] + platform = Platform( + code=f"test_{unique_id}", + name=f"Test Platform {unique_id}", + description="A test platform for unit tests", + path_prefix=f"test{unique_id}", + is_active=True, + is_public=True, + default_language="en", + supported_languages=["en", "fr", "de"], + ) + db.add(platform) + db.commit() + db.refresh(platform) + return platform + + +@pytest.fixture +def another_platform(db): + """Create another test platform.""" + unique_id = str(uuid.uuid4())[:8] + platform = Platform( + code=f"another_{unique_id}", + name=f"Another Platform {unique_id}", + description="Another test platform", + path_prefix=f"another{unique_id}", + is_active=True, + is_public=True, + default_language="fr", + supported_languages=["fr", "en"], + ) + db.add(platform) + db.commit() + db.refresh(platform) + return platform + + +@pytest.fixture +def test_admin_platform_assignment(db, test_platform_admin, test_platform, test_super_admin): + """Create an admin platform assignment.""" + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + db.refresh(assignment) + return assignment + + +@pytest.fixture +def platform_admin_with_platform(db, auth_manager, test_platform, test_super_admin): + """Create a platform admin with an assigned platform.""" + unique_id = str(uuid.uuid4())[:8] + hashed_password = auth_manager.hash_password("platformadminpass123") + + # Create platform admin + admin = User( + email=f"padmin_{unique_id}@example.com", + username=f"padmin_{unique_id}", + hashed_password=hashed_password, + role="admin", + is_active=True, + is_super_admin=False, + ) + db.add(admin) + db.flush() + + # Assign to platform + assignment = AdminPlatform( + user_id=admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + db.refresh(admin) + + return admin + + +@pytest.fixture +def platform_admin_with_platform_headers(client, platform_admin_with_platform, test_platform): + """Get authentication headers for platform admin with platform context.""" + # First login + response = client.post( + "/api/v1/admin/auth/login", + json={ + "email_or_username": platform_admin_with_platform.username, + "password": "platformadminpass123", + }, + ) + assert response.status_code == 200, f"Platform admin login failed: {response.text}" + token = response.json()["access_token"] + + # Select platform + response = client.post( + "/api/v1/admin/auth/select-platform", + params={"platform_id": test_platform.id}, + headers={"Authorization": f"Bearer {token}"}, + ) + assert response.status_code == 200, f"Platform selection failed: {response.text}" + token = response.json()["access_token"] + + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture +def platform_factory(db): + """Factory fixture for creating platforms.""" + def _create_platform(**kwargs): + unique_id = str(uuid.uuid4())[:8] + defaults = { + "code": f"factory_{unique_id}", + "name": f"Factory Platform {unique_id}", + "path_prefix": f"factory{unique_id}", + "is_active": True, + "is_public": True, + "default_language": "en", + "supported_languages": ["en"], + } + defaults.update(kwargs) + platform = Platform(**defaults) + db.add(platform) + db.commit() + db.refresh(platform) + return platform + return _create_platform diff --git a/tests/fixtures/auth_fixtures.py b/tests/fixtures/auth_fixtures.py index ddc7f804..f8901242 100644 --- a/tests/fixtures/auth_fixtures.py +++ b/tests/fixtures/auth_fixtures.py @@ -40,7 +40,7 @@ def test_user(db, auth_manager): @pytest.fixture def test_admin(db, auth_manager): - """Create a test admin user with unique username.""" + """Create a test admin user with unique username (super admin by default).""" unique_id = str(uuid.uuid4())[:8] hashed_password = auth_manager.hash_password("adminpass123") admin = User( @@ -49,6 +49,7 @@ def test_admin(db, auth_manager): hashed_password=hashed_password, role="admin", is_active=True, + is_super_admin=True, # Default to super admin for backward compatibility ) db.add(admin) db.commit() @@ -56,6 +57,68 @@ def test_admin(db, auth_manager): return admin +@pytest.fixture +def test_super_admin(db, auth_manager): + """Create a test super admin user with unique username.""" + unique_id = str(uuid.uuid4())[:8] + hashed_password = auth_manager.hash_password("superadminpass123") + admin = User( + email=f"superadmin_{unique_id}@example.com", + username=f"superadmin_{unique_id}", + hashed_password=hashed_password, + role="admin", + is_active=True, + is_super_admin=True, + ) + db.add(admin) + db.commit() + db.refresh(admin) + return admin + + +@pytest.fixture +def test_platform_admin(db, auth_manager): + """Create a test platform admin user (not super admin).""" + unique_id = str(uuid.uuid4())[:8] + hashed_password = auth_manager.hash_password("platformadminpass123") + admin = User( + email=f"platformadmin_{unique_id}@example.com", + username=f"platformadmin_{unique_id}", + hashed_password=hashed_password, + role="admin", + is_active=True, + is_super_admin=False, # Platform admin, not super admin + ) + db.add(admin) + db.commit() + db.refresh(admin) + return admin + + +@pytest.fixture +def super_admin_headers(client, test_super_admin): + """Get authentication headers for super admin user.""" + response = client.post( + "/api/v1/admin/auth/login", + json={"email_or_username": test_super_admin.username, "password": "superadminpass123"}, + ) + assert response.status_code == 200, f"Super admin login failed: {response.text}" + token = response.json()["access_token"] + return {"Authorization": f"Bearer {token}"} + + +@pytest.fixture +def platform_admin_headers(client, test_platform_admin): + """Get authentication headers for platform admin user (no platform context yet).""" + response = client.post( + "/api/v1/admin/auth/login", + json={"email_or_username": test_platform_admin.username, "password": "platformadminpass123"}, + ) + assert response.status_code == 200, f"Platform admin login failed: {response.text}" + token = response.json()["access_token"] + return {"Authorization": f"Bearer {token}"} + + @pytest.fixture def another_admin(db, auth_manager): """Create another test admin user for testing admin-to-admin interactions.""" @@ -67,6 +130,7 @@ def another_admin(db, auth_manager): hashed_password=hashed_password, role="admin", is_active=True, + is_super_admin=True, # Super admin for backward compatibility ) db.add(admin) db.commit() diff --git a/tests/integration/api/v1/admin/test_admin_users.py b/tests/integration/api/v1/admin/test_admin_users.py new file mode 100644 index 00000000..d553f723 --- /dev/null +++ b/tests/integration/api/v1/admin/test_admin_users.py @@ -0,0 +1,381 @@ +# tests/integration/api/v1/admin/test_admin_users.py +""" +Integration tests for admin user management API endpoints. + +Tests the /api/v1/admin/admin-users/* endpoints. +""" + +import pytest + + +@pytest.mark.integration +@pytest.mark.api +@pytest.mark.admin +class TestAdminUsersListAPI: + """Test listing admin users.""" + + def test_list_admin_users_as_super_admin( + self, client, super_admin_headers, test_platform_admin + ): + """Test listing admin users as super admin.""" + response = client.get( + "/api/v1/admin/admin-users", + headers=super_admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + assert "admins" in data + assert "total" in data + assert data["total"] >= 1 + + def test_list_admin_users_as_platform_admin_forbidden( + self, client, platform_admin_headers + ): + """Test that platform admin cannot list admin users.""" + response = client.get( + "/api/v1/admin/admin-users", + headers=platform_admin_headers, + ) + + assert response.status_code == 403 # Super admin required + + def test_list_admin_users_excludes_super_admins( + self, client, super_admin_headers, test_platform_admin + ): + """Test listing admin users excluding super admins.""" + response = client.get( + "/api/v1/admin/admin-users", + params={"include_super_admins": False}, + headers=super_admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + # All returned admins should not be super admins + for admin in data["admins"]: + assert admin["is_super_admin"] is False + + +@pytest.mark.integration +@pytest.mark.api +@pytest.mark.admin +class TestAdminUsersCreateAPI: + """Test creating platform admins.""" + + def test_create_platform_admin_success( + self, client, super_admin_headers, test_platform, another_platform + ): + """Test creating a new platform admin.""" + response = client.post( + "/api/v1/admin/admin-users", + json={ + "email": "new_platform_admin@example.com", + "username": "new_platform_admin", + "password": "securepass123", + "first_name": "New", + "last_name": "Admin", + "platform_ids": [test_platform.id, another_platform.id], + }, + headers=super_admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + assert data["email"] == "new_platform_admin@example.com" + assert data["username"] == "new_platform_admin" + assert data["is_super_admin"] is False + assert len(data["platform_assignments"]) == 2 + + def test_create_platform_admin_duplicate_email( + self, client, super_admin_headers, test_platform, test_platform_admin + ): + """Test creating platform admin with duplicate email fails.""" + response = client.post( + "/api/v1/admin/admin-users", + json={ + "email": test_platform_admin.email, + "username": "unique_username", + "password": "securepass123", + "platform_ids": [test_platform.id], + }, + headers=super_admin_headers, + ) + + assert response.status_code == 422 # Validation error + + def test_create_platform_admin_as_platform_admin_forbidden( + self, client, platform_admin_headers, test_platform + ): + """Test that platform admin cannot create other admins.""" + response = client.post( + "/api/v1/admin/admin-users", + json={ + "email": "another@example.com", + "username": "another", + "password": "securepass123", + "platform_ids": [test_platform.id], + }, + headers=platform_admin_headers, + ) + + assert response.status_code == 403 # Forbidden for non-super admin + + +@pytest.mark.integration +@pytest.mark.api +@pytest.mark.admin +class TestAdminUsersPlatformAssignmentAPI: + """Test admin platform assignment endpoints.""" + + def test_assign_admin_to_platform( + self, client, super_admin_headers, test_platform_admin, test_platform + ): + """Test assigning an admin to a platform.""" + response = client.post( + f"/api/v1/admin/admin-users/{test_platform_admin.id}/platforms/{test_platform.id}", + headers=super_admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + assert "message" in data + assert data["user_id"] == test_platform_admin.id + assert data["platform_id"] == test_platform.id + + def test_remove_admin_from_platform( + self, client, super_admin_headers, db, test_platform_admin, test_platform, test_super_admin + ): + """Test removing an admin from a platform.""" + from models.database.admin_platform import AdminPlatform + + # First create an assignment + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + response = client.delete( + f"/api/v1/admin/admin-users/{test_platform_admin.id}/platforms/{test_platform.id}", + headers=super_admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + assert "message" in data + + def test_get_admin_platforms( + self, client, super_admin_headers, db, test_platform_admin, test_platform, test_super_admin + ): + """Test getting platforms for an admin.""" + from models.database.admin_platform import AdminPlatform + + # Create assignment + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + response = client.get( + f"/api/v1/admin/admin-users/{test_platform_admin.id}/platforms", + headers=super_admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + assert "platforms" in data + assert len(data["platforms"]) == 1 + assert data["platforms"][0]["id"] == test_platform.id + + +@pytest.mark.integration +@pytest.mark.api +@pytest.mark.admin +class TestAdminUsersSuperAdminToggleAPI: + """Test super admin promotion/demotion.""" + + def test_promote_to_super_admin( + self, client, super_admin_headers, test_platform_admin + ): + """Test promoting a platform admin to super admin.""" + response = client.put( + f"/api/v1/admin/admin-users/{test_platform_admin.id}/super-admin", + json={"is_super_admin": True}, + headers=super_admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + assert data["is_super_admin"] is True + + def test_demote_from_super_admin( + self, client, super_admin_headers, db, auth_manager + ): + """Test demoting a super admin to platform admin.""" + from models.database.user import User + + # Create another super admin to demote + another_super = User( + email="demote_test@example.com", + username="demote_test", + hashed_password=auth_manager.hash_password("pass"), + role="admin", + is_active=True, + is_super_admin=True, + ) + db.add(another_super) + db.commit() + + response = client.put( + f"/api/v1/admin/admin-users/{another_super.id}/super-admin", + json={"is_super_admin": False}, + headers=super_admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + assert data["is_super_admin"] is False + + +@pytest.mark.integration +@pytest.mark.api +@pytest.mark.admin +class TestAdminAuthPlatformSelectionAPI: + """Test platform selection for platform admins.""" + + def test_get_accessible_platforms_super_admin( + self, client, super_admin_headers, test_platform + ): + """Test getting accessible platforms as super admin.""" + response = client.get( + "/api/v1/admin/auth/accessible-platforms", + headers=super_admin_headers, + ) + + assert response.status_code == 200 + data = response.json() + assert data["is_super_admin"] is True + assert "platforms" in data + assert data["requires_platform_selection"] is False + + def test_get_accessible_platforms_platform_admin( + self, client, db, test_platform_admin, test_platform, test_super_admin, auth_manager + ): + """Test getting accessible platforms as platform admin.""" + from models.database.admin_platform import AdminPlatform + + # Create assignment + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + # Login as platform admin + response = client.post( + "/api/v1/admin/auth/login", + json={ + "email_or_username": test_platform_admin.username, + "password": "platformadminpass123", + }, + ) + assert response.status_code == 200 + token = response.json()["access_token"] + + # Get accessible platforms + response = client.get( + "/api/v1/admin/auth/accessible-platforms", + headers={"Authorization": f"Bearer {token}"}, + ) + + assert response.status_code == 200 + data = response.json() + assert data["is_super_admin"] is False + assert len(data["platforms"]) == 1 + assert data["platforms"][0]["id"] == test_platform.id + assert data["requires_platform_selection"] is True + + def test_select_platform_success( + self, client, db, test_platform_admin, test_platform, test_super_admin + ): + """Test selecting a platform as platform admin.""" + from models.database.admin_platform import AdminPlatform + + # Create assignment + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + # Login + response = client.post( + "/api/v1/admin/auth/login", + json={ + "email_or_username": test_platform_admin.username, + "password": "platformadminpass123", + }, + ) + token = response.json()["access_token"] + + # Select platform + response = client.post( + "/api/v1/admin/auth/select-platform", + params={"platform_id": test_platform.id}, + headers={"Authorization": f"Bearer {token}"}, + ) + + assert response.status_code == 200 + data = response.json() + assert "access_token" in data + # New token should be different + assert data["access_token"] != token + + def test_select_platform_not_assigned( + self, client, db, test_platform_admin, test_platform + ): + """Test selecting a platform that admin is not assigned to.""" + # Login (no platform assignment exists) + response = client.post( + "/api/v1/admin/auth/login", + json={ + "email_or_username": test_platform_admin.username, + "password": "platformadminpass123", + }, + ) + token = response.json()["access_token"] + + # Try to select platform + response = client.post( + "/api/v1/admin/auth/select-platform", + params={"platform_id": test_platform.id}, + headers={"Authorization": f"Bearer {token}"}, + ) + + assert response.status_code == 403 # Access denied + + def test_select_platform_as_super_admin_fails( + self, client, super_admin_headers, test_platform + ): + """Test that super admin cannot select platform (they don't need to).""" + response = client.post( + "/api/v1/admin/auth/select-platform", + params={"platform_id": test_platform.id}, + headers=super_admin_headers, + ) + + assert response.status_code == 401 # Super admins don't need this diff --git a/tests/unit/models/database/test_admin_platform.py b/tests/unit/models/database/test_admin_platform.py new file mode 100644 index 00000000..77bcb27f --- /dev/null +++ b/tests/unit/models/database/test_admin_platform.py @@ -0,0 +1,271 @@ +# tests/unit/models/database/test_admin_platform.py +""" +Unit tests for AdminPlatform model. + +Tests the admin-platform junction table model and its relationships. +""" + +import pytest +from sqlalchemy.exc import IntegrityError + +from models.database.admin_platform import AdminPlatform + + +@pytest.mark.unit +@pytest.mark.database +@pytest.mark.admin +class TestAdminPlatformModel: + """Test AdminPlatform model creation and constraints.""" + + def test_create_admin_platform_assignment( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test creating an admin platform assignment.""" + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + db.refresh(assignment) + + assert assignment.id is not None + assert assignment.user_id == test_platform_admin.id + assert assignment.platform_id == test_platform.id + assert assignment.is_active is True + assert assignment.assigned_by_user_id == test_super_admin.id + assert assignment.assigned_at is not None + + def test_admin_platform_unique_constraint( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test that an admin can only be assigned to a platform once.""" + # Create first assignment + assignment1 = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment1) + db.commit() + + # Try to create duplicate assignment + assignment2 = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment2) + + with pytest.raises(IntegrityError): + db.commit() + + def test_admin_platform_cascade_delete_user( + self, db, auth_manager, test_platform, test_super_admin + ): + """Test that deleting user cascades to admin platform assignments.""" + from models.database.user import User + + # Create a temporary admin + temp_admin = User( + email="temp_admin@example.com", + username="temp_admin", + hashed_password=auth_manager.hash_password("temppass"), + role="admin", + is_active=True, + is_super_admin=False, + ) + db.add(temp_admin) + db.flush() + + # Create assignment + assignment = AdminPlatform( + user_id=temp_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + assignment_id = assignment.id + + # Delete user - should cascade to assignment + db.delete(temp_admin) + db.commit() + + # Verify assignment is gone + remaining = db.query(AdminPlatform).filter(AdminPlatform.id == assignment_id).first() + assert remaining is None + + def test_admin_platform_relationships( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test AdminPlatform relationships are loaded correctly.""" + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + db.refresh(assignment) + + # Test relationships + assert assignment.user is not None + assert assignment.user.id == test_platform_admin.id + assert assignment.platform is not None + assert assignment.platform.id == test_platform.id + assert assignment.assigned_by is not None + assert assignment.assigned_by.id == test_super_admin.id + + def test_admin_platform_properties( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test AdminPlatform computed properties.""" + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + db.refresh(assignment) + + # Test properties + assert assignment.platform_code == test_platform.code + assert assignment.platform_name == test_platform.name + + def test_admin_platform_repr( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test AdminPlatform string representation.""" + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + db.refresh(assignment) + + repr_str = repr(assignment) + assert "AdminPlatform" in repr_str + assert str(test_platform_admin.id) in repr_str + assert str(test_platform.id) in repr_str + + +@pytest.mark.unit +@pytest.mark.database +@pytest.mark.admin +class TestUserAdminMethods: + """Test User model admin-related methods.""" + + def test_is_super_admin_user_true(self, db, test_super_admin): + """Test is_super_admin_user property for super admin.""" + assert test_super_admin.is_super_admin_user is True + + def test_is_super_admin_user_false_for_platform_admin(self, db, test_platform_admin): + """Test is_super_admin_user property for platform admin.""" + assert test_platform_admin.is_super_admin_user is False + + def test_is_platform_admin_true(self, db, test_platform_admin): + """Test is_platform_admin property for platform admin.""" + assert test_platform_admin.is_platform_admin is True + + def test_is_platform_admin_false_for_super_admin(self, db, test_super_admin): + """Test is_platform_admin property for super admin.""" + assert test_super_admin.is_platform_admin is False + + def test_can_access_platform_super_admin(self, db, test_super_admin, test_platform): + """Test that super admin can access any platform.""" + assert test_super_admin.can_access_platform(test_platform.id) is True + + def test_can_access_platform_assigned( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test that platform admin can access assigned platform.""" + # Create assignment + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + db.refresh(test_platform_admin) + + assert test_platform_admin.can_access_platform(test_platform.id) is True + + def test_can_access_platform_not_assigned( + self, db, test_platform_admin, test_platform + ): + """Test that platform admin cannot access unassigned platform.""" + # No assignment created + assert test_platform_admin.can_access_platform(test_platform.id) is False + + def test_can_access_platform_inactive_assignment( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test that platform admin cannot access platform with inactive assignment.""" + # Create inactive assignment + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=False, # Inactive + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + db.refresh(test_platform_admin) + + assert test_platform_admin.can_access_platform(test_platform.id) is False + + def test_get_accessible_platform_ids_super_admin(self, db, test_super_admin): + """Test get_accessible_platform_ids returns None for super admin.""" + result = test_super_admin.get_accessible_platform_ids() + assert result is None # None means all platforms + + def test_get_accessible_platform_ids_platform_admin( + self, db, test_platform_admin, test_platform, another_platform, test_super_admin + ): + """Test get_accessible_platform_ids returns correct list for platform admin.""" + # Create assignments for both platforms + assignment1 = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + assignment2 = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=another_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add_all([assignment1, assignment2]) + db.commit() + db.refresh(test_platform_admin) + + result = test_platform_admin.get_accessible_platform_ids() + assert len(result) == 2 + assert test_platform.id in result + assert another_platform.id in result + + def test_get_accessible_platform_ids_no_assignments(self, db, test_platform_admin): + """Test get_accessible_platform_ids returns empty list when no assignments.""" + result = test_platform_admin.get_accessible_platform_ids() + assert result == [] + + def test_get_accessible_platform_ids_vendor_user(self, db, test_vendor_user): + """Test get_accessible_platform_ids returns empty list for non-admin.""" + result = test_vendor_user.get_accessible_platform_ids() + assert result == [] diff --git a/tests/unit/services/test_admin_platform_service.py b/tests/unit/services/test_admin_platform_service.py new file mode 100644 index 00000000..63fe473b --- /dev/null +++ b/tests/unit/services/test_admin_platform_service.py @@ -0,0 +1,463 @@ +# tests/unit/services/test_admin_platform_service.py +""" +Unit tests for AdminPlatformService. + +Tests the admin platform assignment service operations. +""" + +import pytest + +from app.exceptions import AdminOperationException, CannotModifySelfException, ValidationException +from app.services.admin_platform_service import AdminPlatformService + + +@pytest.mark.unit +@pytest.mark.admin +class TestAdminPlatformServiceAssign: + """Test AdminPlatformService.assign_admin_to_platform.""" + + def test_assign_admin_to_platform_success( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test successfully assigning an admin to a platform.""" + service = AdminPlatformService() + + assignment = service.assign_admin_to_platform( + db=db, + admin_user_id=test_platform_admin.id, + platform_id=test_platform.id, + assigned_by_user_id=test_super_admin.id, + ) + + assert assignment is not None + assert assignment.user_id == test_platform_admin.id + assert assignment.platform_id == test_platform.id + assert assignment.is_active is True + assert assignment.assigned_by_user_id == test_super_admin.id + + def test_assign_admin_user_not_found(self, db, test_platform, test_super_admin): + """Test assigning non-existent user raises error.""" + service = AdminPlatformService() + + with pytest.raises(ValidationException) as exc: + service.assign_admin_to_platform( + db=db, + admin_user_id=99999, + platform_id=test_platform.id, + assigned_by_user_id=test_super_admin.id, + ) + assert "User not found" in str(exc.value) + + def test_assign_admin_not_admin_role( + self, db, test_vendor_user, test_platform, test_super_admin + ): + """Test assigning non-admin user raises error.""" + service = AdminPlatformService() + + with pytest.raises(ValidationException) as exc: + service.assign_admin_to_platform( + db=db, + admin_user_id=test_vendor_user.id, + platform_id=test_platform.id, + assigned_by_user_id=test_super_admin.id, + ) + assert "must be an admin" in str(exc.value) + + def test_assign_super_admin_raises_error( + self, db, test_super_admin, test_platform + ): + """Test assigning super admin raises error.""" + service = AdminPlatformService() + + with pytest.raises(ValidationException) as exc: + service.assign_admin_to_platform( + db=db, + admin_user_id=test_super_admin.id, + platform_id=test_platform.id, + assigned_by_user_id=test_super_admin.id, + ) + assert "Super admins don't need platform assignments" in str(exc.value) + + def test_assign_platform_not_found( + self, db, test_platform_admin, test_super_admin + ): + """Test assigning to non-existent platform raises error.""" + service = AdminPlatformService() + + with pytest.raises(ValidationException) as exc: + service.assign_admin_to_platform( + db=db, + admin_user_id=test_platform_admin.id, + platform_id=99999, + assigned_by_user_id=test_super_admin.id, + ) + assert "Platform not found" in str(exc.value) + + def test_assign_admin_already_assigned( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test assigning already assigned admin raises error.""" + service = AdminPlatformService() + + # First assignment + service.assign_admin_to_platform( + db=db, + admin_user_id=test_platform_admin.id, + platform_id=test_platform.id, + assigned_by_user_id=test_super_admin.id, + ) + db.commit() + + # Try to assign again + with pytest.raises(AdminOperationException) as exc: + service.assign_admin_to_platform( + db=db, + admin_user_id=test_platform_admin.id, + platform_id=test_platform.id, + assigned_by_user_id=test_super_admin.id, + ) + assert "already assigned" in str(exc.value) + + def test_reactivate_inactive_assignment( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test reactivating an inactive assignment.""" + from models.database.admin_platform import AdminPlatform + + service = AdminPlatformService() + + # Create inactive assignment directly + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=False, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + # Assign again - should reactivate + result = service.assign_admin_to_platform( + db=db, + admin_user_id=test_platform_admin.id, + platform_id=test_platform.id, + assigned_by_user_id=test_super_admin.id, + ) + + assert result.is_active is True + + +@pytest.mark.unit +@pytest.mark.admin +class TestAdminPlatformServiceRemove: + """Test AdminPlatformService.remove_admin_from_platform.""" + + def test_remove_admin_from_platform_success( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test successfully removing an admin from a platform.""" + from models.database.admin_platform import AdminPlatform + + service = AdminPlatformService() + + # Create assignment first + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + # Remove + service.remove_admin_from_platform( + db=db, + admin_user_id=test_platform_admin.id, + platform_id=test_platform.id, + removed_by_user_id=test_super_admin.id, + ) + db.commit() + db.refresh(assignment) + + assert assignment.is_active is False + + def test_remove_admin_not_assigned( + self, db, test_platform_admin, test_platform, test_super_admin + ): + """Test removing non-existent assignment raises error.""" + service = AdminPlatformService() + + with pytest.raises(ValidationException) as exc: + service.remove_admin_from_platform( + db=db, + admin_user_id=test_platform_admin.id, + platform_id=test_platform.id, + removed_by_user_id=test_super_admin.id, + ) + assert "not assigned" in str(exc.value) + + +@pytest.mark.unit +@pytest.mark.admin +class TestAdminPlatformServiceQueries: + """Test AdminPlatformService query methods.""" + + def test_get_platforms_for_admin( + self, db, test_platform_admin, test_platform, another_platform, test_super_admin + ): + """Test getting platforms for an admin.""" + from models.database.admin_platform import AdminPlatform + + service = AdminPlatformService() + + # Create assignments + for platform in [test_platform, another_platform]: + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + platforms = service.get_platforms_for_admin(db, test_platform_admin.id) + + assert len(platforms) == 2 + platform_ids = [p.id for p in platforms] + assert test_platform.id in platform_ids + assert another_platform.id in platform_ids + + def test_get_platforms_for_admin_no_assignments(self, db, test_platform_admin): + """Test getting platforms when no assignments exist.""" + service = AdminPlatformService() + + platforms = service.get_platforms_for_admin(db, test_platform_admin.id) + + assert platforms == [] + + def test_get_admins_for_platform( + self, db, test_platform_admin, test_platform, test_super_admin, auth_manager + ): + """Test getting admins for a platform.""" + from models.database.admin_platform import AdminPlatform + from models.database.user import User + + service = AdminPlatformService() + + # Create another platform admin + another_admin = User( + email="another_padmin@example.com", + username="another_padmin", + hashed_password=auth_manager.hash_password("pass"), + role="admin", + is_active=True, + is_super_admin=False, + ) + db.add(another_admin) + db.flush() + + # Create assignments for both admins + for admin in [test_platform_admin, another_admin]: + assignment = AdminPlatform( + user_id=admin.id, + platform_id=test_platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + admins = service.get_admins_for_platform(db, test_platform.id) + + assert len(admins) == 2 + admin_ids = [a.id for a in admins] + assert test_platform_admin.id in admin_ids + assert another_admin.id in admin_ids + + def test_get_admin_assignments( + self, db, test_platform_admin, test_platform, another_platform, test_super_admin + ): + """Test getting admin assignments with platform details.""" + from models.database.admin_platform import AdminPlatform + + service = AdminPlatformService() + + # Create assignments + for platform in [test_platform, another_platform]: + assignment = AdminPlatform( + user_id=test_platform_admin.id, + platform_id=platform.id, + is_active=True, + assigned_by_user_id=test_super_admin.id, + ) + db.add(assignment) + db.commit() + + assignments = service.get_admin_assignments(db, test_platform_admin.id) + + assert len(assignments) == 2 + # Verify platform relationship is loaded + for assignment in assignments: + assert assignment.platform is not None + assert assignment.platform.code is not None + + +@pytest.mark.unit +@pytest.mark.admin +class TestAdminPlatformServiceSuperAdmin: + """Test AdminPlatformService super admin operations.""" + + def test_toggle_super_admin_promote( + self, db, test_platform_admin, test_super_admin + ): + """Test promoting admin to super admin.""" + service = AdminPlatformService() + + result = service.toggle_super_admin( + db=db, + user_id=test_platform_admin.id, + is_super_admin=True, + current_admin_id=test_super_admin.id, + ) + db.commit() + + assert result.is_super_admin is True + + def test_toggle_super_admin_demote( + self, db, test_super_admin, auth_manager + ): + """Test demoting super admin to platform admin.""" + from models.database.user import User + + service = AdminPlatformService() + + # Create another super admin to demote + another_super = User( + email="another_super@example.com", + username="another_super", + hashed_password=auth_manager.hash_password("pass"), + role="admin", + is_active=True, + is_super_admin=True, + ) + db.add(another_super) + db.commit() + + result = service.toggle_super_admin( + db=db, + user_id=another_super.id, + is_super_admin=False, + current_admin_id=test_super_admin.id, + ) + db.commit() + + assert result.is_super_admin is False + + def test_toggle_super_admin_cannot_demote_self(self, db, test_super_admin): + """Test that super admin cannot demote themselves.""" + service = AdminPlatformService() + + with pytest.raises(CannotModifySelfException): + service.toggle_super_admin( + db=db, + user_id=test_super_admin.id, + is_super_admin=False, + current_admin_id=test_super_admin.id, + ) + + def test_toggle_super_admin_user_not_found(self, db, test_super_admin): + """Test toggling non-existent user raises error.""" + service = AdminPlatformService() + + with pytest.raises(ValidationException) as exc: + service.toggle_super_admin( + db=db, + user_id=99999, + is_super_admin=True, + current_admin_id=test_super_admin.id, + ) + assert "User not found" in str(exc.value) + + def test_toggle_super_admin_not_admin( + self, db, test_vendor_user, test_super_admin + ): + """Test toggling non-admin user raises error.""" + service = AdminPlatformService() + + with pytest.raises(ValidationException) as exc: + service.toggle_super_admin( + db=db, + user_id=test_vendor_user.id, + is_super_admin=True, + current_admin_id=test_super_admin.id, + ) + assert "must be an admin" in str(exc.value) + + +@pytest.mark.unit +@pytest.mark.admin +class TestAdminPlatformServiceCreatePlatformAdmin: + """Test AdminPlatformService.create_platform_admin.""" + + def test_create_platform_admin_success( + self, db, test_platform, another_platform, test_super_admin + ): + """Test creating a new platform admin with assignments.""" + service = AdminPlatformService() + + user, assignments = service.create_platform_admin( + db=db, + email="new_padmin@example.com", + username="new_padmin", + password="securepass123", + platform_ids=[test_platform.id, another_platform.id], + created_by_user_id=test_super_admin.id, + first_name="New", + last_name="Admin", + ) + db.commit() + + assert user is not None + assert user.email == "new_padmin@example.com" + assert user.username == "new_padmin" + assert user.role == "admin" + assert user.is_super_admin is False + assert user.first_name == "New" + assert user.last_name == "Admin" + assert len(assignments) == 2 + + def test_create_platform_admin_duplicate_email( + self, db, test_platform, test_super_admin, test_platform_admin + ): + """Test creating platform admin with duplicate email fails.""" + service = AdminPlatformService() + + with pytest.raises(ValidationException) as exc: + service.create_platform_admin( + db=db, + email=test_platform_admin.email, # Duplicate + username="unique_username", + password="securepass123", + platform_ids=[test_platform.id], + created_by_user_id=test_super_admin.id, + ) + assert "Email already exists" in str(exc.value) + + def test_create_platform_admin_duplicate_username( + self, db, test_platform, test_super_admin, test_platform_admin + ): + """Test creating platform admin with duplicate username fails.""" + service = AdminPlatformService() + + with pytest.raises(ValidationException) as exc: + service.create_platform_admin( + db=db, + email="unique@example.com", + username=test_platform_admin.username, # Duplicate + password="securepass123", + platform_ids=[test_platform.id], + created_by_user_id=test_super_admin.id, + ) + assert "Username already exists" in str(exc.value)