Files
orion/app/services/admin_service.py

222 lines
7.1 KiB
Python

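# app/services/admin_service.py
"""Admin service layer.

This module provides classes and functions for:
- Listing users and toggling their active status
- Listing shops and toggling their verification and active status
- Listing marketplace import jobs with optional filtering and pagination
"""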
import logging
from datetime import datetime
from typing import List, Optional, Tuple
from fastapi import HTTPException
from sqlalchemy.orm import Session
from models.schemas.marketplace import MarketplaceImportJobResponse
from models.database.marketplace import MarketplaceImportJob
from models.database.shop import Shop
from models.database.user import User
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class AdminService:
    """Service class for admin operations following the application's service pattern.

    The class keeps no state of its own: every method receives the SQLAlchemy
    session as an explicit ``db`` argument.
    """

    def get_all_users(self, db: Session, skip: int = 0, limit: int = 100) -> List[User]:
        """
        Get paginated list of all users.

        Args:
            db: Database session
            skip: Number of records to skip
            limit: Maximum number of records to return

        Returns:
            List of users ordered by ID
        """
        # OFFSET/LIMIT without ORDER BY leaves the row order up to the
        # database, so pages could overlap or skip rows between calls.
        return db.query(User).order_by(User.id).offset(skip).limit(limit).all()

    def toggle_user_status(
        self, db: Session, user_id: int, current_admin_id: int
    ) -> Tuple[User, str]:
        """
        Toggle user active status.

        Args:
            db: Database session
            user_id: ID of user to toggle
            current_admin_id: ID of the admin performing the action

        Returns:
            Tuple of (updated_user, status_message)

        Raises:
            HTTPException: 404 if user not found, 400 if the admin targets
                their own account
        """
        user = self._get_user_or_404(db, user_id)

        if user.id == current_admin_id:
            raise HTTPException(
                status_code=400, detail="Cannot deactivate your own account"
            )

        user.is_active = not user.is_active
        # NOTE(review): datetime.utcnow() is deprecated since Python 3.12. It is
        # kept here because it yields a naive UTC value; confirm the column
        # type before switching to a timezone-aware datetime.
        user.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(user)

        status = "activated" if user.is_active else "deactivated"
        logger.info(
            "User %s has been %s by admin %s", user.username, status, current_admin_id
        )
        return user, f"User {user.username} has been {status}"

    def get_all_shops(
        self, db: Session, skip: int = 0, limit: int = 100
    ) -> Tuple[List[Shop], int]:
        """
        Get paginated list of all shops with total count.

        Args:
            db: Database session
            skip: Number of records to skip
            limit: Maximum number of records to return

        Returns:
            Tuple of (shops_list, total_count); total_count counts every shop,
            not just the returned page
        """
        total = db.query(Shop).count()
        # Explicit ordering keeps pages stable across calls.
        shops = db.query(Shop).order_by(Shop.id).offset(skip).limit(limit).all()
        return shops, total

    def verify_shop(self, db: Session, shop_id: int) -> Tuple[Shop, str]:
        """
        Toggle shop verification status.

        Args:
            db: Database session
            shop_id: ID of shop to verify/unverify

        Returns:
            Tuple of (updated_shop, status_message)

        Raises:
            HTTPException: If shop not found
        """
        shop = self._get_shop_or_404(db, shop_id)

        shop.is_verified = not shop.is_verified
        shop.updated_at = datetime.utcnow()  # naive UTC timestamp
        db.commit()
        db.refresh(shop)

        status = "verified" if shop.is_verified else "unverified"
        logger.info("Shop %s has been %s", shop.shop_code, status)
        return shop, f"Shop {shop.shop_code} has been {status}"

    def toggle_shop_status(self, db: Session, shop_id: int) -> Tuple[Shop, str]:
        """
        Toggle shop active status.

        Args:
            db: Database session
            shop_id: ID of shop to activate/deactivate

        Returns:
            Tuple of (updated_shop, status_message)

        Raises:
            HTTPException: If shop not found
        """
        shop = self._get_shop_or_404(db, shop_id)

        shop.is_active = not shop.is_active
        shop.updated_at = datetime.utcnow()  # naive UTC timestamp
        db.commit()
        db.refresh(shop)

        status = "activated" if shop.is_active else "deactivated"
        logger.info("Shop %s has been %s", shop.shop_code, status)
        return shop, f"Shop {shop.shop_code} has been {status}"

    def get_marketplace_import_jobs(
        self,
        db: Session,
        marketplace: Optional[str] = None,
        shop_name: Optional[str] = None,
        status: Optional[str] = None,
        skip: int = 0,
        limit: int = 100,
    ) -> List[MarketplaceImportJobResponse]:
        """
        Get filtered and paginated marketplace import jobs.

        Args:
            db: Database session
            marketplace: Filter by marketplace name (case-insensitive partial match)
            shop_name: Filter by shop name (case-insensitive partial match)
            status: Filter by exact status
            skip: Number of records to skip
            limit: Maximum number of records to return

        Returns:
            List of MarketplaceImportJobResponse objects, newest first
        """
        query = db.query(MarketplaceImportJob)

        # Apply filters; empty strings are treated like None (no filter).
        if marketplace:
            query = query.filter(
                MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")
            )
        if shop_name:
            query = query.filter(MarketplaceImportJob.shop_name.ilike(f"%{shop_name}%"))
        if status:
            query = query.filter(MarketplaceImportJob.status == status)

        # Newest first; the id tiebreaker keeps pagination stable when several
        # jobs share the same created_at value.
        jobs = (
            query.order_by(
                MarketplaceImportJob.created_at.desc(),
                MarketplaceImportJob.id.desc(),
            )
            .offset(skip)
            .limit(limit)
            .all()
        )

        return [
            MarketplaceImportJobResponse(
                job_id=job.id,
                status=job.status,
                marketplace=job.marketplace,
                # NOTE(review): reads the id through the `shop` relationship,
                # which raises AttributeError if job.shop is None — confirm the
                # relationship is always populated (or use a FK column).
                shop_id=job.shop.id,
                shop_name=job.shop_name,
                # Counters may be NULL in the database; report them as 0.
                imported=job.imported_count or 0,
                updated=job.updated_count or 0,
                total_processed=job.total_processed or 0,
                error_count=job.error_count or 0,
                error_message=job.error_message,
                created_at=job.created_at,
                started_at=job.started_at,
                completed_at=job.completed_at,
            )
            for job in jobs
        ]

    def get_user_by_id(self, db: Session, user_id: int) -> Optional[User]:
        """Get user by ID, or None when no such user exists."""
        return db.query(User).filter(User.id == user_id).first()

    def get_shop_by_id(self, db: Session, shop_id: int) -> Optional[Shop]:
        """Get shop by ID, or None when no such shop exists."""
        return db.query(Shop).filter(Shop.id == shop_id).first()

    def user_exists(self, db: Session, user_id: int) -> bool:
        """Check if user exists by ID."""
        # Select only the id column; the full row is not needed for a presence check.
        return db.query(User.id).filter(User.id == user_id).first() is not None

    def shop_exists(self, db: Session, shop_id: int) -> bool:
        """Check if shop exists by ID."""
        # Select only the id column; the full row is not needed for a presence check.
        return db.query(Shop.id).filter(Shop.id == shop_id).first() is not None

    def _get_user_or_404(self, db: Session, user_id: int) -> User:
        """Return the user with the given ID or raise HTTPException(404)."""
        user = self.get_user_by_id(db, user_id)
        if not user:
            raise HTTPException(status_code=404, detail="User not found")
        return user

    def _get_shop_or_404(self, db: Session, shop_id: int) -> Shop:
        """Return the shop with the given ID or raise HTTPException(404)."""
        shop = self.get_shop_by_id(db, shop_id)
        if not shop:
            raise HTTPException(status_code=404, detail="Shop not found")
        return shop
# Create service instance following the same pattern as product_service.
# This module-level AdminService object is built once, when the module is imported.
admin_service = AdminService()