Issues fixed:
- Platform selection returned LoginResponse, which requires user timestamps, but UserContext doesn't have created_at/updated_at. Created a dedicated PlatformSelectResponse that returns only token and platform info.
- UserContext was missing the platform context fields (token_platform_id, token_platform_code). The JWT token included them, but they weren't extracted into UserContext, causing fallback warnings.
- admin_menu_config.py accessed admin_platforms (a SQLAlchemy relationship) on UserContext (a Pydantic schema). Changed to use accessible_platform_ids.
- Static file mount order in main.py caused 404s for locale files. More specific paths (/static/modules/X/locales) must be mounted before less specific paths (/static/modules/X).

Changes:
- models/schema/auth.py: Add PlatformSelectResponse, token_platform_id, token_platform_code, can_access_platform(), get_accessible_platform_ids()
- admin_auth.py: Use PlatformSelectResponse for the select-platform endpoint
- admin_platform_service.py: Accept User | UserContext in validation
- admin_menu_config.py: Use accessible_platform_ids instead of admin_platforms
- main.py: Mount locales before static for correct path priority

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
312 lines
9.1 KiB
Python
312 lines
9.1 KiB
Python
# auth.py - Keep security-critical validation
|
|
import re
|
|
from datetime import datetime
|
|
|
|
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator
|
|
|
|
|
|
class UserLogin(BaseModel):
    """Credentials payload for a login request.

    The identifier may be either a username or an email address; an optional
    vendor code can accompany it to provide context.
    """

    email_or_username: str = Field(..., description="Username or email address")
    password: str = Field(..., description="Password")
    vendor_code: str | None = Field(
        default=None, description="Optional vendor code for context"
    )

    @field_validator("email_or_username")
    @classmethod
    def validate_email_or_username(cls, v):
        """Return the identifier with leading/trailing whitespace removed."""
        trimmed = v.strip()
        return trimmed
|
|
|
|
|
|
class UserResponse(BaseModel):
    """Serialized view of a user record.

    ``from_attributes`` is enabled, so an instance can be validated from any
    object exposing these names as attributes rather than only from a dict.
    """

    model_config = ConfigDict(from_attributes=True)

    # Identity
    id: int
    email: str
    username: str

    # Authorization / status
    role: str
    is_active: bool
    is_super_admin: bool = False

    # Preferences and activity
    preferred_language: str | None = None
    last_login: datetime | None = None

    # Record timestamps (required: validation fails if the source lacks them)
    created_at: datetime
    updated_at: datetime
|
|
|
|
|
|
class LoginResponse(BaseModel):
    """Successful login result: an access token plus the full user record."""

    access_token: str
    token_type: str = "bearer"
    # Token lifetime; the unit is not visible in this module
    # (presumably seconds - confirm against the token issuer).
    expires_in: int
    user: UserResponse
|
|
|
|
|
|
class PlatformSelectResponse(BaseModel):
    """Token payload returned after a platform has been selected.

    Carries no user data on purpose - the client already has it - so only the
    token details and the chosen platform are included.
    """

    # Token details
    access_token: str
    token_type: str = "bearer"
    expires_in: int

    # Selected platform
    platform_id: int
    platform_code: str
|
|
|
|
|
|
class UserDetailResponse(UserResponse):
    """User response enriched with profile details and relationship counts.

    Inherits every field (and the ``from_attributes`` configuration) from
    ``UserResponse``; all additions below are optional with defaults.
    """

    # Profile
    first_name: str | None = None
    last_name: str | None = None
    full_name: str | None = None
    is_email_verified: bool = False

    # Aggregate counts
    owned_companies_count: int = 0
    vendor_memberships_count: int = 0
|
|
|
|
|
|
class UserUpdate(BaseModel):
    """Schema for updating user information.

    Every field is optional and defaults to ``None``.
    """

    username: str | None = Field(None, min_length=3, max_length=50)
    email: EmailStr | None = None
    first_name: str | None = Field(None, max_length=100)
    last_name: str | None = Field(None, max_length=100)
    role: str | None = Field(None, pattern="^(admin|vendor)$")
    is_active: bool | None = None
    is_email_verified: bool | None = None
    preferred_language: str | None = Field(
        None, description="Preferred language (en, fr, de, lb)"
    )

    @field_validator("username")
    @classmethod
    def validate_username(cls, v: str | None) -> str | None:
        """Validate the username's character set and normalize it to lowercase.

        Args:
            v: The submitted username, or ``None`` when none was supplied.

        Returns:
            The lowercased username; falsy input is returned unchanged.

        Raises:
            ValueError: If the username contains anything other than ASCII
                letters, digits, or underscores.
        """
        if not v:
            return v
        # fullmatch (not match + "$"): "$" also matches just before a trailing
        # newline, so "ab\n" used to pass the length constraint with three
        # characters and then be stripped down to a two-character username.
        if not re.fullmatch(r"[a-zA-Z0-9_]+", v):
            raise ValueError(
                "Username must contain only letters, numbers, or underscores"
            )
        # No strip needed: the pattern above already excludes all whitespace.
        return v.lower()
|
|
|
|
|
|
class UserCreate(BaseModel):
    """Schema for creating a new user (admin only)."""

    email: EmailStr = Field(..., description="Valid email address")
    username: str = Field(..., min_length=3, max_length=50)
    password: str = Field(..., min_length=6, description="Password")
    first_name: str | None = Field(None, max_length=100)
    last_name: str | None = Field(None, max_length=100)
    role: str = Field(default="vendor", pattern="^(admin|vendor)$")
    preferred_language: str | None = Field(
        None, description="Preferred language (en, fr, de, lb)"
    )

    @field_validator("username")
    @classmethod
    def validate_username(cls, v: str) -> str:
        """Validate the username's character set and normalize it to lowercase.

        Args:
            v: The submitted username.

        Returns:
            The lowercased username.

        Raises:
            ValueError: If the username contains anything other than ASCII
                letters, digits, or underscores.
        """
        # fullmatch (not match + "$"): "$" also matches just before a trailing
        # newline, so "ab\n" used to pass the length constraint with three
        # characters and then be stripped down to a two-character username.
        if not re.fullmatch(r"[a-zA-Z0-9_]+", v):
            raise ValueError(
                "Username must contain only letters, numbers, or underscores"
            )
        # No strip needed: the pattern above already excludes all whitespace.
        return v.lower()
|
|
|
|
|
|
class UserListResponse(BaseModel):
    """One page of users together with its pagination metadata."""

    # Users on the current page
    items: list[UserResponse]

    # Pagination metadata
    total: int
    page: int
    per_page: int
    pages: int
|
|
|
|
|
|
class UserSearchItem(BaseModel):
    """Compact representation of one user in a search result."""

    id: int
    username: str
    email: str
    is_active: bool
|
|
|
|
|
|
class UserSearchResponse(BaseModel):
    """Envelope holding the list of users matched by a search."""

    users: list[UserSearchItem]
|
|
|
|
|
|
class UserStatusToggleResponse(BaseModel):
    """Result of toggling a user's status: a message and the active flag."""

    message: str
    is_active: bool
|
|
|
|
|
|
class UserDeleteResponse(BaseModel):
    """Message-only body returned for a user delete request."""

    message: str
|
|
|
|
|
|
class LogoutResponse(BaseModel):
    """Message-only body returned for a logout request."""

    message: str
|
|
|
|
|
|
class PasswordResetRequestResponse(BaseModel):
    """Message-only body returned when a password reset is requested."""

    message: str
|
|
|
|
|
|
class PasswordResetResponse(BaseModel):
    """Message-only body returned for a password reset."""

    message: str
|
|
|
|
|
|
class VendorUserResponse(BaseModel):
    """Schema for vendor user info in auth context.

    ``from_attributes`` is enabled, so an instance can be validated from any
    object exposing these names as attributes.
    """

    id: int
    username: str
    email: str
    role: str
    is_active: bool

    # ConfigDict instead of a raw dict, matching the other schemas in this
    # module; the resulting configuration is the same.
    model_config = ConfigDict(from_attributes=True)
|
|
|
|
|
|
class UserContext(BaseModel):
    """
    Authenticated-user context handed to API endpoints via dependency injection.

    Routes receive this schema instead of the User database model, in keeping
    with the rule that routes should not import database models directly.

    Used by:
    - get_current_admin_api / get_current_admin_from_cookie_or_header
    - get_current_vendor_api / get_current_vendor_from_cookie_or_header
    - get_current_super_admin

    For admin users:
    - is_super_admin indicates full platform access
    - accessible_platform_ids is None for super admins (all platforms)
    - accessible_platform_ids is a list for platform admins

    For vendor users:
    - token_vendor_id/code/role come from JWT token
    - These indicate which vendor context the user is operating in
    """

    # Core user fields
    id: int
    email: str
    username: str
    role: str  # "admin" or "vendor"
    is_active: bool = True

    # Admin-specific fields
    is_super_admin: bool = False
    accessible_platform_ids: list[int] | None = None  # None = all platforms (super admin)

    # Admin platform context (from JWT token after platform selection)
    token_platform_id: int | None = None
    token_platform_code: str | None = None

    # Vendor-specific fields (from JWT token)
    token_vendor_id: int | None = None
    token_vendor_code: str | None = None
    token_vendor_role: str | None = None

    # Optional profile fields
    first_name: str | None = None
    last_name: str | None = None
    preferred_language: str | None = None

    model_config = ConfigDict(from_attributes=True)

    @property
    def full_name(self) -> str:
        """Return "first last" when both names are set, else the username."""
        has_both_names = bool(self.first_name and self.last_name)
        if not has_both_names:
            return self.username
        return f"{self.first_name} {self.last_name}"

    @property
    def is_admin(self) -> bool:
        """Whether the user's role is "admin"."""
        return self.role == "admin"

    @property
    def is_vendor(self) -> bool:
        """Whether the user's role is "vendor"."""
        return self.role == "vendor"

    def can_access_platform(self, platform_id: int) -> bool:
        """
        Tell whether this user may access the given platform.

        Access is granted when the user is a super admin or when no platform
        list is set (None means "all platforms"); otherwise the platform must
        appear in accessible_platform_ids.
        """
        allowed_ids = self.accessible_platform_ids
        # NOTE(review): a None list grants access even when is_super_admin is
        # False (e.g. a context built without platform info) - confirm that
        # this fail-open fallback is intended.
        if self.is_super_admin or allowed_ids is None:
            return True
        return platform_id in allowed_ids

    def get_accessible_platform_ids(self) -> list[int] | None:
        """
        Return the platform IDs this user can access.

        None means every platform is accessible (super admins); otherwise the
        stored list of platform IDs is returned as-is.
        """
        return self.accessible_platform_ids

    @classmethod
    def from_user(cls, user, include_vendor_context: bool = True) -> "UserContext":
        """
        Build a UserContext from a User database model.

        Args:
            user: User database model instance
            include_vendor_context: Whether to copy the token_vendor_* fields

        Returns:
            UserContext instance
        """
        fields = {
            "id": user.id,
            "email": user.email,
            "username": user.username,
            "role": user.role,
            "is_active": user.is_active,
            "is_super_admin": getattr(user, "is_super_admin", False),
            "first_name": getattr(user, "first_name", None),
            "last_name": getattr(user, "last_name", None),
            "preferred_language": getattr(user, "preferred_language", None),
        }

        # Platform access is only populated for admins; other roles keep the
        # field's default.
        if user.role == "admin":
            if getattr(user, "is_super_admin", False):
                platform_ids = None  # All platforms
            else:
                # Collect IDs of the active entries in admin_platforms
                platform_ids = [
                    link.platform_id
                    for link in getattr(user, "admin_platforms", [])
                    if link.is_active
                ]
            fields["accessible_platform_ids"] = platform_ids

        # Platform context from JWT token (for platform admins after selection)
        for key in ("token_platform_id", "token_platform_code"):
            fields[key] = getattr(user, key, None)

        # Vendor context from JWT token, when requested
        if include_vendor_context:
            for key in ("token_vendor_id", "token_vendor_code", "token_vendor_role"):
                fields[key] = getattr(user, key, None)

        return cls(**fields)
|