Replace direct User database model imports in API endpoints with UserContext schema, following the architecture principle that API routes should not import database models directly. Changes: - Create UserContext schema in models/schema/auth.py with from_user() factory - Update app/api/deps.py to return UserContext from all auth dependencies - Add _get_user_model() helper for functions needing User model access - Update 58 API endpoint files to use UserContext instead of User - Add noqa comments for 4 legitimate edge cases (enums, internal helpers) Architecture validation: 0 errors (down from 61), 11 warnings remain Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
272 lines
7.7 KiB
Python
272 lines
7.7 KiB
Python
# auth.py - Keep security-critical validation
|
|
import re
|
|
from datetime import datetime
|
|
|
|
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator
|
|
|
|
|
|
class UserLogin(BaseModel):
|
|
email_or_username: str = Field(..., description="Username or email address")
|
|
password: str = Field(..., description="Password")
|
|
vendor_code: str | None = Field(
|
|
None, description="Optional vendor code for context"
|
|
)
|
|
|
|
@field_validator("email_or_username")
|
|
@classmethod
|
|
def validate_email_or_username(cls, v):
|
|
return v.strip()
|
|
|
|
|
|
class UserResponse(BaseModel):
|
|
model_config = ConfigDict(from_attributes=True)
|
|
id: int
|
|
email: str
|
|
username: str
|
|
role: str
|
|
is_active: bool
|
|
is_super_admin: bool = False
|
|
preferred_language: str | None = None
|
|
last_login: datetime | None = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
|
|
class LoginResponse(BaseModel):
|
|
access_token: str
|
|
token_type: str = "bearer"
|
|
expires_in: int
|
|
user: UserResponse
|
|
|
|
|
|
class UserDetailResponse(UserResponse):
|
|
"""Extended user response with additional details."""
|
|
|
|
first_name: str | None = None
|
|
last_name: str | None = None
|
|
full_name: str | None = None
|
|
is_email_verified: bool = False
|
|
owned_companies_count: int = 0
|
|
vendor_memberships_count: int = 0
|
|
|
|
|
|
class UserUpdate(BaseModel):
|
|
"""Schema for updating user information."""
|
|
|
|
username: str | None = Field(None, min_length=3, max_length=50)
|
|
email: EmailStr | None = None
|
|
first_name: str | None = Field(None, max_length=100)
|
|
last_name: str | None = Field(None, max_length=100)
|
|
role: str | None = Field(None, pattern="^(admin|vendor)$")
|
|
is_active: bool | None = None
|
|
is_email_verified: bool | None = None
|
|
preferred_language: str | None = Field(
|
|
None, description="Preferred language (en, fr, de, lb)"
|
|
)
|
|
|
|
@field_validator("username")
|
|
@classmethod
|
|
def validate_username(cls, v):
|
|
if v and not re.match(r"^[a-zA-Z0-9_]+$", v):
|
|
raise ValueError(
|
|
"Username must contain only letters, numbers, or underscores"
|
|
)
|
|
return v.lower().strip() if v else v
|
|
|
|
|
|
class UserCreate(BaseModel):
|
|
"""Schema for creating a new user (admin only)."""
|
|
|
|
email: EmailStr = Field(..., description="Valid email address")
|
|
username: str = Field(..., min_length=3, max_length=50)
|
|
password: str = Field(..., min_length=6, description="Password")
|
|
first_name: str | None = Field(None, max_length=100)
|
|
last_name: str | None = Field(None, max_length=100)
|
|
role: str = Field(default="vendor", pattern="^(admin|vendor)$")
|
|
preferred_language: str | None = Field(
|
|
None, description="Preferred language (en, fr, de, lb)"
|
|
)
|
|
|
|
@field_validator("username")
|
|
@classmethod
|
|
def validate_username(cls, v):
|
|
if not re.match(r"^[a-zA-Z0-9_]+$", v):
|
|
raise ValueError(
|
|
"Username must contain only letters, numbers, or underscores"
|
|
)
|
|
return v.lower().strip()
|
|
|
|
|
|
class UserListResponse(BaseModel):
|
|
"""Schema for paginated user list."""
|
|
|
|
items: list[UserResponse]
|
|
total: int
|
|
page: int
|
|
per_page: int
|
|
pages: int
|
|
|
|
|
|
class UserSearchItem(BaseModel):
|
|
"""Schema for a single user search result."""
|
|
|
|
id: int
|
|
username: str
|
|
email: str
|
|
is_active: bool
|
|
|
|
|
|
class UserSearchResponse(BaseModel):
|
|
"""Schema for user search results."""
|
|
|
|
users: list[UserSearchItem]
|
|
|
|
|
|
class UserStatusToggleResponse(BaseModel):
|
|
"""Schema for user status toggle response."""
|
|
|
|
message: str
|
|
is_active: bool
|
|
|
|
|
|
class UserDeleteResponse(BaseModel):
|
|
"""Schema for user delete response."""
|
|
|
|
message: str
|
|
|
|
|
|
class LogoutResponse(BaseModel):
|
|
"""Schema for logout response."""
|
|
|
|
message: str
|
|
|
|
|
|
class PasswordResetRequestResponse(BaseModel):
|
|
"""Schema for password reset request response."""
|
|
|
|
message: str
|
|
|
|
|
|
class PasswordResetResponse(BaseModel):
|
|
"""Schema for password reset response."""
|
|
|
|
message: str
|
|
|
|
|
|
class VendorUserResponse(BaseModel):
|
|
"""Schema for vendor user info in auth context."""
|
|
|
|
id: int
|
|
username: str
|
|
email: str
|
|
role: str
|
|
is_active: bool
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class UserContext(BaseModel):
|
|
"""
|
|
User context for dependency injection in API endpoints.
|
|
|
|
This schema replaces direct use of the User database model in API routes,
|
|
following the principle that routes should not import database models directly.
|
|
|
|
Used by:
|
|
- get_current_admin_api / get_current_admin_from_cookie_or_header
|
|
- get_current_vendor_api / get_current_vendor_from_cookie_or_header
|
|
- get_current_super_admin
|
|
|
|
For admin users:
|
|
- is_super_admin indicates full platform access
|
|
- accessible_platform_ids is None for super admins (all platforms)
|
|
- accessible_platform_ids is a list for platform admins
|
|
|
|
For vendor users:
|
|
- token_vendor_id/code/role come from JWT token
|
|
- These indicate which vendor context the user is operating in
|
|
"""
|
|
|
|
# Core user fields
|
|
id: int
|
|
email: str
|
|
username: str
|
|
role: str # "admin" or "vendor"
|
|
is_active: bool = True
|
|
|
|
# Admin-specific fields
|
|
is_super_admin: bool = False
|
|
accessible_platform_ids: list[int] | None = None # None = all platforms (super admin)
|
|
|
|
# Vendor-specific fields (from JWT token)
|
|
token_vendor_id: int | None = None
|
|
token_vendor_code: str | None = None
|
|
token_vendor_role: str | None = None
|
|
|
|
# Optional profile fields
|
|
first_name: str | None = None
|
|
last_name: str | None = None
|
|
preferred_language: str | None = None
|
|
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
@property
|
|
def full_name(self) -> str:
|
|
"""Returns the full name of the user."""
|
|
if self.first_name and self.last_name:
|
|
return f"{self.first_name} {self.last_name}"
|
|
return self.username
|
|
|
|
@property
|
|
def is_admin(self) -> bool:
|
|
"""Check if user is a platform admin."""
|
|
return self.role == "admin"
|
|
|
|
@property
|
|
def is_vendor(self) -> bool:
|
|
"""Check if user is a vendor."""
|
|
return self.role == "vendor"
|
|
|
|
@classmethod
|
|
def from_user(cls, user, include_vendor_context: bool = True) -> "UserContext":
|
|
"""
|
|
Create UserContext from a User database model.
|
|
|
|
Args:
|
|
user: User database model instance
|
|
include_vendor_context: Whether to include token_vendor_* fields
|
|
|
|
Returns:
|
|
UserContext instance
|
|
"""
|
|
data = {
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"username": user.username,
|
|
"role": user.role,
|
|
"is_active": user.is_active,
|
|
"is_super_admin": getattr(user, "is_super_admin", False),
|
|
"first_name": getattr(user, "first_name", None),
|
|
"last_name": getattr(user, "last_name", None),
|
|
"preferred_language": getattr(user, "preferred_language", None),
|
|
}
|
|
|
|
# Add admin platform access info
|
|
if user.role == "admin":
|
|
if getattr(user, "is_super_admin", False):
|
|
data["accessible_platform_ids"] = None # All platforms
|
|
else:
|
|
# Get platform IDs from admin_platforms relationship
|
|
admin_platforms = getattr(user, "admin_platforms", [])
|
|
data["accessible_platform_ids"] = [
|
|
ap.platform_id for ap in admin_platforms if ap.is_active
|
|
]
|
|
|
|
# Add vendor context from JWT token if present
|
|
if include_vendor_context:
|
|
data["token_vendor_id"] = getattr(user, "token_vendor_id", None)
|
|
data["token_vendor_code"] = getattr(user, "token_vendor_code", None)
|
|
data["token_vendor_role"] = getattr(user, "token_vendor_role", None)
|
|
|
|
return cls(**data)
|