Some checks failed
POST /api/v1/storefront/auth/forgot-password and .../reset-password were
both declared with bare `email: str` / `reset_token: str, new_password: str`
parameters. FastAPI treats unannotated str params as query parameters, so
the frontend's JSON body was ignored and the endpoint 422'd with
"missing query parameter 'email'". The docstrings on both endpoints
already said "Request Body" — intent was clear, implementation drifted.
Add two new Pydantic body schemas in tenancy/schemas/auth.py:
PasswordResetRequest { email: str } (forgot)
PasswordResetConfirm { reset_token: str, new_password: str } (reset)
Re-export from tenancy/schemas/__init__.py, import in
customers/routes/api/storefront.py, and switch both endpoint signatures
to take `body: <Schema>`. Internal usage reads body.email / body.reset_token
/ body.new_password.
Surfaced during Test 5 when user clicked "forgot password" on the customer
storefront login page to set a password for the first time after a
self-enrollment flow.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
370 lines
11 KiB
Python
370 lines
11 KiB
Python
# app/modules/tenancy/schemas/auth.py
|
|
"""
|
|
Authentication and user context schemas.
|
|
|
|
UserContext is the primary schema for dependency injection in API endpoints,
|
|
replacing direct use of the User database model in routes.
|
|
|
|
Migrated from models/schema/auth.py per MOD-019 / MOD-025.
|
|
"""
|
|
|
|
import re
|
|
from datetime import datetime
|
|
|
|
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator
|
|
|
|
|
|
class UserLogin(BaseModel):
|
|
email_or_username: str = Field(..., description="Username or email address")
|
|
password: str = Field(..., description="Password")
|
|
store_code: str | None = Field(
|
|
None, description="Optional store code for context"
|
|
)
|
|
platform_code: str | None = Field(
|
|
None, description="Platform code from login context"
|
|
)
|
|
|
|
@field_validator("email_or_username")
|
|
@classmethod
|
|
def validate_email_or_username(cls, v):
|
|
return v.strip()
|
|
|
|
|
|
class UserResponse(BaseModel):
|
|
model_config = ConfigDict(from_attributes=True)
|
|
id: int
|
|
email: str
|
|
username: str
|
|
role: str
|
|
is_active: bool
|
|
preferred_language: str | None = None
|
|
last_login: datetime | None = None
|
|
created_at: datetime
|
|
updated_at: datetime
|
|
|
|
|
|
class LoginResponse(BaseModel):
|
|
access_token: str
|
|
token_type: str = "bearer"
|
|
expires_in: int
|
|
user: UserResponse
|
|
|
|
|
|
class PlatformSelectResponse(BaseModel):
|
|
"""Response for platform selection (no user data - client already has it)."""
|
|
|
|
access_token: str
|
|
token_type: str = "bearer"
|
|
expires_in: int
|
|
platform_id: int
|
|
platform_code: str
|
|
|
|
|
|
class OwnedMerchantSummary(BaseModel):
|
|
"""Summary of a merchant owned by a user."""
|
|
|
|
id: int
|
|
name: str
|
|
is_active: bool
|
|
store_count: int
|
|
|
|
|
|
class StoreMembershipSummary(BaseModel):
|
|
"""Summary of a user's store membership."""
|
|
|
|
store_id: int
|
|
store_code: str
|
|
store_name: str
|
|
role: str
|
|
is_active: bool
|
|
|
|
|
|
class UserDetailResponse(UserResponse):
|
|
"""Extended user response with additional details."""
|
|
|
|
first_name: str | None = None
|
|
last_name: str | None = None
|
|
full_name: str | None = None
|
|
is_email_verified: bool = False
|
|
owned_merchants_count: int = 0
|
|
store_memberships_count: int = 0
|
|
owned_merchants: list[OwnedMerchantSummary] = []
|
|
store_memberships: list[StoreMembershipSummary] = []
|
|
|
|
|
|
class UserUpdate(BaseModel):
|
|
"""Schema for updating user information."""
|
|
|
|
username: str | None = Field(None, min_length=3, max_length=50)
|
|
email: EmailStr | None = None
|
|
first_name: str | None = Field(None, max_length=100)
|
|
last_name: str | None = Field(None, max_length=100)
|
|
role: str | None = Field(None, pattern="^(super_admin|platform_admin|merchant_owner|store_member)$")
|
|
is_active: bool | None = None
|
|
is_email_verified: bool | None = None
|
|
preferred_language: str | None = Field(
|
|
None, description="Preferred language (en, fr, de, lb)"
|
|
)
|
|
|
|
@field_validator("username")
|
|
@classmethod
|
|
def validate_username(cls, v):
|
|
if v and not re.match(r"^[a-zA-Z0-9_]+$", v):
|
|
raise ValueError(
|
|
"Username must contain only letters, numbers, or underscores"
|
|
)
|
|
return v.lower().strip() if v else v
|
|
|
|
|
|
class UserCreate(BaseModel):
|
|
"""Schema for creating a new user (admin only)."""
|
|
|
|
email: EmailStr = Field(..., description="Valid email address")
|
|
username: str = Field(..., min_length=3, max_length=50)
|
|
password: str = Field(..., min_length=6, description="Password")
|
|
first_name: str | None = Field(None, max_length=100)
|
|
last_name: str | None = Field(None, max_length=100)
|
|
role: str = Field(default="store_member", pattern="^(super_admin|platform_admin|merchant_owner|store_member)$")
|
|
preferred_language: str | None = Field(
|
|
None, description="Preferred language (en, fr, de, lb)"
|
|
)
|
|
|
|
@field_validator("username")
|
|
@classmethod
|
|
def validate_username(cls, v):
|
|
if not re.match(r"^[a-zA-Z0-9_]+$", v):
|
|
raise ValueError(
|
|
"Username must contain only letters, numbers, or underscores"
|
|
)
|
|
return v.lower().strip()
|
|
|
|
|
|
class UserListResponse(BaseModel):
|
|
"""Schema for paginated user list."""
|
|
|
|
items: list[UserResponse]
|
|
total: int
|
|
page: int
|
|
per_page: int
|
|
pages: int
|
|
|
|
|
|
class UserSearchItem(BaseModel):
|
|
"""Schema for a single user search result."""
|
|
|
|
id: int
|
|
username: str
|
|
email: str
|
|
is_active: bool
|
|
|
|
|
|
class UserSearchResponse(BaseModel):
|
|
"""Schema for user search results."""
|
|
|
|
users: list[UserSearchItem]
|
|
|
|
|
|
class UserStatusToggleResponse(BaseModel):
|
|
"""Schema for user status toggle response."""
|
|
|
|
message: str
|
|
is_active: bool
|
|
|
|
|
|
class UserDeleteResponse(BaseModel):
|
|
"""Schema for user delete response."""
|
|
|
|
message: str
|
|
|
|
|
|
class LogoutResponse(BaseModel):
|
|
"""Schema for logout response."""
|
|
|
|
message: str
|
|
|
|
|
|
class PasswordResetRequest(BaseModel):
|
|
"""Schema for password reset request body (customer / storefront forgot-password)."""
|
|
|
|
email: str
|
|
|
|
|
|
class PasswordResetRequestResponse(BaseModel):
|
|
"""Schema for password reset request response."""
|
|
|
|
message: str
|
|
|
|
|
|
class PasswordResetConfirm(BaseModel):
|
|
"""Schema for password reset confirm body (customer / storefront reset-password)."""
|
|
|
|
reset_token: str
|
|
new_password: str
|
|
|
|
|
|
class PasswordResetResponse(BaseModel):
|
|
"""Schema for password reset response."""
|
|
|
|
message: str
|
|
|
|
|
|
class StoreUserResponse(BaseModel):
|
|
"""Schema for store user info in auth context."""
|
|
|
|
id: int
|
|
username: str
|
|
email: str
|
|
role: str
|
|
is_active: bool
|
|
platform_code: str | None = None
|
|
|
|
model_config = {"from_attributes": True}
|
|
|
|
|
|
class UserContext(BaseModel):
|
|
"""
|
|
User context for dependency injection in API endpoints.
|
|
|
|
This schema replaces direct use of the User database model in API routes,
|
|
following the principle that routes should not import database models directly.
|
|
|
|
Used by:
|
|
- get_current_admin_api / get_current_admin_from_cookie_or_header
|
|
- get_current_store_api / get_current_store_from_cookie_or_header
|
|
- get_current_super_admin
|
|
|
|
For admin users:
|
|
- is_super_admin indicates full platform access
|
|
- accessible_platform_ids is None for super admins (all platforms)
|
|
- accessible_platform_ids is a list for platform admins
|
|
|
|
For store users:
|
|
- token_store_id/code/role come from JWT token
|
|
- These indicate which store context the user is operating in
|
|
"""
|
|
|
|
# Core user fields
|
|
id: int
|
|
email: str
|
|
username: str
|
|
role: str # super_admin, platform_admin, merchant_owner, or store_member
|
|
is_active: bool = True
|
|
|
|
# Admin-specific fields
|
|
accessible_platform_ids: list[int] | None = None # None = all platforms (super admin)
|
|
|
|
# Admin platform context (from JWT token after platform selection)
|
|
token_platform_id: int | None = None
|
|
token_platform_code: str | None = None
|
|
|
|
# Store-specific fields (from JWT token)
|
|
token_store_id: int | None = None
|
|
token_store_code: str | None = None
|
|
token_store_role: str | None = None
|
|
|
|
# Set when the request was authenticated by a paired POS terminal device
|
|
# rather than a human user logging in. The device's row carries the actual
|
|
# principal for audit; this surfaces it for endpoints that care.
|
|
terminal_device_id: int | None = None
|
|
|
|
# Optional profile fields
|
|
first_name: str | None = None
|
|
last_name: str | None = None
|
|
preferred_language: str | None = None
|
|
|
|
model_config = ConfigDict(from_attributes=True)
|
|
|
|
@property
|
|
def full_name(self) -> str:
|
|
"""Returns the full name of the user."""
|
|
if self.first_name and self.last_name:
|
|
return f"{self.first_name} {self.last_name}"
|
|
return self.username
|
|
|
|
@property
|
|
def is_super_admin(self) -> bool:
|
|
"""Check if user is a super admin."""
|
|
return self.role == "super_admin"
|
|
|
|
@property
|
|
def is_admin(self) -> bool:
|
|
"""Check if user is an admin (super_admin or platform_admin)."""
|
|
return self.role in ("super_admin", "platform_admin")
|
|
|
|
@property
|
|
def is_store_user(self) -> bool:
|
|
"""Check if user is a store user (merchant_owner or store_member)."""
|
|
return self.role in ("merchant_owner", "store_member")
|
|
|
|
def can_access_platform(self, platform_id: int) -> bool:
|
|
"""
|
|
Check if user can access a specific platform.
|
|
|
|
Super admins (accessible_platform_ids=None) can access all platforms.
|
|
Platform admins can only access their assigned platforms.
|
|
"""
|
|
if self.is_super_admin:
|
|
return True
|
|
if self.accessible_platform_ids is None:
|
|
return True # Super admin fallback
|
|
return platform_id in self.accessible_platform_ids
|
|
|
|
def get_accessible_platform_ids(self) -> list[int] | None:
|
|
"""
|
|
Get list of platform IDs this user can access.
|
|
|
|
Returns None for super admins (all platforms accessible).
|
|
Returns list of platform IDs for platform admins.
|
|
"""
|
|
return self.accessible_platform_ids
|
|
|
|
@classmethod
|
|
def from_user(cls, user, include_store_context: bool = True) -> "UserContext":
|
|
"""
|
|
Create UserContext from a User database model.
|
|
|
|
Args:
|
|
user: User database model instance
|
|
include_store_context: Whether to include token_store_* fields
|
|
|
|
Returns:
|
|
UserContext instance
|
|
"""
|
|
data = {
|
|
"id": user.id,
|
|
"email": user.email,
|
|
"username": user.username,
|
|
"role": user.role,
|
|
"is_active": user.is_active,
|
|
"first_name": getattr(user, "first_name", None),
|
|
"last_name": getattr(user, "last_name", None),
|
|
"preferred_language": getattr(user, "preferred_language", None),
|
|
}
|
|
|
|
# Add admin platform access info
|
|
if user.is_admin:
|
|
if user.is_super_admin:
|
|
data["accessible_platform_ids"] = None # All platforms
|
|
else:
|
|
# Get platform IDs from admin_platforms relationship
|
|
admin_platforms = getattr(user, "admin_platforms", [])
|
|
data["accessible_platform_ids"] = [
|
|
ap.platform_id for ap in admin_platforms if ap.is_active
|
|
]
|
|
|
|
# Add platform context from JWT token (for platform admins after selection)
|
|
data["token_platform_id"] = getattr(user, "token_platform_id", None)
|
|
data["token_platform_code"] = getattr(user, "token_platform_code", None)
|
|
|
|
# Add store context from JWT token if present
|
|
if include_store_context:
|
|
data["token_store_id"] = getattr(user, "token_store_id", None)
|
|
data["token_store_code"] = getattr(user, "token_store_code", None)
|
|
data["token_store_role"] = getattr(user, "token_store_role", None)
|
|
|
|
# Surface terminal-device principal if the user object came from a
|
|
# device-token authentication path.
|
|
data["terminal_device_id"] = getattr(user, "terminal_device_id", None)
|
|
|
|
return cls(**data)
|