Files
orion/app/modules/tenancy/services/team_service.py
Samir Boulahtit f20266167d
Some checks failed
CI / ruff (push) Failing after 7s
CI / pytest (push) Failing after 1s
CI / architecture (push) Failing after 9s
CI / dependency-scanning (push) Successful in 27s
CI / audit (push) Successful in 8s
CI / docs (push) Has been skipped
fix(lint): auto-fix ruff violations and tune lint rules
- Auto-fixed 4,496 lint issues (import sorting, modern syntax, etc.)
- Added ignore rules for patterns intentional in this codebase:
  E402 (late imports), E712 (SQLAlchemy filters), B904 (raise from),
  SIM108/SIM105/SIM117 (readability preferences)
- Added per-file ignores for tests and scripts
- Excluded broken scripts/rename_terminology.py (has curly quotes)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-12 23:10:42 +01:00

217 lines
6.0 KiB
Python

# app/modules/tenancy/services/team_service.py
"""
Team service for store team management.
This module provides:
- Team member invitation
- Role management
- Team member CRUD operations
"""
import logging
from datetime import UTC, datetime
from typing import Any
from sqlalchemy.orm import Session
from app.exceptions import ValidationException
from app.modules.tenancy.models import Role, StoreUser, User
logger = logging.getLogger(__name__)
class TeamService:
"""Service for team management operations."""
def get_team_members(
self, db: Session, store_id: int, current_user: User
) -> list[dict[str, Any]]:
"""
Get all team members for store.
Args:
db: Database session
store_id: Store ID
current_user: Current user
Returns:
List of team members
"""
try:
store_users = (
db.query(StoreUser)
.filter(StoreUser.store_id == store_id, StoreUser.is_active == True)
.all()
)
members = []
for vu in store_users:
members.append(
{
"id": vu.user_id,
"email": vu.user.email,
"first_name": vu.user.first_name,
"last_name": vu.user.last_name,
"role": vu.role.name,
"role_id": vu.role_id,
"is_active": vu.is_active,
"joined_at": vu.created_at,
}
)
return members
except Exception as e:
logger.error(f"Error getting team members: {str(e)}")
raise ValidationException("Failed to retrieve team members")
def invite_team_member(
self, db: Session, store_id: int, invitation_data: dict, current_user: User
) -> dict[str, Any]:
"""
Invite a new team member.
Args:
db: Database session
store_id: Store ID
invitation_data: Invitation details
current_user: Current user
Returns:
Invitation result
"""
try:
# TODO: Implement full invitation flow with email
# For now, return placeholder
return {
"message": "Team invitation feature coming soon",
"email": invitation_data.get("email"),
"role": invitation_data.get("role"),
}
except Exception as e:
logger.error(f"Error inviting team member: {str(e)}")
raise ValidationException("Failed to invite team member")
def update_team_member(
self,
db: Session,
store_id: int,
user_id: int,
update_data: dict,
current_user: User,
) -> dict[str, Any]:
"""
Update team member role or status.
Args:
db: Database session
store_id: Store ID
user_id: User ID to update
update_data: Update data
current_user: Current user
Returns:
Updated member info
"""
try:
store_user = (
db.query(StoreUser)
.filter(
StoreUser.store_id == store_id, StoreUser.user_id == user_id
)
.first()
)
if not store_user:
raise ValidationException("Team member not found")
# Update fields
if "role_id" in update_data:
store_user.role_id = update_data["role_id"]
if "is_active" in update_data:
store_user.is_active = update_data["is_active"]
store_user.updated_at = datetime.now(UTC)
db.flush()
db.refresh(store_user)
return {
"message": "Team member updated successfully",
"user_id": user_id,
}
except Exception as e:
logger.error(f"Error updating team member: {str(e)}")
raise ValidationException("Failed to update team member")
def remove_team_member(
self, db: Session, store_id: int, user_id: int, current_user: User
) -> bool:
"""
Remove team member from store.
Args:
db: Database session
store_id: Store ID
user_id: User ID to remove
current_user: Current user
Returns:
True if removed
"""
try:
store_user = (
db.query(StoreUser)
.filter(
StoreUser.store_id == store_id, StoreUser.user_id == user_id
)
.first()
)
if not store_user:
raise ValidationException("Team member not found")
# Soft delete
store_user.is_active = False
store_user.updated_at = datetime.now(UTC)
logger.info(f"Removed user {user_id} from store {store_id}")
return True
except Exception as e:
logger.error(f"Error removing team member: {str(e)}")
raise ValidationException("Failed to remove team member")
def get_store_roles(self, db: Session, store_id: int) -> list[dict[str, Any]]:
"""
Get available roles for store.
Args:
db: Database session
store_id: Store ID
Returns:
List of roles
"""
try:
roles = db.query(Role).filter(Role.store_id == store_id).all()
return [
{
"id": role.id,
"name": role.name,
"permissions": role.permissions,
}
for role in roles
]
except Exception as e:
logger.error(f"Error getting store roles: {str(e)}")
raise ValidationException("Failed to retrieve roles")
# Create service instance
team_service = TeamService()