feat(users): implement full user management CRUD

API endpoints (app/api/v1/admin/users.py):
- GET /users: Paginated list with search and filters
- POST /users: Create new user
- GET /users/{id}: Get user details with related counts
- PUT /users/{id}: Update user information
- PUT /users/{id}/status: Toggle active status
- DELETE /users/{id}: Delete user (with ownership check)

Pydantic schemas (models/schema/auth.py):
- UserCreate: For creating new users
- UserUpdate: For updating user information
- UserDetailResponse: Extended user details with counts
- UserListResponse: Paginated list response

Frontend:
- Updated users.html with server-side pagination and filters
- New user-create.html/js for user creation form
- New user-detail.html/js for viewing user details
- New user-edit.html/js for editing users

Routes added for user create, detail, and edit pages.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-12-03 21:37:23 +01:00
parent 901471bb76
commit be9c892739
11 changed files with 1987 additions and 274 deletions

View File

@@ -4,42 +4,123 @@ User management endpoints for admin.
"""
import logging
import math
from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query
from sqlalchemy.orm import Session, joinedload
from app.api.deps import get_current_admin_api
from app.core.database import get_db
from app.services.admin_service import admin_service
from middleware.auth import AuthManager
from app.services.stats_service import stats_service
from models.database.user import User
from models.schema.auth import UserResponse
from models.schema.auth import (
UserCreate,
UserDetailResponse,
UserListResponse,
UserResponse,
UserUpdate,
)
router = APIRouter(prefix="/users")
logger = logging.getLogger(__name__)
@router.get("", response_model=list[UserResponse])
@router.get("", response_model=UserListResponse)
def get_all_users(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
page: int = Query(1, ge=1),
per_page: int = Query(10, ge=1, le=100),
search: str = Query("", description="Search by username or email"),
role: str = Query("", description="Filter by role"),
is_active: str = Query("", description="Filter by active status"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Get all users (Admin only)."""
users = admin_service.get_all_users(db=db, skip=skip, limit=limit)
return [UserResponse.model_validate(user) for user in users]
"""Get paginated list of all users (Admin only)."""
query = db.query(User)
# Apply filters
if search:
search_term = f"%{search.lower()}%"
query = query.filter(
(User.username.ilike(search_term))
| (User.email.ilike(search_term))
| (User.first_name.ilike(search_term))
| (User.last_name.ilike(search_term))
)
if role:
query = query.filter(User.role == role)
if is_active:
query = query.filter(User.is_active == (is_active.lower() == "true"))
# Get total count
total = query.count()
pages = math.ceil(total / per_page) if total > 0 else 1
# Apply pagination
skip = (page - 1) * per_page
users = query.order_by(User.created_at.desc()).offset(skip).limit(per_page).all()
return UserListResponse(
items=[UserResponse.model_validate(user) for user in users],
total=total,
page=page,
per_page=per_page,
pages=pages,
)
@router.put("/{user_id}/status")
def toggle_user_status(
user_id: int,
@router.post("", response_model=UserDetailResponse)
def create_user(
user_data: UserCreate = Body(...),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Toggle user active status (Admin only)."""
user, message = admin_service.toggle_user_status(db, user_id, current_admin.id)
return {"message": message}
"""Create a new user (Admin only)."""
# Check if email exists
if db.query(User).filter(User.email == user_data.email).first():
raise HTTPException(status_code=400, detail="Email already registered")
# Check if username exists
if db.query(User).filter(User.username == user_data.username).first():
raise HTTPException(status_code=400, detail="Username already taken")
# Create user
auth_manager = AuthManager()
user = User(
email=user_data.email,
username=user_data.username,
hashed_password=auth_manager.hash_password(user_data.password),
first_name=user_data.first_name,
last_name=user_data.last_name,
role=user_data.role,
is_active=True,
)
db.add(user)
db.commit()
db.refresh(user)
logger.info(f"Admin {current_admin.username} created user {user.username}")
return UserDetailResponse(
id=user.id,
email=user.email,
username=user.username,
role=user.role,
is_active=user.is_active,
last_login=user.last_login,
created_at=user.created_at,
updated_at=user.updated_at,
first_name=user.first_name,
last_name=user.last_name,
full_name=user.full_name,
is_email_verified=user.is_email_verified,
owned_companies_count=len(user.owned_companies),
vendor_memberships_count=len(user.vendor_memberships),
)
@router.get("/stats")
@@ -66,9 +147,7 @@ def search_users(
search_term = f"%{q.lower()}%"
users = (
db.query(User)
.filter(
(User.username.ilike(search_term)) | (User.email.ilike(search_term))
)
.filter((User.username.ilike(search_term)) | (User.email.ilike(search_term)))
.limit(limit)
.all()
)
@@ -84,3 +163,157 @@ def search_users(
for user in users
]
}
@router.get("/{user_id}", response_model=UserDetailResponse)
def get_user_details(
user_id: int = Path(..., description="User ID"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Get detailed user information (Admin only)."""
user = (
db.query(User)
.options(joinedload(User.owned_companies), joinedload(User.vendor_memberships))
.filter(User.id == user_id)
.first()
)
if not user:
raise HTTPException(status_code=404, detail="User not found")
return UserDetailResponse(
id=user.id,
email=user.email,
username=user.username,
role=user.role,
is_active=user.is_active,
last_login=user.last_login,
created_at=user.created_at,
updated_at=user.updated_at,
first_name=user.first_name,
last_name=user.last_name,
full_name=user.full_name,
is_email_verified=user.is_email_verified,
owned_companies_count=len(user.owned_companies),
vendor_memberships_count=len(user.vendor_memberships),
)
@router.put("/{user_id}", response_model=UserDetailResponse)
def update_user(
user_id: int = Path(..., description="User ID"),
user_update: UserUpdate = Body(...),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Update user information (Admin only)."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Prevent changing own admin status
if user.id == current_admin.id and user_update.role and user_update.role != "admin":
raise HTTPException(
status_code=400, detail="Cannot change your own admin role"
)
# Check email uniqueness if changing
if user_update.email and user_update.email != user.email:
if db.query(User).filter(User.email == user_update.email).first():
raise HTTPException(status_code=400, detail="Email already registered")
# Check username uniqueness if changing
if user_update.username and user_update.username != user.username:
if db.query(User).filter(User.username == user_update.username).first():
raise HTTPException(status_code=400, detail="Username already taken")
# Update fields
update_data = user_update.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(user, field, value)
db.commit()
db.refresh(user)
logger.info(f"Admin {current_admin.username} updated user {user.username}")
return UserDetailResponse(
id=user.id,
email=user.email,
username=user.username,
role=user.role,
is_active=user.is_active,
last_login=user.last_login,
created_at=user.created_at,
updated_at=user.updated_at,
first_name=user.first_name,
last_name=user.last_name,
full_name=user.full_name,
is_email_verified=user.is_email_verified,
owned_companies_count=len(user.owned_companies),
vendor_memberships_count=len(user.vendor_memberships),
)
@router.put("/{user_id}/status")
def toggle_user_status(
user_id: int = Path(..., description="User ID"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Toggle user active status (Admin only)."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Prevent deactivating yourself
if user.id == current_admin.id:
raise HTTPException(status_code=400, detail="Cannot deactivate yourself")
user.is_active = not user.is_active
db.commit()
action = "activated" if user.is_active else "deactivated"
logger.info(f"Admin {current_admin.username} {action} user {user.username}")
return {"message": f"User {action} successfully", "is_active": user.is_active}
@router.delete("/{user_id}")
def delete_user(
user_id: int = Path(..., description="User ID"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
"""Delete a user (Admin only)."""
user = (
db.query(User)
.options(joinedload(User.owned_companies))
.filter(User.id == user_id)
.first()
)
if not user:
raise HTTPException(status_code=404, detail="User not found")
# Prevent deleting yourself
if user.id == current_admin.id:
raise HTTPException(status_code=400, detail="Cannot delete yourself")
# Prevent deleting users who own companies
if user.owned_companies:
raise HTTPException(
status_code=400,
detail=f"Cannot delete user who owns {len(user.owned_companies)} company(ies). Transfer ownership first.",
)
username = user.username
db.delete(user)
db.commit()
logger.info(f"Admin {current_admin.username} deleted user {username}")
return {"message": "User deleted successfully"}