refactor: update services for company-centric ownership

- Update admin_service: remove vendor owner methods, fix get_all_vendors query
- Update company_service: add transfer_ownership method
- Update vendor_service: check ownership via company relationship
- Remove backwards compatibility code for Vendor.owner_user_id

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
2025-12-02 19:39:23 +01:00
parent df8731d7ae
commit ec2885dab5
3 changed files with 187 additions and 292 deletions

View File

@@ -16,7 +16,7 @@ import string
from datetime import UTC, datetime
from sqlalchemy import func, or_
from sqlalchemy.orm import Session
from sqlalchemy.orm import Session, joinedload
from app.exceptions import (
AdminOperationException,
@@ -28,6 +28,7 @@ from app.exceptions import (
VendorNotFoundException,
VendorVerificationException,
)
from models.database.company import Company
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.user import User
from models.database.vendor import Role, Vendor, VendorUser
@@ -100,30 +101,35 @@ class AdminService:
# VENDOR MANAGEMENT
# ============================================================================
def create_vendor_with_owner(
self, db: Session, vendor_data: VendorCreate
) -> tuple[Vendor, User, str]:
def create_vendor(self, db: Session, vendor_data: VendorCreate) -> Vendor:
"""
Create vendor with owner user account.
Create a vendor (storefront/brand) under an existing company.
Creates:
1. User account with owner_email (for authentication)
2. Vendor with contact_email (for business contact)
The vendor inherits owner and contact information from its parent company.
If contact_email not provided, defaults to owner_email.
Args:
db: Database session
vendor_data: Vendor creation data including company_id
Returns: (vendor, owner_user, temporary_password)
Returns:
The created Vendor object with company relationship loaded
Raises:
ValidationException: If company not found or vendor code/subdomain exists
AdminOperationException: If creation fails
"""
try:
# Validate company exists
company = db.query(Company).filter(Company.id == vendor_data.company_id).first()
if not company:
raise ValidationException(f"Company with ID {vendor_data.company_id} not found")
# Check if vendor code already exists
existing_vendor = (
db.query(Vendor)
.filter(
func.upper(Vendor.vendor_code) == vendor_data.vendor_code.upper()
)
.filter(func.upper(Vendor.vendor_code) == vendor_data.vendor_code.upper())
.first()
)
if existing_vendor:
raise VendorAlreadyExistsException(vendor_data.vendor_code)
@@ -133,62 +139,23 @@ class AdminService:
.filter(func.lower(Vendor.subdomain) == vendor_data.subdomain.lower())
.first()
)
if existing_subdomain:
raise ValidationException(
f"Subdomain '{vendor_data.subdomain}' is already taken"
)
# Generate temporary password for owner
temp_password = self._generate_temp_password()
# Create owner user with owner_email
from middleware.auth import AuthManager
auth_manager = AuthManager()
owner_username = f"{vendor_data.subdomain}_owner"
owner_email = vendor_data.owner_email # ✅ For User authentication
# Check if user with this email already exists
existing_user = db.query(User).filter(User.email == owner_email).first()
if existing_user:
# Use existing user as owner
owner_user = existing_user
else:
# Create new owner user
owner_user = User(
email=owner_email, # ✅ Authentication email
username=owner_username,
hashed_password=auth_manager.hash_password(temp_password),
role="user",
is_active=True,
)
db.add(owner_user)
db.flush() # Get owner_user.id
# Determine contact_email
# If provided, use it; otherwise default to owner_email
contact_email = vendor_data.contact_email or owner_email
# Create vendor
# Create vendor linked to company
vendor = Vendor(
company_id=company.id,
vendor_code=vendor_data.vendor_code.upper(),
subdomain=vendor_data.subdomain.lower(),
name=vendor_data.name,
description=vendor_data.description,
owner_user_id=owner_user.id,
contact_email=contact_email, # ✅ Business contact email
contact_phone=vendor_data.contact_phone,
website=vendor_data.website,
business_address=vendor_data.business_address,
tax_number=vendor_data.tax_number,
letzshop_csv_url_fr=vendor_data.letzshop_csv_url_fr,
letzshop_csv_url_en=vendor_data.letzshop_csv_url_en,
letzshop_csv_url_de=vendor_data.letzshop_csv_url_de,
is_active=True,
is_verified=True,
is_verified=False, # Needs verification by admin
)
db.add(vendor)
db.flush() # Get vendor.id
@@ -198,17 +165,12 @@ class AdminService:
db.commit()
db.refresh(vendor)
db.refresh(owner_user)
logger.info(
f"Vendor {vendor.vendor_code} created with owner {owner_user.username} "
f"(owner_email: {owner_email}, contact_email: {contact_email})"
f"Vendor {vendor.vendor_code} created under company {company.name} (ID: {company.id})"
)
# TODO: Send welcome email to owner with credentials
# self._send_vendor_welcome_email(owner_user, vendor, temp_password)
return vendor, owner_user, temp_password
return vendor
except (VendorAlreadyExistsException, ValidationException):
db.rollback()
@@ -217,7 +179,7 @@ class AdminService:
db.rollback()
logger.error(f"Failed to create vendor: {str(e)}")
raise AdminOperationException(
operation="create_vendor_with_owner",
operation="create_vendor",
reason=f"Failed to create vendor: {str(e)}",
)
@@ -232,7 +194,8 @@ class AdminService:
) -> tuple[list[Vendor], int]:
"""Get paginated list of all vendors with filtering."""
try:
query = db.query(Vendor)
# Eagerly load company relationship to avoid N+1 queries
query = db.query(Vendor).options(joinedload(Vendor.company))
# Apply search filter
if search:
@@ -251,7 +214,24 @@ class AdminService:
if is_verified is not None:
query = query.filter(Vendor.is_verified == is_verified)
total = query.count()
# Get total count (without joinedload for performance)
count_query = db.query(Vendor)
if search:
search_term = f"%{search}%"
count_query = count_query.filter(
or_(
Vendor.name.ilike(search_term),
Vendor.vendor_code.ilike(search_term),
Vendor.subdomain.ilike(search_term),
)
)
if is_active is not None:
count_query = count_query.filter(Vendor.is_active == is_active)
if is_verified is not None:
count_query = count_query.filter(Vendor.is_verified == is_verified)
total = count_query.count()
# Get paginated results
vendors = query.offset(skip).limit(limit).all()
return vendors, total
@@ -365,9 +345,11 @@ class AdminService:
- Status (is_active, is_verified)
Cannot update:
- owner_email (use transfer_vendor_ownership instead)
- vendor_code (immutable)
- owner_user_id (use transfer_vendor_ownership instead)
- company_id (vendor cannot be moved between companies)
Note: Ownership is managed at the Company level.
Use company_service.transfer_ownership() for ownership changes.
Args:
db: Database session
@@ -430,168 +412,8 @@ class AdminService:
operation="update_vendor", reason=f"Database update failed: {str(e)}"
)
# Add this NEW method for transferring ownership:
def transfer_vendor_ownership(
self,
db: Session,
vendor_id: int,
transfer_data, # VendorTransferOwnership schema
) -> tuple[Vendor, User, User]:
"""
Transfer vendor ownership to another user.
This method:
1. Validates new owner exists and is active
2. Removes old owner from "Owner" role (demotes to Manager)
3. Assigns new owner to "Owner" role
4. Updates vendor.owner_user_id
5. Creates audit log entry
Args:
db: Database session
vendor_id: ID of vendor
transfer_data: Transfer details (new owner ID, confirmation, reason)
Returns:
Tuple of (vendor, old_owner, new_owner)
Raises:
VendorNotFoundException: If vendor not found
UserNotFoundException: If new owner user not found
ValidationException: If confirmation not provided or user already owner
"""
# Require confirmation
if not transfer_data.confirm_transfer:
raise ValidationException(
"Ownership transfer requires confirmation (confirm_transfer=true)"
)
# Get vendor
vendor = self._get_vendor_by_id_or_raise(db, vendor_id)
old_owner = vendor.owner
# Get new owner
new_owner = (
db.query(User).filter(User.id == transfer_data.new_owner_user_id).first()
)
if not new_owner:
raise UserNotFoundException(str(transfer_data.new_owner_user_id))
# Check if new owner is active
if not new_owner.is_active:
raise ValidationException(
f"User {new_owner.username} (ID: {new_owner.id}) is not active"
)
# Check if already owner
if new_owner.id == old_owner.id:
raise ValidationException(
f"User {new_owner.username} is already the owner of this vendor"
)
try:
# Get Owner role for this vendor
owner_role = (
db.query(Role)
.filter(Role.vendor_id == vendor_id, Role.name == "Owner")
.first()
)
if not owner_role:
raise ValidationException("Owner role not found for vendor")
# Get Manager role (to demote old owner)
manager_role = (
db.query(Role)
.filter(Role.vendor_id == vendor_id, Role.name == "Manager")
.first()
)
# Remove old owner from Owner role
old_owner_link = (
db.query(VendorUser)
.filter(
VendorUser.vendor_id == vendor_id,
VendorUser.user_id == old_owner.id,
VendorUser.role_id == owner_role.id,
)
.first()
)
if old_owner_link:
if manager_role:
# Demote to Manager role
old_owner_link.role_id = manager_role.id
logger.info(
f"Old owner {old_owner.username} demoted to Manager role "
f"for vendor {vendor.vendor_code}"
)
else:
# No Manager role, just remove Owner link
db.delete(old_owner_link)
logger.warning(
f"Old owner {old_owner.username} removed from vendor {vendor.vendor_code} "
f"(no Manager role available)"
)
# Check if new owner already has a vendor_user link
new_owner_link = (
db.query(VendorUser)
.filter(
VendorUser.vendor_id == vendor_id,
VendorUser.user_id == new_owner.id,
)
.first()
)
if new_owner_link:
# Update existing link to Owner role
new_owner_link.role_id = owner_role.id
new_owner_link.is_active = True
else:
# Create new Owner link
new_owner_link = VendorUser(
vendor_id=vendor_id,
user_id=new_owner.id,
role_id=owner_role.id,
is_active=True,
)
db.add(new_owner_link)
# Update vendor owner_user_id
vendor.owner_user_id = new_owner.id
vendor.updated_at = datetime.now(UTC)
db.commit()
db.refresh(vendor)
logger.warning(
f"OWNERSHIP TRANSFERRED for vendor {vendor.vendor_code}: "
f"{old_owner.username} (ID: {old_owner.id}) -> "
f"{new_owner.username} (ID: {new_owner.id}). "
f"Reason: {transfer_data.transfer_reason or 'Not provided'}"
)
# TODO: Send notification emails to both old and new owners
# self._send_ownership_transfer_emails(vendor, old_owner, new_owner, transfer_data.transfer_reason)
return vendor, old_owner, new_owner
except (ValidationException, UserNotFoundException):
db.rollback()
raise
except Exception as e:
db.rollback()
logger.error(
f"Failed to transfer ownership for vendor {vendor_id}: {str(e)}"
)
raise AdminOperationException(
operation="transfer_vendor_ownership",
reason=f"Ownership transfer failed: {str(e)}",
)
# NOTE: Vendor ownership transfer is now handled at the Company level.
# Use company_service.transfer_ownership() instead.
# ============================================================================
# MARKETPLACE IMPORT JOBS
@@ -701,7 +523,12 @@ class AdminService:
def _get_vendor_by_id_or_raise(self, db: Session, vendor_id: int) -> Vendor:
"""Get vendor by ID or raise VendorNotFoundException."""
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
vendor = (
db.query(Vendor)
.options(joinedload(Vendor.company).joinedload(Company.owner))
.filter(Vendor.id == vendor_id)
.first()
)
if not vendor:
raise VendorNotFoundException(str(vendor_id), identifier_type="id")
return vendor

View File

@@ -13,10 +13,10 @@ from typing import List, Optional
from sqlalchemy import func, select
from sqlalchemy.orm import Session, joinedload
from app.exceptions import CompanyNotFoundException
from app.exceptions import CompanyNotFoundException, UserNotFoundException
from models.database.company import Company
from models.database.user import User
from models.schema.company import CompanyCreate, CompanyUpdate
from models.schema.company import CompanyCreate, CompanyTransferOwnership, CompanyUpdate
logger = logging.getLogger(__name__)
@@ -110,7 +110,7 @@ class CompanyService:
select(Company)
.where(Company.id == company_id)
.options(joinedload(Company.vendors))
).scalar_one_or_none()
).unique().scalar_one_or_none()
if not company:
raise CompanyNotFoundException(company_id)
@@ -257,6 +257,69 @@ class CompanyService:
return company
def transfer_ownership(
self,
db: Session,
company_id: int,
transfer_data: CompanyTransferOwnership,
) -> tuple[Company, User, User]:
"""
Transfer company ownership to another user.
This is a critical operation that:
- Changes the company's owner_user_id
- All vendors under the company automatically inherit the new owner
- Logs the transfer for audit purposes
Args:
db: Database session
company_id: Company ID
transfer_data: Transfer ownership data
Returns:
Tuple of (company, old_owner, new_owner)
Raises:
CompanyNotFoundException: If company not found
UserNotFoundException: If new owner user not found
ValueError: If trying to transfer to current owner
"""
# Get company
company = self.get_company_by_id(db, company_id)
old_owner_id = company.owner_user_id
# Get old owner
old_owner = db.execute(
select(User).where(User.id == old_owner_id)
).scalar_one_or_none()
if not old_owner:
raise UserNotFoundException(str(old_owner_id))
# Get new owner
new_owner = db.execute(
select(User).where(User.id == transfer_data.new_owner_user_id)
).scalar_one_or_none()
if not new_owner:
raise UserNotFoundException(str(transfer_data.new_owner_user_id))
# Prevent transferring to same owner
if old_owner_id == transfer_data.new_owner_user_id:
raise ValueError("Cannot transfer ownership to the current owner")
# Update company owner (vendors inherit ownership via company relationship)
company.owner_user_id = new_owner.id
db.flush()
logger.info(
f"Company {company.id} ({company.name}) ownership transferred "
f"from user {old_owner.id} ({old_owner.email}) "
f"to user {new_owner.id} ({new_owner.email}). "
f"Reason: {transfer_data.transfer_reason or 'Not specified'}"
)
return company, old_owner, new_owner
def _generate_temp_password(self, length: int = 12) -> str:
"""Generate secure temporary password."""
alphabet = string.ascii_letters + string.digits + "!@#$%^&*"

View File

@@ -17,7 +17,6 @@ from sqlalchemy.orm import Session
from app.exceptions import (
InvalidVendorDataException,
MarketplaceProductNotFoundException,
MaxVendorsReachedException,
ProductAlreadyExistsException,
UnauthorizedVendorAccessException,
ValidationException,
@@ -41,27 +40,50 @@ class VendorService:
self, db: Session, vendor_data: VendorCreate, current_user: User
) -> Vendor:
"""
Create a new vendor.
Create a new vendor under a company.
DEPRECATED: This method is for self-service vendor creation by company owners.
For admin operations, use admin_service.create_vendor() instead.
The new architecture:
- Companies are the business entities with owners and contact info
- Vendors are storefronts/brands under companies
- The company_id is required in vendor_data
Args:
db: Database session
vendor_data: Vendor creation data
current_user: User creating the vendor
vendor_data: Vendor creation data (must include company_id)
current_user: User creating the vendor (must be company owner or admin)
Returns:
Created vendor object
Raises:
VendorAlreadyExistsException: If vendor code already exists
MaxVendorsReachedException: If user has reached maximum vendors
UnauthorizedVendorAccessException: If user is not company owner
InvalidVendorDataException: If vendor data is invalid
"""
try:
# Validate vendor data
self._validate_vendor_data(vendor_data)
from models.database.company import Company
# Check user's vendor limit (if applicable)
self._check_vendor_limit(db, current_user)
try:
# Validate company_id is provided
if not hasattr(vendor_data, 'company_id') or not vendor_data.company_id:
raise InvalidVendorDataException(
"company_id is required to create a vendor", field="company_id"
)
# Get company and verify ownership
company = db.query(Company).filter(Company.id == vendor_data.company_id).first()
if not company:
raise InvalidVendorDataException(
f"Company with ID {vendor_data.company_id} not found", field="company_id"
)
# Check if user is company owner or admin
if current_user.role != "admin" and company.owner_user_id != current_user.id:
raise UnauthorizedVendorAccessException(
f"company-{vendor_data.company_id}", current_user.id
)
# Normalize vendor code to uppercase
normalized_vendor_code = vendor_data.vendor_code.upper()
@@ -70,13 +92,16 @@ class VendorService:
if self._vendor_code_exists(db, normalized_vendor_code):
raise VendorAlreadyExistsException(normalized_vendor_code)
# Create vendor with uppercase code
vendor_dict = vendor_data.model_dump()
vendor_dict["vendor_code"] = normalized_vendor_code # Store as uppercase
# Create vendor linked to company
new_vendor = Vendor(
**vendor_dict,
owner_user_id=current_user.id,
company_id=company.id,
vendor_code=normalized_vendor_code,
subdomain=vendor_data.subdomain.lower(),
name=vendor_data.name,
description=vendor_data.description,
letzshop_csv_url_fr=vendor_data.letzshop_csv_url_fr,
letzshop_csv_url_en=vendor_data.letzshop_csv_url_en,
letzshop_csv_url_de=vendor_data.letzshop_csv_url_de,
is_active=True,
is_verified=(current_user.role == "admin"),
)
@@ -86,21 +111,21 @@ class VendorService:
db.refresh(new_vendor)
logger.info(
f"New vendor created: {new_vendor.vendor_code} by {current_user.username}"
f"New vendor created: {new_vendor.vendor_code} under company {company.name} by {current_user.username}"
)
return new_vendor
except (
VendorAlreadyExistsException,
MaxVendorsReachedException,
UnauthorizedVendorAccessException,
InvalidVendorDataException,
):
db.rollback()
raise # Re-raise custom exceptions
except Exception as e:
db.rollback()
logger.error(f"Error creating vendor : {str(e)}")
raise ValidationException("Failed to create vendor ")
logger.error(f"Error creating vendor: {str(e)}")
raise ValidationException("Failed to create vendor")
def get_vendors(
self,
@@ -130,11 +155,19 @@ class VendorService:
# Non-admin users can only see active and verified vendors, plus their own
if current_user.role != "admin":
# Get vendor IDs the user owns through companies
from models.database.company import Company
owned_vendor_ids = (
db.query(Vendor.id)
.join(Company)
.filter(Company.owner_user_id == current_user.id)
.subquery()
)
query = query.filter(
(Vendor.is_active == True)
& (
(Vendor.is_verified == True)
| (Vendor.owner_user_id == current_user.id)
(Vendor.is_verified == True) | (Vendor.id.in_(owned_vendor_ids))
)
)
else:
@@ -305,38 +338,6 @@ class VendorService:
raise ValidationException("Failed to retrieve vendor products")
# Private helper methods
def _validate_vendor_data(self, vendor_data: VendorCreate) -> None:
"""Validate vendor creation data."""
if not vendor_data.vendor_code or not vendor_data.vendor_code.strip():
raise InvalidVendorDataException(
"Vendor code is required", field="vendor_code"
)
if not vendor_data.vendor_name or not vendor_data.vendor_name.strip():
raise InvalidVendorDataException("Vendor name is required", field="name")
# Validate vendor code format (alphanumeric, underscores, hyphens)
import re
if not re.match(r"^[A-Za-z0-9_-]+$", vendor_data.vendor_code):
raise InvalidVendorDataException(
"Vendor code can only contain letters, numbers, underscores, and hyphens",
field="vendor_code",
)
def _check_vendor_limit(self, db: Session, user: User) -> None:
"""Check if user has reached maximum vendor limit."""
if user.role == "admin":
return # Admins have no limit
user_vendor_count = (
db.query(Vendor).filter(Vendor.owner_user_id == user.id).count()
)
max_vendors = 5 # Configure this as needed
if user_vendor_count >= max_vendors:
raise MaxVendorsReachedException(max_vendors, user.id)
def _vendor_code_exists(self, db: Session, vendor_code: str) -> bool:
"""Check if vendor code already exists (case-insensitive)."""
return (
@@ -375,16 +376,20 @@ class VendorService:
def _can_access_vendor(self, vendor: Vendor, user: User) -> bool:
"""Check if user can access vendor."""
# Admins and owners can always access
if user.role == "admin" or vendor.owner_user_id == user.id:
# Admins can always access
if user.role == "admin":
return True
# Company owners can access their vendors
if vendor.company and vendor.company.owner_user_id == user.id:
return True
# Others can only access active and verified vendors
return vendor.is_active and vendor.is_verified
def _is_vendor_owner(self, vendor: Vendor, user: User) -> bool:
"""Check if user is vendor owner."""
return vendor.owner_user_id == user.id
"""Check if user is vendor owner (via company ownership)."""
return vendor.company and vendor.company.owner_user_id == user.id
# Create service instance following the same pattern as other services