Files
orion/tests/unit/services/test_admin_service.py
Samir Boulahtit 9920430b9e fix: correct tojson|safe usage in templates and update validator
- Remove |safe from |tojson in HTML attributes (x-data) - quotes must
  become &quot; for browsers to parse correctly
- Update LANG-002 and LANG-003 architecture rules to document correct
  |tojson usage patterns:
  - HTML attributes: |tojson (no |safe)
  - Script blocks: |tojson|safe
- Fix validator to warn when |tojson|safe is used in x-data (breaks
  HTML attribute parsing)
- Improve code quality across services, APIs, and tests

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-13 22:59:51 +01:00

319 lines
12 KiB
Python

# tests/unit/services/test_admin_service.py
import pytest
from app.exceptions import (
AdminOperationException,
CannotModifySelfException,
UserNotFoundException,
UserStatusChangeException,
VendorNotFoundException,
)
from app.services.admin_service import AdminService
from app.services.stats_service import stats_service
@pytest.mark.unit
@pytest.mark.admin
class TestAdminService:
    """Unit tests for AdminService and the statistics helpers of stats_service.

    The suite covers user management, vendor management, marketplace import
    job listing, statistics, database-error handling and empty-database edge
    cases. All tests rely on pytest fixtures (``db``, ``test_user``,
    ``test_admin``, ``test_vendor`` ...) that are defined outside this file.
    """

    def setup_method(self):
        """Create a fresh AdminService instance for each test method."""
        self.service = AdminService()

    # User Management Tests

    def test_get_all_users(self, db, test_user, test_admin):
        """Both fixture users appear in the first page of get_all_users."""
        users = self.service.get_all_users(db, skip=0, limit=10)

        assert len(users) >= 2  # test_user + test_admin
        user_ids = [user.id for user in users]
        assert test_user.id in user_ids
        assert test_admin.id in user_ids

    def test_get_all_users_with_pagination(self, db, test_user, test_admin):
        """Consecutive one-row pages each hold one user, and they differ."""
        users = self.service.get_all_users(db, skip=0, limit=1)
        assert len(users) == 1

        users_second_page = self.service.get_all_users(db, skip=1, limit=1)
        assert len(users_second_page) == 1
        assert users[0].id != users_second_page[0].id

    def test_toggle_user_status_deactivate(self, db, test_user, test_admin):
        """Toggling an active user deactivates it and says so in the message."""
        assert test_user.is_active is True

        user, message = self.service.toggle_user_status(db, test_user.id, test_admin.id)

        assert user.id == test_user.id
        assert user.is_active is False
        assert test_user.username in message
        assert "deactivated" in message

    def test_toggle_user_status_activate(self, db, test_user, test_admin):
        """Toggling an inactive user activates it and says so in the message."""
        from models.database.user import User

        # Re-query user to get fresh instance
        user_to_deactivate = db.query(User).filter(User.id == test_user.id).first()
        user_to_deactivate.is_active = False
        db.commit()

        user, message = self.service.toggle_user_status(db, test_user.id, test_admin.id)

        assert user.id == test_user.id
        assert user.is_active is True
        assert "activated" in message
        # "activated" is a substring of "deactivated"; rule out the wrong wording.
        assert "deactivated" not in message

    def test_toggle_user_status_user_not_found(self, db, test_admin):
        """An unknown user id raises UserNotFoundException naming that id."""
        with pytest.raises(UserNotFoundException) as exc_info:
            self.service.toggle_user_status(db, 99999, test_admin.id)

        exception = exc_info.value
        assert exception.error_code == "USER_NOT_FOUND"
        assert "99999" in exception.message

    def test_toggle_user_status_cannot_modify_self(self, db, test_admin):
        """An admin toggling their own account gets CannotModifySelfException."""
        with pytest.raises(CannotModifySelfException) as exc_info:
            self.service.toggle_user_status(db, test_admin.id, test_admin.id)

        exception = exc_info.value
        assert exception.error_code == "CANNOT_MODIFY_SELF"
        assert "deactivate account" in exception.message

    def test_toggle_user_status_cannot_modify_admin(
        self, db, test_admin, another_admin
    ):
        """An admin toggling another admin gets UserStatusChangeException."""
        with pytest.raises(UserStatusChangeException) as exc_info:
            self.service.toggle_user_status(db, another_admin.id, test_admin.id)

        exception = exc_info.value
        assert exception.error_code == "USER_STATUS_CHANGE_FAILED"
        assert "Cannot modify another admin user" in exception.message

    # Vendor Management Tests

    def test_get_all_vendors(self, db, test_vendor):
        """get_all_vendors returns (vendors, total) including the fixture vendor."""
        vendors, total = self.service.get_all_vendors(db, skip=0, limit=10)

        assert total >= 1
        assert len(vendors) >= 1
        vendor_codes = [vendor.vendor_code for vendor in vendors]
        assert test_vendor.vendor_code in vendor_codes

    def test_get_all_vendors_with_pagination(self, db, test_vendor, verified_vendor):
        """Consecutive one-row vendor pages each hold one vendor, and they differ."""
        vendors, total = self.service.get_all_vendors(db, skip=0, limit=1)
        assert total >= 2
        assert len(vendors) == 1

        vendors_second_page, _ = self.service.get_all_vendors(db, skip=1, limit=1)
        # total >= 2 was asserted above, so skipping one row must leave one row.
        assert len(vendors_second_page) == 1
        assert vendors[0].id != vendors_second_page[0].id

    def test_verify_vendor_mark_verified(self, db, test_vendor):
        """verify_vendor flips an unverified vendor to verified."""
        from models.database.vendor import Vendor

        # Re-query vendor to get fresh instance
        vendor_to_unverify = (
            db.query(Vendor).filter(Vendor.id == test_vendor.id).first()
        )
        vendor_to_unverify.is_verified = False
        db.commit()

        vendor, message = self.service.verify_vendor(db, test_vendor.id)

        assert vendor.id == test_vendor.id
        assert vendor.is_verified is True
        assert "verified" in message
        # "verified" is a substring of "unverified"; rule out the wrong wording.
        assert "unverified" not in message

    def test_verify_vendor_mark_unverified(self, db, verified_vendor):
        """verify_vendor flips a verified vendor to unverified."""
        vendor, message = self.service.verify_vendor(db, verified_vendor.id)

        assert vendor.id == verified_vendor.id
        assert vendor.is_verified is False
        assert verified_vendor.vendor_code in message
        assert "unverified" in message

    def test_verify_vendor_not_found(self, db):
        """An unknown vendor id raises VendorNotFoundException naming that id."""
        with pytest.raises(VendorNotFoundException) as exc_info:
            self.service.verify_vendor(db, 99999)

        exception = exc_info.value
        assert exception.error_code == "VENDOR_NOT_FOUND"
        assert "99999" in exception.message

    def test_toggle_vendor_status_deactivate(self, db, test_vendor):
        """toggle_vendor_status inverts is_active and reports the new state."""
        original_status = test_vendor.is_active

        vendor, message = self.service.toggle_vendor_status(db, test_vendor.id)

        assert vendor.id == test_vendor.id
        assert vendor.is_active != original_status
        assert test_vendor.vendor_code in message
        if original_status:
            assert "deactivated" in message
        else:
            assert "activated" in message
            # "activated" is a substring of "deactivated"; rule out the
            # wrong wording.
            assert "deactivated" not in message

    def test_toggle_vendor_status_not_found(self, db):
        """An unknown vendor id raises VendorNotFoundException."""
        with pytest.raises(VendorNotFoundException) as exc_info:
            self.service.toggle_vendor_status(db, 99999)

        exception = exc_info.value
        assert exception.error_code == "VENDOR_NOT_FOUND"

    # Marketplace Import Jobs Tests

    def test_get_marketplace_import_jobs_no_filters(
        self, db, test_marketplace_import_job
    ):
        """The fixture job is listed with matching marketplace and status."""
        result = self.service.get_marketplace_import_jobs(db, skip=0, limit=10)
        assert len(result) >= 1

        # Find our test job in the results
        test_job = next(
            (job for job in result if job.job_id == test_marketplace_import_job.id),
            None,
        )
        assert test_job is not None
        assert (
            test_job.marketplace.lower()
            == test_marketplace_import_job.marketplace.lower()
        )
        assert test_job.status == test_marketplace_import_job.status

    def test_get_marketplace_import_jobs_with_marketplace_filter(
        self, db, test_marketplace_import_job
    ):
        """Every job returned by the marketplace filter matches it (case-insensitive)."""
        result = self.service.get_marketplace_import_jobs(
            db, marketplace=test_marketplace_import_job.marketplace, skip=0, limit=10
        )

        assert len(result) >= 1
        expected_marketplace = test_marketplace_import_job.marketplace.lower()
        for job in result:
            assert expected_marketplace in job.marketplace.lower()

    def test_get_marketplace_import_jobs_with_status_filter(
        self, db, test_marketplace_import_job
    ):
        """Every job returned by the status filter has the requested status."""
        result = self.service.get_marketplace_import_jobs(
            db, status=test_marketplace_import_job.status, skip=0, limit=10
        )

        assert len(result) >= 1
        for job in result:
            assert job.status == test_marketplace_import_job.status

    def test_get_marketplace_import_jobs_pagination(
        self, db, test_marketplace_import_job
    ):
        """One-row pages never exceed the limit and never repeat a job."""
        result_page1 = self.service.get_marketplace_import_jobs(db, skip=0, limit=1)
        result_page2 = self.service.get_marketplace_import_jobs(db, skip=1, limit=1)

        # Only one fixture job is guaranteed, so page two may be empty; what
        # can always be checked is that limit=1 is honoured.
        assert len(result_page1) <= 1
        assert len(result_page2) <= 1
        if result_page1 and result_page2:
            assert result_page1[0].job_id != result_page2[0].job_id

    # Statistics Tests

    def test_get_user_statistics(self, db, test_user, test_admin):
        """User statistics expose the expected keys, types and totals."""
        stats = stats_service.get_user_statistics(db)

        for key in ("total_users", "active_users", "inactive_users"):
            assert key in stats
            assert isinstance(stats[key], int)
        assert "activation_rate" in stats
        assert isinstance(stats["activation_rate"], (int, float))

        assert stats["total_users"] >= 2  # test_user + test_admin
        assert stats["active_users"] + stats["inactive_users"] == stats["total_users"]

    def test_get_vendor_statistics(self, db, test_vendor):
        """Vendor statistics expose the expected keys, types and totals."""
        stats = stats_service.get_vendor_statistics(db)

        for key in ("total_vendors", "active_vendors", "verified_vendors"):
            assert key in stats
            assert isinstance(stats[key], int)
        assert "verification_rate" in stats
        assert isinstance(stats["verification_rate"], (int, float))

        assert stats["total_vendors"] >= 1

    # Error Handling Tests

    def test_get_all_users_database_error(self, db_with_error, test_admin):
        """A failing session makes get_all_users raise AdminOperationException."""
        with pytest.raises(AdminOperationException) as exc_info:
            self.service.get_all_users(db_with_error, skip=0, limit=10)

        exception = exc_info.value
        assert exception.error_code == "ADMIN_OPERATION_FAILED"
        assert "get_all_users" in exception.message

    def test_get_all_vendors_database_error(self, db_with_error):
        """A failing session makes get_all_vendors raise AdminOperationException."""
        with pytest.raises(AdminOperationException) as exc_info:
            self.service.get_all_vendors(db_with_error, skip=0, limit=10)

        exception = exc_info.value
        assert exception.error_code == "ADMIN_OPERATION_FAILED"
        assert "get_all_vendors" in exception.message

    # Edge Cases

    def test_get_all_users_empty_database(self, empty_db):
        """get_all_users returns no rows for an empty database."""
        users = self.service.get_all_users(empty_db, skip=0, limit=10)
        assert len(users) == 0

    def test_get_all_vendors_empty_database(self, empty_db):
        """get_all_vendors returns no rows and a zero total for an empty database."""
        vendors, total = self.service.get_all_vendors(empty_db, skip=0, limit=10)
        assert len(vendors) == 0
        assert total == 0

    def test_user_statistics_empty_database(self, empty_db):
        """All user statistics are zero when no users exist."""
        stats = stats_service.get_user_statistics(empty_db)

        for key in ("total_users", "active_users", "inactive_users", "activation_rate"):
            assert stats[key] == 0

    def test_vendor_statistics_empty_database(self, empty_db):
        """All vendor statistics are zero when no vendors exist."""
        stats = stats_service.get_vendor_statistics(empty_db)

        for key in (
            "total_vendors",
            "active_vendors",
            "verified_vendors",
            "verification_rate",
        ):
            assert stats[key] == 0