Files
orion/app/api/v1/admin/users.py
Samir Boulahtit be9c892739 feat(users): implement full user management CRUD
API endpoints (app/api/v1/admin/users.py):
- GET /users: Paginated list with search and filters
- POST /users: Create new user
- GET /users/{id}: Get user details with related counts
- PUT /users/{id}: Update user information
- PUT /users/{id}/status: Toggle active status
- DELETE /users/{id}: Delete user (with ownership check)

Pydantic schemas (models/schema/auth.py):
- UserCreate: For creating new users
- UserUpdate: For updating user information
- UserDetailResponse: Extended user details with counts
- UserListResponse: Paginated list response

Frontend:
- Updated users.html with server-side pagination and filters
- New user-create.html/js for user creation form
- New user-detail.html/js for viewing user details
- New user-edit.html/js for editing users

Routes added for user create, detail, and edit pages.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-12-03 21:37:23 +01:00

320 lines
9.8 KiB
Python

# app/api/v1/admin/users.py
"""
User management endpoints for admin.
"""
import logging
import math
from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query
from sqlalchemy.orm import Session, joinedload
from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.services.admin_service import admin_service
from middleware.auth import AuthManager
from app.services.stats_service import stats_service
from models.database.user import User
from models.schema.auth import (
UserCreate,
UserDetailResponse,
UserListResponse,
UserResponse,
UserUpdate,
)
# Router for the admin user-management endpoints; every route declared on it
# is registered under the "/users" prefix.
router = APIRouter(prefix="/users")
# Module-level logger named after this module, used for admin audit messages.
logger = logging.getLogger(__name__)
@router.get("", response_model=UserListResponse)
def get_all_users(
    page: int = Query(1, ge=1),
    per_page: int = Query(10, ge=1, le=100),
    search: str = Query("", description="Search by username or email"),
    role: str = Query("", description="Filter by role"),
    is_active: str = Query("", description="Filter by active status"),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """Get paginated list of all users (Admin only).

    Args:
        page: 1-based page number.
        per_page: Number of users per page (1-100).
        search: Optional substring matched case-insensitively against
            username, email, first name and last name. ``%`` and ``_`` in the
            search text are matched literally rather than as wildcards.
        role: Optional exact-match role filter; an empty string disables it.
        is_active: Optional status filter passed as a string. An empty string
            disables it; ``"true"`` (any casing) selects active users and any
            other non-empty value selects inactive users.
        db: Database session provided by the ``get_db`` dependency.
        current_admin: Result of the ``get_current_admin_api`` dependency;
            not otherwise used by this handler.

    Returns:
        A ``UserListResponse`` with the requested page of users, the total
        number of matching users and the page count (at least 1).
    """
    query = db.query(User)

    if search:
        # Escape LIKE metacharacters so a search for "a_b" or "100%" matches
        # those characters literally. "/" is used as the escape character and
        # is itself doubled first. ilike() is already case-insensitive, so the
        # pattern does not need to be lowercased.
        escaped = search.replace("/", "//").replace("%", "/%").replace("_", "/_")
        pattern = f"%{escaped}%"
        query = query.filter(
            User.username.ilike(pattern, escape="/")
            | User.email.ilike(pattern, escape="/")
            | User.first_name.ilike(pattern, escape="/")
            | User.last_name.ilike(pattern, escape="/")
        )
    if role:
        query = query.filter(User.role == role)
    if is_active:
        query = query.filter(User.is_active == (is_active.lower() == "true"))

    # Count before pagination is applied; an empty result still reports one page.
    total = query.count()
    pages = math.ceil(total / per_page) if total > 0 else 1

    # Newest first. User.id is a tie-breaker so rows sharing a created_at value
    # keep a stable position and cannot be duplicated or skipped across pages.
    skip = (page - 1) * per_page
    users = (
        query.order_by(User.created_at.desc(), User.id.desc())
        .offset(skip)
        .limit(per_page)
        .all()
    )

    return UserListResponse(
        items=[UserResponse.model_validate(user) for user in users],
        total=total,
        page=page,
        per_page=per_page,
        pages=pages,
    )
@router.post("", response_model=UserDetailResponse)
def create_user(
    user_data: UserCreate = Body(...),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """Create a new user account (Admin only).

    Rejects the request with HTTP 400 when the email or the username is
    already present in the users table, otherwise stores a new active user
    with a hashed password and returns its details.

    Args:
        user_data: Payload describing the user to create.
        db: Database session provided by the ``get_db`` dependency.
        current_admin: Admin performing the action; its username is logged.

    Returns:
        A ``UserDetailResponse`` for the newly created user.

    Raises:
        HTTPException: 400 if the email or username is already in use.
    """
    # Uniqueness checks: email first, then username.
    email_owner = db.query(User).filter(User.email == user_data.email).first()
    if email_owner:
        raise HTTPException(status_code=400, detail="Email already registered")

    username_owner = (
        db.query(User).filter(User.username == user_data.username).first()
    )
    if username_owner:
        raise HTTPException(status_code=400, detail="Username already taken")

    # Only the hash of the password is stored on the model.
    password_hash = AuthManager().hash_password(user_data.password)
    new_user_fields = {
        "email": user_data.email,
        "username": user_data.username,
        "hashed_password": password_hash,
        "first_name": user_data.first_name,
        "last_name": user_data.last_name,
        "role": user_data.role,
        "is_active": True,
    }
    user = User(**new_user_fields)

    db.add(user)
    db.commit()
    # Reload the row so database-populated columns are available below.
    db.refresh(user)

    logger.info(f"Admin {current_admin.username} created user {user.username}")

    details = {
        "id": user.id,
        "email": user.email,
        "username": user.username,
        "role": user.role,
        "is_active": user.is_active,
        "last_login": user.last_login,
        "created_at": user.created_at,
        "updated_at": user.updated_at,
        "first_name": user.first_name,
        "last_name": user.last_name,
        "full_name": user.full_name,
        "is_email_verified": user.is_email_verified,
        "owned_companies_count": len(user.owned_companies),
        "vendor_memberships_count": len(user.vendor_memberships),
    }
    return UserDetailResponse(**details)
@router.get("/stats")
def get_user_statistics(
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """Return user statistics for the admin dashboard (Admin only).

    Delegates entirely to ``stats_service.get_user_statistics`` and returns
    its result unchanged.
    """
    statistics = stats_service.get_user_statistics(db)
    return statistics
@router.get("/search")
def search_users(
    q: str = Query(..., min_length=2, description="Search query (username or email)"),
    limit: int = Query(10, ge=1, le=50),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """
    Search users by username or email (Admin only).

    Used for autocomplete in ownership transfer.

    Args:
        q: Search text (at least 2 characters), matched case-insensitively as
            a substring of the username or email. ``%`` and ``_`` are matched
            literally rather than as wildcards.
        limit: Maximum number of users to return (1-50).
        db: Database session provided by the ``get_db`` dependency.
        current_admin: Result of the ``get_current_admin_api`` dependency;
            not otherwise used by this handler.

    Returns:
        A dict with a single ``"users"`` key holding a list of
        ``{"id", "username", "email", "is_active"}`` dicts, ordered by
        username.
    """
    # Escape LIKE metacharacters ("/" is the escape character and is doubled
    # first) so the user's text is matched literally. ilike() is already
    # case-insensitive, so no lowercasing is needed.
    escaped = q.replace("/", "//").replace("%", "/%").replace("_", "/_")
    pattern = f"%{escaped}%"

    users = (
        db.query(User)
        .filter(
            User.username.ilike(pattern, escape="/")
            | User.email.ilike(pattern, escape="/")
        )
        # LIMIT without ORDER BY returns an arbitrary subset; order explicitly
        # so the same query always yields the same suggestions.
        .order_by(User.username, User.id)
        .limit(limit)
        .all()
    )

    return {
        "users": [
            {
                "id": user.id,
                "username": user.username,
                "email": user.email,
                "is_active": user.is_active,
            }
            for user in users
        ]
    }
@router.get("/{user_id}", response_model=UserDetailResponse)
def get_user_details(
    user_id: int = Path(..., description="User ID"),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """Return detailed information about one user (Admin only).

    The user's owned companies and vendor memberships are loaded together
    with the user so their counts can be reported.

    Args:
        user_id: Primary key of the user to look up.
        db: Database session provided by the ``get_db`` dependency.
        current_admin: Result of the ``get_current_admin_api`` dependency;
            not otherwise used by this handler.

    Returns:
        A ``UserDetailResponse`` including relationship counts.

    Raises:
        HTTPException: 404 if no user has the given id.
    """
    eager_loads = (
        joinedload(User.owned_companies),
        joinedload(User.vendor_memberships),
    )
    lookup = db.query(User).options(*eager_loads).filter(User.id == user_id)
    user = lookup.first()

    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    details = {
        "id": user.id,
        "email": user.email,
        "username": user.username,
        "role": user.role,
        "is_active": user.is_active,
        "last_login": user.last_login,
        "created_at": user.created_at,
        "updated_at": user.updated_at,
        "first_name": user.first_name,
        "last_name": user.last_name,
        "full_name": user.full_name,
        "is_email_verified": user.is_email_verified,
        "owned_companies_count": len(user.owned_companies),
        "vendor_memberships_count": len(user.vendor_memberships),
    }
    return UserDetailResponse(**details)
@router.put("/{user_id}", response_model=UserDetailResponse)
def update_user(
    user_id: int = Path(..., description="User ID"),
    user_update: UserUpdate = Body(...),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """Update user information (Admin only).

    Only fields explicitly present in the request body are applied to the
    user. An admin cannot use this endpoint to change their own role away
    from ``"admin"`` or to deactivate their own account.

    Args:
        user_id: Primary key of the user to update.
        user_update: Partial update payload.
        db: Database session provided by the ``get_db`` dependency.
        current_admin: Admin performing the action; used for the
            self-protection checks and for logging.

    Returns:
        A ``UserDetailResponse`` reflecting the user after the update.

    Raises:
        HTTPException: 404 if the user does not exist; 400 if the admin tries
            to drop their own admin role or deactivate themselves, or if the
            new email or username is already used by another user.
    """
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Fields the client actually sent; unset fields are left untouched.
    update_data = user_update.model_dump(exclude_unset=True)
    is_self = user.id == current_admin.id

    # Prevent changing own admin status
    if is_self and user_update.role and user_update.role != "admin":
        raise HTTPException(
            status_code=400, detail="Cannot change your own admin role"
        )

    # Prevent deactivating yourself through the generic update, mirroring the
    # guard in the status-toggle endpoint. If the payload carries no
    # "is_active" key this check is a no-op.
    if is_self and update_data.get("is_active") is False:
        raise HTTPException(status_code=400, detail="Cannot deactivate yourself")

    # Check email uniqueness if changing
    if user_update.email and user_update.email != user.email:
        if db.query(User).filter(User.email == user_update.email).first():
            raise HTTPException(status_code=400, detail="Email already registered")

    # Check username uniqueness if changing
    if user_update.username and user_update.username != user.username:
        if db.query(User).filter(User.username == user_update.username).first():
            raise HTTPException(status_code=400, detail="Username already taken")

    # Apply every supplied field directly onto the model.
    for field, value in update_data.items():
        setattr(user, field, value)

    db.commit()
    # Reload so the response reflects the persisted state.
    db.refresh(user)

    logger.info(f"Admin {current_admin.username} updated user {user.username}")

    return UserDetailResponse(
        id=user.id,
        email=user.email,
        username=user.username,
        role=user.role,
        is_active=user.is_active,
        last_login=user.last_login,
        created_at=user.created_at,
        updated_at=user.updated_at,
        first_name=user.first_name,
        last_name=user.last_name,
        full_name=user.full_name,
        is_email_verified=user.is_email_verified,
        owned_companies_count=len(user.owned_companies),
        vendor_memberships_count=len(user.vendor_memberships),
    )
@router.put("/{user_id}/status")
def toggle_user_status(
    user_id: int = Path(..., description="User ID"),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """Flip a user's active flag (Admin only).

    Args:
        user_id: Primary key of the user whose status is toggled.
        db: Database session provided by the ``get_db`` dependency.
        current_admin: Admin performing the action; may not target themselves.

    Returns:
        A dict with a human-readable ``"message"`` and the new ``"is_active"``
        value.

    Raises:
        HTTPException: 404 if the user does not exist; 400 if the admin
            targets their own account.
    """
    target = db.query(User).filter(User.id == user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # An admin may not toggle their own account.
    if target.id == current_admin.id:
        raise HTTPException(status_code=400, detail="Cannot deactivate yourself")

    target.is_active = not target.is_active
    db.commit()

    now_active = target.is_active
    if now_active:
        action = "activated"
    else:
        action = "deactivated"

    logger.info(f"Admin {current_admin.username} {action} user {target.username}")
    return {"message": f"User {action} successfully", "is_active": now_active}
@router.delete("/{user_id}")
def delete_user(
    user_id: int = Path(..., description="User ID"),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_api),
):
    """Delete a user account (Admin only).

    Deletion is refused for the calling admin's own account and for any user
    who still owns at least one company.

    Args:
        user_id: Primary key of the user to delete.
        db: Database session provided by the ``get_db`` dependency.
        current_admin: Admin performing the action; may not delete themselves.

    Returns:
        A dict with a confirmation ``"message"``.

    Raises:
        HTTPException: 404 if the user does not exist; 400 if the admin
            targets themselves or the user still owns companies.
    """
    lookup = (
        db.query(User)
        .options(joinedload(User.owned_companies))
        .filter(User.id == user_id)
    )
    user = lookup.first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    # Self-deletion is not allowed.
    if user.id == current_admin.id:
        raise HTTPException(status_code=400, detail="Cannot delete yourself")

    # Ownership has to be transferred before an owner can be removed.
    owned = user.owned_companies
    if owned:
        company_count = len(owned)
        raise HTTPException(
            status_code=400,
            detail=f"Cannot delete user who owns {company_count} company(ies). Transfer ownership first.",
        )

    # Capture the name before the row is gone so it can still be logged.
    deleted_username = user.username
    db.delete(user)
    db.commit()

    logger.info(f"Admin {current_admin.username} deleted user {deleted_username}")
    return {"message": "User deleted successfully"}