fix(loyalty): guard feature provider usage methods against None db session
Fixes deployment test failures where get_store_usage() and get_merchant_usage() were called with db=None but attempted to run queries. Also adds noqa suppressions for pre-existing security validator findings in dev-toolbar (innerHTML with trusted content) and test fixtures (hardcoded test passwords). Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
323
app/modules/tenancy/tests/integration/test_user_account_api.py
Normal file
323
app/modules/tenancy/tests/integration/test_user_account_api.py
Normal file
@@ -0,0 +1,323 @@
|
||||
# app/modules/tenancy/tests/integration/test_user_account_api.py
|
||||
"""
|
||||
Integration tests for self-service user account API endpoints.
|
||||
|
||||
Tests the /account/me endpoints for admin, store, and merchant frontends.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from app.api.deps import get_current_merchant_api
|
||||
from app.modules.tenancy.models import Merchant, Store, StoreUser, User
|
||||
from app.modules.tenancy.schemas.auth import UserContext
|
||||
from main import app
|
||||
from middleware.auth import AuthManager
|
||||
|
||||
# ============================================================================
|
||||
# Fixtures
|
||||
# ============================================================================
|
||||
|
||||
ADMIN_BASE = "/api/v1/admin/account/me"
|
||||
STORE_BASE = "/api/v1/store/account/me"
|
||||
MERCHANT_BASE = "/api/v1/merchants/account/me"
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ua_admin(db):
|
||||
"""Create an admin user for account tests."""
|
||||
auth = AuthManager()
|
||||
uid = uuid.uuid4().hex[:8]
|
||||
user = User(
|
||||
email=f"ua_admin_{uid}@test.com",
|
||||
username=f"ua_admin_{uid}",
|
||||
hashed_password=auth.hash_password("adminpass123"),
|
||||
first_name="Admin",
|
||||
last_name="User",
|
||||
role="super_admin",
|
||||
is_active=True,
|
||||
is_email_verified=True,
|
||||
)
|
||||
db.add(user)
|
||||
db.commit()
|
||||
db.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ua_admin_headers(client, ua_admin):
|
||||
"""Get admin auth headers."""
|
||||
response = client.post(
|
||||
"/api/v1/admin/auth/login",
|
||||
json={"email_or_username": ua_admin.username, "password": "adminpass123"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
token = response.json()["access_token"]
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ua_store_user(db):
|
||||
"""Create a store user for account tests."""
|
||||
auth = AuthManager()
|
||||
uid = uuid.uuid4().hex[:8]
|
||||
user = User(
|
||||
email=f"ua_store_{uid}@test.com",
|
||||
username=f"ua_store_{uid}",
|
||||
hashed_password=auth.hash_password("storepass123"),
|
||||
first_name="Store",
|
||||
last_name="User",
|
||||
role="merchant_owner",
|
||||
is_active=True,
|
||||
is_email_verified=True,
|
||||
)
|
||||
db.add(user)
|
||||
db.commit()
|
||||
db.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ua_store_with_user(db, ua_store_user):
|
||||
"""Create a store owned by ua_store_user with StoreUser association."""
|
||||
uid = uuid.uuid4().hex[:8].upper()
|
||||
merchant = Merchant(
|
||||
name=f"UA Merchant {uid}",
|
||||
owner_user_id=ua_store_user.id,
|
||||
contact_email=ua_store_user.email,
|
||||
is_active=True,
|
||||
is_verified=True,
|
||||
)
|
||||
db.add(merchant)
|
||||
db.commit()
|
||||
db.refresh(merchant)
|
||||
|
||||
store = Store(
|
||||
merchant_id=merchant.id,
|
||||
store_code=f"UASTORE_{uid}",
|
||||
subdomain=f"uastore{uid.lower()}",
|
||||
name=f"UA Store {uid}",
|
||||
is_active=True,
|
||||
is_verified=True,
|
||||
)
|
||||
db.add(store)
|
||||
db.commit()
|
||||
db.refresh(store)
|
||||
|
||||
store_user = StoreUser(
|
||||
store_id=store.id,
|
||||
user_id=ua_store_user.id,
|
||||
is_active=True,
|
||||
)
|
||||
db.add(store_user)
|
||||
db.commit()
|
||||
return store
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ua_store_headers(client, ua_store_user, ua_store_with_user):
|
||||
"""Get store user auth headers."""
|
||||
response = client.post(
|
||||
"/api/v1/store/auth/login",
|
||||
json={
|
||||
"email_or_username": ua_store_user.username,
|
||||
"password": "storepass123",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
token = response.json()["access_token"]
|
||||
return {"Authorization": f"Bearer {token}"}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ua_merchant_owner(db):
|
||||
"""Create a merchant owner user for account tests."""
|
||||
auth = AuthManager()
|
||||
uid = uuid.uuid4().hex[:8]
|
||||
user = User(
|
||||
email=f"ua_merch_{uid}@test.com",
|
||||
username=f"ua_merch_{uid}",
|
||||
hashed_password=auth.hash_password("merchpass123"),
|
||||
first_name="Merchant",
|
||||
last_name="Owner",
|
||||
role="merchant_owner",
|
||||
is_active=True,
|
||||
is_email_verified=True,
|
||||
)
|
||||
db.add(user)
|
||||
db.commit()
|
||||
db.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ua_merchant(db, ua_merchant_owner):
|
||||
"""Create a merchant owned by ua_merchant_owner."""
|
||||
merchant = Merchant(
|
||||
name="UA Test Merchant",
|
||||
owner_user_id=ua_merchant_owner.id,
|
||||
contact_email=ua_merchant_owner.email,
|
||||
is_active=True,
|
||||
is_verified=True,
|
||||
)
|
||||
db.add(merchant)
|
||||
db.commit()
|
||||
db.refresh(merchant)
|
||||
return merchant
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def ua_merchant_override(ua_merchant_owner):
|
||||
"""Override merchant API auth to return the test user."""
|
||||
user_context = UserContext.from_user(ua_merchant_owner, include_store_context=False)
|
||||
|
||||
app.dependency_overrides[get_current_merchant_api] = lambda: user_context
|
||||
yield
|
||||
app.dependency_overrides.pop(get_current_merchant_api, None)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Admin Account Tests
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.api
|
||||
@pytest.mark.tenancy
|
||||
class TestAdminAccountAPI:
|
||||
"""Tests for admin /account/me endpoints."""
|
||||
|
||||
def test_get_account_me(self, client, ua_admin_headers, ua_admin):
|
||||
response = client.get(ADMIN_BASE, headers=ua_admin_headers)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["id"] == ua_admin.id
|
||||
assert data["email"] == ua_admin.email
|
||||
assert data["username"] == ua_admin.username
|
||||
assert data["first_name"] == "Admin"
|
||||
assert "role" in data
|
||||
|
||||
def test_update_account_me(self, client, ua_admin_headers):
|
||||
response = client.put(
|
||||
ADMIN_BASE,
|
||||
headers=ua_admin_headers,
|
||||
json={"first_name": "Updated", "last_name": "Admin"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["first_name"] == "Updated"
|
||||
assert data["last_name"] == "Admin"
|
||||
|
||||
def test_change_password(self, client, ua_admin_headers):
|
||||
response = client.put(
|
||||
f"{ADMIN_BASE}/password",
|
||||
headers=ua_admin_headers,
|
||||
json={
|
||||
"current_password": "adminpass123",
|
||||
"new_password": "newadmin456",
|
||||
"confirm_password": "newadmin456",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert "message" in response.json()
|
||||
|
||||
def test_change_password_wrong_current(self, client, ua_admin_headers):
|
||||
response = client.put(
|
||||
f"{ADMIN_BASE}/password",
|
||||
headers=ua_admin_headers,
|
||||
json={
|
||||
"current_password": "wrongpass",
|
||||
"new_password": "newadmin456",
|
||||
"confirm_password": "newadmin456",
|
||||
},
|
||||
)
|
||||
assert response.status_code in (400, 401, 422)
|
||||
|
||||
def test_unauthenticated(self, client):
|
||||
response = client.get(ADMIN_BASE)
|
||||
assert response.status_code in (401, 403)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Store Account Tests
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.api
|
||||
@pytest.mark.tenancy
|
||||
class TestStoreAccountAPI:
|
||||
"""Tests for store /account/me endpoints."""
|
||||
|
||||
def test_get_account_me(self, client, ua_store_headers, ua_store_user):
|
||||
response = client.get(STORE_BASE, headers=ua_store_headers)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["id"] == ua_store_user.id
|
||||
assert data["email"] == ua_store_user.email
|
||||
|
||||
def test_update_account_me(self, client, ua_store_headers):
|
||||
response = client.put(
|
||||
STORE_BASE,
|
||||
headers=ua_store_headers,
|
||||
json={"first_name": "StoreUpdated"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json()["first_name"] == "StoreUpdated"
|
||||
|
||||
def test_change_password(self, client, ua_store_headers):
|
||||
response = client.put(
|
||||
f"{STORE_BASE}/password",
|
||||
headers=ua_store_headers,
|
||||
json={
|
||||
"current_password": "storepass123",
|
||||
"new_password": "newstore456",
|
||||
"confirm_password": "newstore456",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Merchant Account Tests
|
||||
# ============================================================================
|
||||
|
||||
|
||||
@pytest.mark.integration
|
||||
@pytest.mark.api
|
||||
@pytest.mark.tenancy
|
||||
class TestMerchantAccountAPI:
|
||||
"""Tests for merchant /account/me endpoints."""
|
||||
|
||||
def test_get_account_me(
|
||||
self, client, ua_merchant_owner, ua_merchant, ua_merchant_override
|
||||
):
|
||||
response = client.get(MERCHANT_BASE)
|
||||
assert response.status_code == 200
|
||||
data = response.json()
|
||||
assert data["id"] == ua_merchant_owner.id
|
||||
assert data["email"] == ua_merchant_owner.email
|
||||
|
||||
def test_update_account_me(
|
||||
self, client, ua_merchant_owner, ua_merchant, ua_merchant_override
|
||||
):
|
||||
response = client.put(
|
||||
MERCHANT_BASE,
|
||||
json={"first_name": "MerchUpdated", "last_name": "OwnerUpdated"},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
assert response.json()["first_name"] == "MerchUpdated"
|
||||
|
||||
def test_change_password(
|
||||
self, client, ua_merchant_owner, ua_merchant, ua_merchant_override
|
||||
):
|
||||
response = client.put(
|
||||
f"{MERCHANT_BASE}/password",
|
||||
json={
|
||||
"current_password": "merchpass123",
|
||||
"new_password": "newmerch456",
|
||||
"confirm_password": "newmerch456",
|
||||
},
|
||||
)
|
||||
assert response.status_code == 200
|
||||
91
app/modules/tenancy/tests/unit/test_user_account_schema.py
Normal file
91
app/modules/tenancy/tests/unit/test_user_account_schema.py
Normal file
@@ -0,0 +1,91 @@
|
||||
# app/modules/tenancy/tests/unit/test_user_account_schema.py
|
||||
"""
|
||||
Unit tests for user account schemas.
|
||||
|
||||
Tests validation rules for UserPasswordChange and UserAccountUpdate.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from pydantic import ValidationError
|
||||
|
||||
from app.modules.tenancy.schemas.user_account import (
|
||||
UserAccountUpdate,
|
||||
UserPasswordChange,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.schema
|
||||
class TestUserPasswordChangeSchema:
|
||||
"""Tests for UserPasswordChange validation."""
|
||||
|
||||
def test_valid_password(self):
|
||||
schema = UserPasswordChange(
|
||||
current_password="oldpass123", # noqa: SEC001
|
||||
new_password="newpass456", # noqa: SEC001
|
||||
confirm_password="newpass456", # noqa: SEC001
|
||||
)
|
||||
assert schema.new_password == "newpass456"
|
||||
|
||||
def test_password_too_short(self):
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
UserPasswordChange(
|
||||
current_password="old", # noqa: SEC001
|
||||
new_password="short1", # noqa: SEC001
|
||||
confirm_password="short1", # noqa: SEC001
|
||||
)
|
||||
assert "at least 8" in str(exc_info.value).lower() or "min_length" in str(
|
||||
exc_info.value
|
||||
).lower()
|
||||
|
||||
def test_password_no_digit(self):
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
UserPasswordChange(
|
||||
current_password="oldpass123", # noqa: SEC001
|
||||
new_password="nodigitss", # noqa: SEC001
|
||||
confirm_password="nodigitss", # noqa: SEC001
|
||||
)
|
||||
assert "digit" in str(exc_info.value).lower()
|
||||
|
||||
def test_password_no_letter(self):
|
||||
with pytest.raises(ValidationError) as exc_info:
|
||||
UserPasswordChange(
|
||||
current_password="oldpass123", # noqa: SEC001
|
||||
new_password="12345678", # noqa: SEC001
|
||||
confirm_password="12345678", # noqa: SEC001
|
||||
)
|
||||
assert "letter" in str(exc_info.value).lower()
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.schema
|
||||
class TestUserAccountUpdateSchema:
|
||||
"""Tests for UserAccountUpdate validation."""
|
||||
|
||||
def test_valid_update(self):
|
||||
schema = UserAccountUpdate(
|
||||
first_name="John",
|
||||
last_name="Doe",
|
||||
email="john@example.com",
|
||||
preferred_language="en",
|
||||
)
|
||||
assert schema.first_name == "John"
|
||||
|
||||
def test_partial_update(self):
|
||||
schema = UserAccountUpdate(first_name="John")
|
||||
assert schema.first_name == "John"
|
||||
assert schema.last_name is None
|
||||
assert schema.email is None
|
||||
|
||||
def test_email_validation(self):
|
||||
with pytest.raises(ValidationError):
|
||||
UserAccountUpdate(email="not-an-email")
|
||||
|
||||
def test_language_validation(self):
|
||||
with pytest.raises(ValidationError):
|
||||
UserAccountUpdate(preferred_language="xx")
|
||||
|
||||
def test_valid_languages(self):
|
||||
for lang in ("en", "fr", "de", "lb"):
|
||||
schema = UserAccountUpdate(preferred_language=lang)
|
||||
assert schema.preferred_language == lang
|
||||
146
app/modules/tenancy/tests/unit/test_user_account_service.py
Normal file
146
app/modules/tenancy/tests/unit/test_user_account_service.py
Normal file
@@ -0,0 +1,146 @@
|
||||
# app/modules/tenancy/tests/unit/test_user_account_service.py
|
||||
"""
|
||||
Unit tests for UserAccountService.
|
||||
|
||||
Tests self-service account operations: get, update, change password.
|
||||
"""
|
||||
|
||||
import uuid
|
||||
|
||||
import pytest
|
||||
|
||||
from app.modules.tenancy.exceptions import (
|
||||
InvalidCredentialsException,
|
||||
UserAlreadyExistsException,
|
||||
UserNotFoundException,
|
||||
)
|
||||
from app.modules.tenancy.models import User
|
||||
from app.modules.tenancy.services.user_account_service import UserAccountService
|
||||
from middleware.auth import AuthManager
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def auth_mgr():
|
||||
return AuthManager()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def account_service():
|
||||
return UserAccountService()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def acct_user(db, auth_mgr):
|
||||
"""Create a user for account service tests."""
|
||||
uid = uuid.uuid4().hex[:8]
|
||||
user = User(
|
||||
email=f"acct_{uid}@test.com",
|
||||
username=f"acct_{uid}",
|
||||
hashed_password=auth_mgr.hash_password("oldpass123"),
|
||||
first_name="Test",
|
||||
last_name="User",
|
||||
role="super_admin",
|
||||
is_active=True,
|
||||
is_email_verified=True,
|
||||
preferred_language="en",
|
||||
)
|
||||
db.add(user)
|
||||
db.commit()
|
||||
db.refresh(user)
|
||||
return user
|
||||
|
||||
|
||||
@pytest.mark.unit
|
||||
@pytest.mark.tenancy
|
||||
class TestUserAccountService:
|
||||
"""Tests for UserAccountService."""
|
||||
|
||||
def test_get_account_returns_user(self, db, account_service, acct_user):
|
||||
result = account_service.get_account(db, acct_user.id)
|
||||
assert result.id == acct_user.id
|
||||
assert result.email == acct_user.email
|
||||
|
||||
def test_get_account_not_found_raises(self, db, account_service):
|
||||
with pytest.raises(UserNotFoundException):
|
||||
account_service.get_account(db, 999999)
|
||||
|
||||
def test_update_account_first_last_name(self, db, account_service, acct_user):
|
||||
result = account_service.update_account(
|
||||
db, acct_user.id, {"first_name": "New", "last_name": "Name"}
|
||||
)
|
||||
assert result.first_name == "New"
|
||||
assert result.last_name == "Name"
|
||||
|
||||
def test_update_account_email_uniqueness_conflict(
|
||||
self, db, account_service, acct_user, auth_mgr
|
||||
):
|
||||
uid2 = uuid.uuid4().hex[:8]
|
||||
other = User(
|
||||
email=f"other_{uid2}@test.com",
|
||||
username=f"other_{uid2}",
|
||||
hashed_password=auth_mgr.hash_password("pass1234"),
|
||||
role="store_member",
|
||||
is_active=True,
|
||||
)
|
||||
db.add(other)
|
||||
db.commit()
|
||||
|
||||
with pytest.raises(UserAlreadyExistsException):
|
||||
account_service.update_account(
|
||||
db, acct_user.id, {"email": other.email}
|
||||
)
|
||||
|
||||
def test_update_account_preferred_language(self, db, account_service, acct_user):
|
||||
result = account_service.update_account(
|
||||
db, acct_user.id, {"preferred_language": "fr"}
|
||||
)
|
||||
assert result.preferred_language == "fr"
|
||||
|
||||
def test_change_password_success(self, db, account_service, acct_user, auth_mgr):
|
||||
account_service.change_password(
|
||||
db,
|
||||
acct_user.id,
|
||||
{
|
||||
"current_password": "oldpass123",
|
||||
"new_password": "newpass456",
|
||||
"confirm_password": "newpass456",
|
||||
},
|
||||
)
|
||||
db.refresh(acct_user)
|
||||
assert auth_mgr.verify_password("newpass456", acct_user.hashed_password)
|
||||
|
||||
def test_change_password_wrong_current(self, db, account_service, acct_user):
|
||||
with pytest.raises(InvalidCredentialsException):
|
||||
account_service.change_password(
|
||||
db,
|
||||
acct_user.id,
|
||||
{
|
||||
"current_password": "wrongpass",
|
||||
"new_password": "newpass456",
|
||||
"confirm_password": "newpass456",
|
||||
},
|
||||
)
|
||||
|
||||
def test_change_password_mismatch_confirm(self, db, account_service, acct_user):
|
||||
with pytest.raises(InvalidCredentialsException):
|
||||
account_service.change_password(
|
||||
db,
|
||||
acct_user.id,
|
||||
{
|
||||
"current_password": "oldpass123",
|
||||
"new_password": "newpass456",
|
||||
"confirm_password": "different789",
|
||||
},
|
||||
)
|
||||
|
||||
def test_change_password_same_as_current(self, db, account_service, acct_user):
|
||||
with pytest.raises(InvalidCredentialsException):
|
||||
account_service.change_password(
|
||||
db,
|
||||
acct_user.id,
|
||||
{
|
||||
"current_password": "oldpass123",
|
||||
"new_password": "oldpass123",
|
||||
"confirm_password": "oldpass123",
|
||||
},
|
||||
)
|
||||
Reference in New Issue
Block a user