Files
orion/app/modules/tenancy/schemas/auth.py
Samir Boulahtit 4aa6f76e46
Some checks failed
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / pytest (push) Has been cancelled
CI / ruff (push) Successful in 10s
refactor(arch): move auth schemas to tenancy module and add cross-module service methods
Move all auth schemas (UserContext, UserLogin, LoginResponse, etc.) from
legacy models/schema/auth.py to app/modules/tenancy/schemas/auth.py per
MOD-019. Update 84 import sites across 14 modules. Legacy file now
re-exports for backwards compatibility.

Add missing tenancy service methods for cross-module consumers:
- merchant_service.get_merchant_by_owner_id()
- merchant_service.get_merchant_count_for_owner()
- admin_service.get_user_by_id() (public, was private-only)
- platform_service.get_active_store_count()

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-26 23:57:04 +01:00

344 lines
10 KiB
Python

# app/modules/tenancy/schemas/auth.py
"""
Authentication and user context schemas.
UserContext is the primary schema for dependency injection in API endpoints,
replacing direct use of the User database model in routes.
Migrated from models/schema/auth.py per MOD-019 / MOD-025.
"""
import re
from datetime import datetime
from pydantic import BaseModel, ConfigDict, EmailStr, Field, field_validator
class UserLogin(BaseModel):
email_or_username: str = Field(..., description="Username or email address")
password: str = Field(..., description="Password")
store_code: str | None = Field(
None, description="Optional store code for context"
)
@field_validator("email_or_username")
@classmethod
def validate_email_or_username(cls, v):
return v.strip()
class UserResponse(BaseModel):
model_config = ConfigDict(from_attributes=True)
id: int
email: str
username: str
role: str
is_active: bool
preferred_language: str | None = None
last_login: datetime | None = None
created_at: datetime
updated_at: datetime
class LoginResponse(BaseModel):
access_token: str
token_type: str = "bearer"
expires_in: int
user: UserResponse
class PlatformSelectResponse(BaseModel):
"""Response for platform selection (no user data - client already has it)."""
access_token: str
token_type: str = "bearer"
expires_in: int
platform_id: int
platform_code: str
class OwnedMerchantSummary(BaseModel):
"""Summary of a merchant owned by a user."""
id: int
name: str
is_active: bool
store_count: int
class StoreMembershipSummary(BaseModel):
"""Summary of a user's store membership."""
store_id: int
store_code: str
store_name: str
role: str
is_active: bool
class UserDetailResponse(UserResponse):
"""Extended user response with additional details."""
first_name: str | None = None
last_name: str | None = None
full_name: str | None = None
is_email_verified: bool = False
owned_merchants_count: int = 0
store_memberships_count: int = 0
owned_merchants: list[OwnedMerchantSummary] = []
store_memberships: list[StoreMembershipSummary] = []
class UserUpdate(BaseModel):
"""Schema for updating user information."""
username: str | None = Field(None, min_length=3, max_length=50)
email: EmailStr | None = None
first_name: str | None = Field(None, max_length=100)
last_name: str | None = Field(None, max_length=100)
role: str | None = Field(None, pattern="^(super_admin|platform_admin|merchant_owner|store_member)$")
is_active: bool | None = None
is_email_verified: bool | None = None
preferred_language: str | None = Field(
None, description="Preferred language (en, fr, de, lb)"
)
@field_validator("username")
@classmethod
def validate_username(cls, v):
if v and not re.match(r"^[a-zA-Z0-9_]+$", v):
raise ValueError(
"Username must contain only letters, numbers, or underscores"
)
return v.lower().strip() if v else v
class UserCreate(BaseModel):
"""Schema for creating a new user (admin only)."""
email: EmailStr = Field(..., description="Valid email address")
username: str = Field(..., min_length=3, max_length=50)
password: str = Field(..., min_length=6, description="Password")
first_name: str | None = Field(None, max_length=100)
last_name: str | None = Field(None, max_length=100)
role: str = Field(default="store_member", pattern="^(super_admin|platform_admin|merchant_owner|store_member)$")
preferred_language: str | None = Field(
None, description="Preferred language (en, fr, de, lb)"
)
@field_validator("username")
@classmethod
def validate_username(cls, v):
if not re.match(r"^[a-zA-Z0-9_]+$", v):
raise ValueError(
"Username must contain only letters, numbers, or underscores"
)
return v.lower().strip()
class UserListResponse(BaseModel):
"""Schema for paginated user list."""
items: list[UserResponse]
total: int
page: int
per_page: int
pages: int
class UserSearchItem(BaseModel):
"""Schema for a single user search result."""
id: int
username: str
email: str
is_active: bool
class UserSearchResponse(BaseModel):
"""Schema for user search results."""
users: list[UserSearchItem]
class UserStatusToggleResponse(BaseModel):
"""Schema for user status toggle response."""
message: str
is_active: bool
class UserDeleteResponse(BaseModel):
"""Schema for user delete response."""
message: str
class LogoutResponse(BaseModel):
"""Schema for logout response."""
message: str
class PasswordResetRequestResponse(BaseModel):
"""Schema for password reset request response."""
message: str
class PasswordResetResponse(BaseModel):
"""Schema for password reset response."""
message: str
class StoreUserResponse(BaseModel):
"""Schema for store user info in auth context."""
id: int
username: str
email: str
role: str
is_active: bool
model_config = {"from_attributes": True}
class UserContext(BaseModel):
"""
User context for dependency injection in API endpoints.
This schema replaces direct use of the User database model in API routes,
following the principle that routes should not import database models directly.
Used by:
- get_current_admin_api / get_current_admin_from_cookie_or_header
- get_current_store_api / get_current_store_from_cookie_or_header
- get_current_super_admin
For admin users:
- is_super_admin indicates full platform access
- accessible_platform_ids is None for super admins (all platforms)
- accessible_platform_ids is a list for platform admins
For store users:
- token_store_id/code/role come from JWT token
- These indicate which store context the user is operating in
"""
# Core user fields
id: int
email: str
username: str
role: str # super_admin, platform_admin, merchant_owner, or store_member
is_active: bool = True
# Admin-specific fields
accessible_platform_ids: list[int] | None = None # None = all platforms (super admin)
# Admin platform context (from JWT token after platform selection)
token_platform_id: int | None = None
token_platform_code: str | None = None
# Store-specific fields (from JWT token)
token_store_id: int | None = None
token_store_code: str | None = None
token_store_role: str | None = None
# Optional profile fields
first_name: str | None = None
last_name: str | None = None
preferred_language: str | None = None
model_config = ConfigDict(from_attributes=True)
@property
def full_name(self) -> str:
"""Returns the full name of the user."""
if self.first_name and self.last_name:
return f"{self.first_name} {self.last_name}"
return self.username
@property
def is_super_admin(self) -> bool:
"""Check if user is a super admin."""
return self.role == "super_admin"
@property
def is_admin(self) -> bool:
"""Check if user is an admin (super_admin or platform_admin)."""
return self.role in ("super_admin", "platform_admin")
@property
def is_store_user(self) -> bool:
"""Check if user is a store user (merchant_owner or store_member)."""
return self.role in ("merchant_owner", "store_member")
def can_access_platform(self, platform_id: int) -> bool:
"""
Check if user can access a specific platform.
Super admins (accessible_platform_ids=None) can access all platforms.
Platform admins can only access their assigned platforms.
"""
if self.is_super_admin:
return True
if self.accessible_platform_ids is None:
return True # Super admin fallback
return platform_id in self.accessible_platform_ids
def get_accessible_platform_ids(self) -> list[int] | None:
"""
Get list of platform IDs this user can access.
Returns None for super admins (all platforms accessible).
Returns list of platform IDs for platform admins.
"""
return self.accessible_platform_ids
@classmethod
def from_user(cls, user, include_store_context: bool = True) -> "UserContext":
"""
Create UserContext from a User database model.
Args:
user: User database model instance
include_store_context: Whether to include token_store_* fields
Returns:
UserContext instance
"""
data = {
"id": user.id,
"email": user.email,
"username": user.username,
"role": user.role,
"is_active": user.is_active,
"first_name": getattr(user, "first_name", None),
"last_name": getattr(user, "last_name", None),
"preferred_language": getattr(user, "preferred_language", None),
}
# Add admin platform access info
if user.is_admin:
if user.is_super_admin:
data["accessible_platform_ids"] = None # All platforms
else:
# Get platform IDs from admin_platforms relationship
admin_platforms = getattr(user, "admin_platforms", [])
data["accessible_platform_ids"] = [
ap.platform_id for ap in admin_platforms if ap.is_active
]
# Add platform context from JWT token (for platform admins after selection)
data["token_platform_id"] = getattr(user, "token_platform_id", None)
data["token_platform_code"] = getattr(user, "token_platform_code", None)
# Add store context from JWT token if present
if include_store_context:
data["token_store_id"] = getattr(user, "token_store_id", None)
data["token_store_code"] = getattr(user, "token_store_code", None)
data["token_store_role"] = getattr(user, "token_store_role", None)
return cls(**data)