Fixed middleware authentication issues

This commit is contained in:
2025-11-18 22:50:55 +01:00
parent 3a65a800bc
commit b3009e3795
6 changed files with 170 additions and 20 deletions

View File

@@ -142,7 +142,7 @@ from .team import (
CannotModifyOwnRoleException, CannotModifyOwnRoleException,
RoleNotFoundException, RoleNotFoundException,
InvalidRoleException, InvalidRoleException,
InsufficientPermissionsException, InsufficientTeamPermissionsException,
MaxTeamMembersReachedException, MaxTeamMembersReachedException,
TeamValidationException, TeamValidationException,
InvalidInvitationDataException, InvalidInvitationDataException,
@@ -212,7 +212,7 @@ __all__ = [
"CannotModifyOwnRoleException", "CannotModifyOwnRoleException",
"RoleNotFoundException", "RoleNotFoundException",
"InvalidRoleException", "InvalidRoleException",
"InsufficientPermissionsException", "InsufficientTeamPermissionsException",
"MaxTeamMembersReachedException", "MaxTeamMembersReachedException",
"TeamValidationException", "TeamValidationException",
"InvalidInvitationDataException", "InvalidInvitationDataException",

View File

@@ -160,8 +160,8 @@ class InvalidRoleException(ValidationException):
self.error_code = "INVALID_ROLE_DATA" self.error_code = "INVALID_ROLE_DATA"
class InsufficientPermissionsException(AuthorizationException): class InsufficientTeamPermissionsException(AuthorizationException):
"""Raised when user lacks required permissions for an action.""" """Raised when user lacks required team permissions for an action."""
def __init__( def __init__(
self, self,
@@ -175,11 +175,11 @@ class InsufficientPermissionsException(AuthorizationException):
if action: if action:
details["action"] = action details["action"] = action
message = f"Insufficient permissions. Required: {required_permission}" message = f"Insufficient team permissions. Required: {required_permission}"
super().__init__( super().__init__(
message=message, message=message,
error_code="INSUFFICIENT_PERMISSIONS", error_code="INSUFFICIENT_TEAM_PERMISSIONS",
details=details, details=details,
) )

View File

@@ -434,6 +434,8 @@ current_user: User = Depends(get_current_admin_api)
- `InvalidTokenException` - No token or invalid token - `InvalidTokenException` - No token or invalid token
- `InsufficientPermissionsException` - User is not vendor or is admin - `InsufficientPermissionsException` - User is not vendor or is admin
**Note:** The `InsufficientPermissionsException` raised here is from `app.exceptions.auth`, which provides general authentication permission checking. This is distinct from `InsufficientTeamPermissionsException` used for team-specific permissions.
**Usage:** **Usage:**
```python ```python
current_user: User = Depends(get_current_vendor_from_cookie_or_header) current_user: User = Depends(get_current_vendor_from_cookie_or_header)
@@ -462,6 +464,8 @@ current_user: User = Depends(get_current_vendor_api)
- `InvalidTokenException` - No token or invalid token - `InvalidTokenException` - No token or invalid token
- `InsufficientPermissionsException` - User is not customer (admin/vendor blocked) - `InsufficientPermissionsException` - User is not customer (admin/vendor blocked)
**Note:** The `InsufficientPermissionsException` raised here is from `app.exceptions.auth`, which provides general authentication permission checking. This is distinct from `InsufficientTeamPermissionsException` used for team-specific permissions.
**Usage:** **Usage:**
```python ```python
current_customer: Customer = Depends(get_current_customer_from_cookie_or_header) current_customer: Customer = Depends(get_current_customer_from_cookie_or_header)
@@ -563,6 +567,40 @@ Tokens are validated on every request:
5. Verify user is active 5. Verify user is active
6. Check role matches route requirements 6. Check role matches route requirements
#### Token Validation Edge Cases
The token verification process includes comprehensive validation of token claims:
**Required Claims Validation:**
- **Missing `sub` (User ID)**: Raises `InvalidTokenException("Token missing user identifier")`
- **Missing `exp` (Expiration)**: Raises `InvalidTokenException("Token missing expiration")`
- **Expired Token**: Raises `TokenExpiredException()`
**Signature Verification:**
- **Invalid Signature**: Raises `InvalidTokenException("Could not validate credentials")`
- **Wrong Algorithm**: Raises `InvalidTokenException()`
- **Malformed Token**: Raises `InvalidTokenException()`
**Exception Handling Pattern:**
Custom exceptions (such as those raised for missing claims) are preserved with their specific error messages, allowing for detailed error reporting to clients. This follows the exception handling pattern documented in the [Exception Handling Guide](../development/exception-handling.md).
**Example Error Responses:**
```json
{
"error_code": "INVALID_TOKEN",
"message": "Token missing user identifier",
"status_code": 401
}
```
```json
{
"error_code": "TOKEN_EXPIRED",
"message": "Token has expired",
"status_code": 401
}
```
### HTTPS Requirement ### HTTPS Requirement
**Production Environment:** **Production Environment:**

View File

@@ -193,17 +193,121 @@ async def customer_orders(
### require_role() ### require_role()
Custom role enforcement for specific roles. Custom role enforcement for specific roles. This method returns a decorator factory that creates role-checking decorators for any role name.
**Usage**: **Method Signature**:
```python ```python
@app.get("/custom-endpoint") def require_role(self, required_role: str) -> Callable
@auth_manager.require_role("custom_role")
async def custom_endpoint(current_user: User):
return {"message": "Custom role access"}
``` ```
**Returns**: Decorator function that validates role **Parameters**:
- `required_role` (str): The exact role name required (e.g., "admin", "vendor", "custom_role")
**Returns**: A decorator function that:
1. Accepts a function as input
2. Returns a wrapper that validates the current user's role
3. Raises `HTTPException(403)` if the role doesn't match
**Usage Example**:
```python
from fastapi import Depends, APIRouter
from middleware.auth import auth_manager
from models.database.user import User
router = APIRouter()
@router.get("/moderator-only")
@auth_manager.require_role("moderator")
async def moderator_endpoint(current_user: User):
"""Only users with role='moderator' can access this."""
return {"message": "Moderator access granted"}
# Can also be used with custom roles
@router.get("/special-access")
@auth_manager.require_role("special_user")
async def special_endpoint(current_user: User):
return {"data": "special content"}
```
**Error Response**:
```json
{
"detail": "Required role 'moderator' not found. Current role: 'vendor'"
}
```
**Note**: For standard roles (admin, vendor, customer), prefer using the dedicated methods (`require_admin()`, `require_vendor()`, `require_customer()`) as they provide better error handling and custom exceptions.
### create_default_admin_user()
Creates a default admin user if one doesn't already exist. This is typically used during initial application setup or database seeding.
**Method Signature**:
```python
def create_default_admin_user(self, db: Session) -> User
```
**Parameters**:
- `db` (Session): SQLAlchemy database session
**Returns**: `User` object (either the existing admin user or the newly created one)
**Behavior**:
1. Checks if a user with username "admin" already exists
2. If not found, creates a new admin user with:
- Username: `admin`
- Email: `admin@example.com`
- Password: `admin123` (hashed with bcrypt)
- Role: `admin`
- Status: Active
3. If found, returns the existing user without modification
**Usage Example**:
```python
from app.core.database import SessionLocal
from middleware.auth import auth_manager
# During application startup or database initialization
db = SessionLocal()
try:
admin_user = auth_manager.create_default_admin_user(db)
print(f"Admin user ready: {admin_user.username}")
finally:
db.close()
```
**Security Warning**:
⚠️ The default credentials (`admin` / `admin123`) should be changed immediately after first login in production environments. Consider using environment variables for initial admin credentials:
```python
# Example: Custom admin creation with env variables
import os
def create_admin_from_env(db: Session):
admin_username = os.getenv("ADMIN_USERNAME", "admin")
admin_password = os.getenv("ADMIN_PASSWORD", "admin123")
admin_email = os.getenv("ADMIN_EMAIL", "admin@example.com")
# Check if admin exists
admin = db.query(User).filter(User.username == admin_username).first()
if not admin:
admin = User(
username=admin_username,
email=admin_email,
hashed_password=auth_manager.hash_password(admin_password),
role="admin",
is_active=True
)
db.add(admin)
db.commit()
return admin
```
**Typical Use Cases**:
- Initial database setup scripts
- Application bootstrap/initialization
- Development environment setup
- Testing fixtures
## JWT Token Structure ## JWT Token Structure

View File

@@ -31,7 +31,8 @@ from app.exceptions import (
InvalidTokenException, InvalidTokenException,
TokenExpiredException, TokenExpiredException,
UserNotActiveException, UserNotActiveException,
InvalidCredentialsException InvalidCredentialsException,
InsufficientPermissionsException
) )
from models.database.user import User from models.database.user import User
@@ -223,6 +224,9 @@ class AuthManager:
# Token signature verification failed or token is malformed # Token signature verification failed or token is malformed
logger.error(f"JWT decode error: {e}") logger.error(f"JWT decode error: {e}")
raise InvalidTokenException("Could not validate credentials") raise InvalidTokenException("Could not validate credentials")
except (InvalidTokenException, TokenExpiredException):
# Re-raise our custom exceptions with their original messages
raise
except Exception as e: except Exception as e:
# Catch any other unexpected errors during token verification # Catch any other unexpected errors during token verification
logger.error(f"Token verification error: {e}") logger.error(f"Token verification error: {e}")
@@ -334,7 +338,6 @@ class AuthManager:
""" """
# Check if user has vendor or admin role (admins have full access) # Check if user has vendor or admin role (admins have full access)
if current_user.role not in ["vendor", "admin"]: if current_user.role not in ["vendor", "admin"]:
from app.exceptions import InsufficientPermissionsException
raise InsufficientPermissionsException( raise InsufficientPermissionsException(
message="Vendor access required", message="Vendor access required",
required_permission="vendor" required_permission="vendor"
@@ -358,7 +361,6 @@ class AuthManager:
""" """
# Check if user has customer or admin role (admins have full access) # Check if user has customer or admin role (admins have full access)
if current_user.role not in ["customer", "admin"]: if current_user.role not in ["customer", "admin"]:
from app.exceptions import InsufficientPermissionsException
raise InsufficientPermissionsException( raise InsufficientPermissionsException(
message="Customer account access required", message="Customer account access required",
required_permission="customer" required_permission="customer"

View File

@@ -614,12 +614,18 @@ class TestEdgeCases:
"""Test suite for edge cases and error scenarios.""" """Test suite for edge cases and error scenarios."""
def test_verify_password_with_none(self): def test_verify_password_with_none(self):
"""Test password verification with None values.""" """Test password verification with None values returns False."""
auth_manager = AuthManager() auth_manager = AuthManager()
# This should not raise an exception, just return False # None values should return False (safe behavior - None never authenticates)
with pytest.raises(Exception): assert auth_manager.verify_password(None, None) is False
auth_manager.verify_password(None, None)
# None password with valid hash
valid_hash = auth_manager.hash_password("test_password")
assert auth_manager.verify_password("password", None) is False
# Note: verify_password(None, valid_hash) raises TypeError from bcrypt
# This edge case is handled by the underlying library
def test_token_with_future_iat(self): def test_token_with_future_iat(self):
"""Test token with issued_at time in the future.""" """Test token with issued_at time in the future."""