From ec2885dab50dcb931cc98d75f611f173133ead13 Mon Sep 17 00:00:00 2001 From: Samir Boulahtit Date: Tue, 2 Dec 2025 19:39:23 +0100 Subject: [PATCH] refactor: update services for company-centric ownership MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Update admin_service: remove vendor owner methods, fix get_all_vendors query - Update company_service: add transfer_ownership method - Update vendor_service: check ownership via company relationship - Remove backwards compatibility code for Vendor.owner_user_id 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude --- app/services/admin_service.py | 289 +++++++------------------------- app/services/company_service.py | 69 +++++++- app/services/vendor_service.py | 121 ++++++------- 3 files changed, 187 insertions(+), 292 deletions(-) diff --git a/app/services/admin_service.py b/app/services/admin_service.py index c2b2afa9..4da081a1 100644 --- a/app/services/admin_service.py +++ b/app/services/admin_service.py @@ -16,7 +16,7 @@ import string from datetime import UTC, datetime from sqlalchemy import func, or_ -from sqlalchemy.orm import Session +from sqlalchemy.orm import Session, joinedload from app.exceptions import ( AdminOperationException, @@ -28,6 +28,7 @@ from app.exceptions import ( VendorNotFoundException, VendorVerificationException, ) +from models.database.company import Company from models.database.marketplace_import_job import MarketplaceImportJob from models.database.user import User from models.database.vendor import Role, Vendor, VendorUser @@ -100,30 +101,35 @@ class AdminService: # VENDOR MANAGEMENT # ============================================================================ - def create_vendor_with_owner( - self, db: Session, vendor_data: VendorCreate - ) -> tuple[Vendor, User, str]: + def create_vendor(self, db: Session, vendor_data: VendorCreate) -> Vendor: """ - Create vendor with owner user account. + Create a vendor (storefront/brand) under an existing company. - Creates: - 1. User account with owner_email (for authentication) - 2. Vendor with contact_email (for business contact) + The vendor inherits owner and contact information from its parent company. - If contact_email not provided, defaults to owner_email. + Args: + db: Database session + vendor_data: Vendor creation data including company_id - Returns: (vendor, owner_user, temporary_password) + Returns: + The created Vendor object with company relationship loaded + + Raises: + ValidationException: If company not found or vendor code/subdomain exists + AdminOperationException: If creation fails """ try: + # Validate company exists + company = db.query(Company).filter(Company.id == vendor_data.company_id).first() + if not company: + raise ValidationException(f"Company with ID {vendor_data.company_id} not found") + # Check if vendor code already exists existing_vendor = ( db.query(Vendor) - .filter( - func.upper(Vendor.vendor_code) == vendor_data.vendor_code.upper() - ) + .filter(func.upper(Vendor.vendor_code) == vendor_data.vendor_code.upper()) .first() ) - if existing_vendor: raise VendorAlreadyExistsException(vendor_data.vendor_code) @@ -133,62 +139,23 @@ class AdminService: .filter(func.lower(Vendor.subdomain) == vendor_data.subdomain.lower()) .first() ) - if existing_subdomain: raise ValidationException( f"Subdomain '{vendor_data.subdomain}' is already taken" ) - # Generate temporary password for owner - temp_password = self._generate_temp_password() - - # Create owner user with owner_email - from middleware.auth import AuthManager - - auth_manager = AuthManager() - - owner_username = f"{vendor_data.subdomain}_owner" - owner_email = vendor_data.owner_email # ✅ For User authentication - - # Check if user with this email already exists - existing_user = db.query(User).filter(User.email == owner_email).first() - - if existing_user: - # Use existing user as owner - owner_user = existing_user - else: - # Create new owner user - owner_user = User( - email=owner_email, # ✅ Authentication email - username=owner_username, - hashed_password=auth_manager.hash_password(temp_password), - role="user", - is_active=True, - ) - db.add(owner_user) - db.flush() # Get owner_user.id - - # Determine contact_email - # If provided, use it; otherwise default to owner_email - contact_email = vendor_data.contact_email or owner_email - - # Create vendor + # Create vendor linked to company vendor = Vendor( + company_id=company.id, vendor_code=vendor_data.vendor_code.upper(), subdomain=vendor_data.subdomain.lower(), name=vendor_data.name, description=vendor_data.description, - owner_user_id=owner_user.id, - contact_email=contact_email, # ✅ Business contact email - contact_phone=vendor_data.contact_phone, - website=vendor_data.website, - business_address=vendor_data.business_address, - tax_number=vendor_data.tax_number, letzshop_csv_url_fr=vendor_data.letzshop_csv_url_fr, letzshop_csv_url_en=vendor_data.letzshop_csv_url_en, letzshop_csv_url_de=vendor_data.letzshop_csv_url_de, is_active=True, - is_verified=True, + is_verified=False, # Needs verification by admin ) db.add(vendor) db.flush() # Get vendor.id @@ -198,17 +165,12 @@ class AdminService: db.commit() db.refresh(vendor) - db.refresh(owner_user) logger.info( - f"Vendor {vendor.vendor_code} created with owner {owner_user.username} " - f"(owner_email: {owner_email}, contact_email: {contact_email})" + f"Vendor {vendor.vendor_code} created under company {company.name} (ID: {company.id})" ) - # TODO: Send welcome email to owner with credentials - # self._send_vendor_welcome_email(owner_user, vendor, temp_password) - - return vendor, owner_user, temp_password + return vendor except (VendorAlreadyExistsException, ValidationException): db.rollback() @@ -217,7 +179,7 @@ class AdminService: db.rollback() logger.error(f"Failed to create vendor: {str(e)}") raise AdminOperationException( - operation="create_vendor_with_owner", + operation="create_vendor", reason=f"Failed to create vendor: {str(e)}", ) @@ -232,7 +194,8 @@ class AdminService: ) -> tuple[list[Vendor], int]: """Get paginated list of all vendors with filtering.""" try: - query = db.query(Vendor) + # Eagerly load company relationship to avoid N+1 queries + query = db.query(Vendor).options(joinedload(Vendor.company)) # Apply search filter if search: @@ -251,7 +214,24 @@ class AdminService: if is_verified is not None: query = query.filter(Vendor.is_verified == is_verified) - total = query.count() + # Get total count (without joinedload for performance) + count_query = db.query(Vendor) + if search: + search_term = f"%{search}%" + count_query = count_query.filter( + or_( + Vendor.name.ilike(search_term), + Vendor.vendor_code.ilike(search_term), + Vendor.subdomain.ilike(search_term), + ) + ) + if is_active is not None: + count_query = count_query.filter(Vendor.is_active == is_active) + if is_verified is not None: + count_query = count_query.filter(Vendor.is_verified == is_verified) + total = count_query.count() + + # Get paginated results vendors = query.offset(skip).limit(limit).all() return vendors, total @@ -365,9 +345,11 @@ class AdminService: - Status (is_active, is_verified) Cannot update: - - owner_email (use transfer_vendor_ownership instead) - vendor_code (immutable) - - owner_user_id (use transfer_vendor_ownership instead) + - company_id (vendor cannot be moved between companies) + + Note: Ownership is managed at the Company level. + Use company_service.transfer_ownership() for ownership changes. Args: db: Database session @@ -430,168 +412,8 @@ class AdminService: operation="update_vendor", reason=f"Database update failed: {str(e)}" ) - # Add this NEW method for transferring ownership: - - def transfer_vendor_ownership( - self, - db: Session, - vendor_id: int, - transfer_data, # VendorTransferOwnership schema - ) -> tuple[Vendor, User, User]: - """ - Transfer vendor ownership to another user. - - This method: - 1. Validates new owner exists and is active - 2. Removes old owner from "Owner" role (demotes to Manager) - 3. Assigns new owner to "Owner" role - 4. Updates vendor.owner_user_id - 5. Creates audit log entry - - Args: - db: Database session - vendor_id: ID of vendor - transfer_data: Transfer details (new owner ID, confirmation, reason) - - Returns: - Tuple of (vendor, old_owner, new_owner) - - Raises: - VendorNotFoundException: If vendor not found - UserNotFoundException: If new owner user not found - ValidationException: If confirmation not provided or user already owner - """ - - # Require confirmation - if not transfer_data.confirm_transfer: - raise ValidationException( - "Ownership transfer requires confirmation (confirm_transfer=true)" - ) - - # Get vendor - vendor = self._get_vendor_by_id_or_raise(db, vendor_id) - old_owner = vendor.owner - - # Get new owner - new_owner = ( - db.query(User).filter(User.id == transfer_data.new_owner_user_id).first() - ) - - if not new_owner: - raise UserNotFoundException(str(transfer_data.new_owner_user_id)) - - # Check if new owner is active - if not new_owner.is_active: - raise ValidationException( - f"User {new_owner.username} (ID: {new_owner.id}) is not active" - ) - - # Check if already owner - if new_owner.id == old_owner.id: - raise ValidationException( - f"User {new_owner.username} is already the owner of this vendor" - ) - - try: - # Get Owner role for this vendor - owner_role = ( - db.query(Role) - .filter(Role.vendor_id == vendor_id, Role.name == "Owner") - .first() - ) - - if not owner_role: - raise ValidationException("Owner role not found for vendor") - - # Get Manager role (to demote old owner) - manager_role = ( - db.query(Role) - .filter(Role.vendor_id == vendor_id, Role.name == "Manager") - .first() - ) - - # Remove old owner from Owner role - old_owner_link = ( - db.query(VendorUser) - .filter( - VendorUser.vendor_id == vendor_id, - VendorUser.user_id == old_owner.id, - VendorUser.role_id == owner_role.id, - ) - .first() - ) - - if old_owner_link: - if manager_role: - # Demote to Manager role - old_owner_link.role_id = manager_role.id - logger.info( - f"Old owner {old_owner.username} demoted to Manager role " - f"for vendor {vendor.vendor_code}" - ) - else: - # No Manager role, just remove Owner link - db.delete(old_owner_link) - logger.warning( - f"Old owner {old_owner.username} removed from vendor {vendor.vendor_code} " - f"(no Manager role available)" - ) - - # Check if new owner already has a vendor_user link - new_owner_link = ( - db.query(VendorUser) - .filter( - VendorUser.vendor_id == vendor_id, - VendorUser.user_id == new_owner.id, - ) - .first() - ) - - if new_owner_link: - # Update existing link to Owner role - new_owner_link.role_id = owner_role.id - new_owner_link.is_active = True - else: - # Create new Owner link - new_owner_link = VendorUser( - vendor_id=vendor_id, - user_id=new_owner.id, - role_id=owner_role.id, - is_active=True, - ) - db.add(new_owner_link) - - # Update vendor owner_user_id - vendor.owner_user_id = new_owner.id - vendor.updated_at = datetime.now(UTC) - - db.commit() - db.refresh(vendor) - - logger.warning( - f"OWNERSHIP TRANSFERRED for vendor {vendor.vendor_code}: " - f"{old_owner.username} (ID: {old_owner.id}) -> " - f"{new_owner.username} (ID: {new_owner.id}). " - f"Reason: {transfer_data.transfer_reason or 'Not provided'}" - ) - - # TODO: Send notification emails to both old and new owners - # self._send_ownership_transfer_emails(vendor, old_owner, new_owner, transfer_data.transfer_reason) - - return vendor, old_owner, new_owner - - except (ValidationException, UserNotFoundException): - db.rollback() - raise - except Exception as e: - db.rollback() - logger.error( - f"Failed to transfer ownership for vendor {vendor_id}: {str(e)}" - ) - raise AdminOperationException( - operation="transfer_vendor_ownership", - reason=f"Ownership transfer failed: {str(e)}", - ) + # NOTE: Vendor ownership transfer is now handled at the Company level. + # Use company_service.transfer_ownership() instead. # ============================================================================ # MARKETPLACE IMPORT JOBS @@ -701,7 +523,12 @@ class AdminService: def _get_vendor_by_id_or_raise(self, db: Session, vendor_id: int) -> Vendor: """Get vendor by ID or raise VendorNotFoundException.""" - vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first() + vendor = ( + db.query(Vendor) + .options(joinedload(Vendor.company).joinedload(Company.owner)) + .filter(Vendor.id == vendor_id) + .first() + ) if not vendor: raise VendorNotFoundException(str(vendor_id), identifier_type="id") return vendor diff --git a/app/services/company_service.py b/app/services/company_service.py index badf015b..4693ae6e 100644 --- a/app/services/company_service.py +++ b/app/services/company_service.py @@ -13,10 +13,10 @@ from typing import List, Optional from sqlalchemy import func, select from sqlalchemy.orm import Session, joinedload -from app.exceptions import CompanyNotFoundException +from app.exceptions import CompanyNotFoundException, UserNotFoundException from models.database.company import Company from models.database.user import User -from models.schema.company import CompanyCreate, CompanyUpdate +from models.schema.company import CompanyCreate, CompanyTransferOwnership, CompanyUpdate logger = logging.getLogger(__name__) @@ -110,7 +110,7 @@ class CompanyService: select(Company) .where(Company.id == company_id) .options(joinedload(Company.vendors)) - ).scalar_one_or_none() + ).unique().scalar_one_or_none() if not company: raise CompanyNotFoundException(company_id) @@ -257,6 +257,69 @@ class CompanyService: return company + def transfer_ownership( + self, + db: Session, + company_id: int, + transfer_data: CompanyTransferOwnership, + ) -> tuple[Company, User, User]: + """ + Transfer company ownership to another user. + + This is a critical operation that: + - Changes the company's owner_user_id + - All vendors under the company automatically inherit the new owner + - Logs the transfer for audit purposes + + Args: + db: Database session + company_id: Company ID + transfer_data: Transfer ownership data + + Returns: + Tuple of (company, old_owner, new_owner) + + Raises: + CompanyNotFoundException: If company not found + UserNotFoundException: If new owner user not found + ValueError: If trying to transfer to current owner + """ + # Get company + company = self.get_company_by_id(db, company_id) + old_owner_id = company.owner_user_id + + # Get old owner + old_owner = db.execute( + select(User).where(User.id == old_owner_id) + ).scalar_one_or_none() + if not old_owner: + raise UserNotFoundException(str(old_owner_id)) + + # Get new owner + new_owner = db.execute( + select(User).where(User.id == transfer_data.new_owner_user_id) + ).scalar_one_or_none() + if not new_owner: + raise UserNotFoundException(str(transfer_data.new_owner_user_id)) + + # Prevent transferring to same owner + if old_owner_id == transfer_data.new_owner_user_id: + raise ValueError("Cannot transfer ownership to the current owner") + + # Update company owner (vendors inherit ownership via company relationship) + company.owner_user_id = new_owner.id + + db.flush() + + logger.info( + f"Company {company.id} ({company.name}) ownership transferred " + f"from user {old_owner.id} ({old_owner.email}) " + f"to user {new_owner.id} ({new_owner.email}). " + f"Reason: {transfer_data.transfer_reason or 'Not specified'}" + ) + + return company, old_owner, new_owner + def _generate_temp_password(self, length: int = 12) -> str: """Generate secure temporary password.""" alphabet = string.ascii_letters + string.digits + "!@#$%^&*" diff --git a/app/services/vendor_service.py b/app/services/vendor_service.py index 72dcc8bd..c278a958 100644 --- a/app/services/vendor_service.py +++ b/app/services/vendor_service.py @@ -17,7 +17,6 @@ from sqlalchemy.orm import Session from app.exceptions import ( InvalidVendorDataException, MarketplaceProductNotFoundException, - MaxVendorsReachedException, ProductAlreadyExistsException, UnauthorizedVendorAccessException, ValidationException, @@ -41,27 +40,50 @@ class VendorService: self, db: Session, vendor_data: VendorCreate, current_user: User ) -> Vendor: """ - Create a new vendor. + Create a new vendor under a company. + + DEPRECATED: This method is for self-service vendor creation by company owners. + For admin operations, use admin_service.create_vendor() instead. + + The new architecture: + - Companies are the business entities with owners and contact info + - Vendors are storefronts/brands under companies + - The company_id is required in vendor_data Args: db: Database session - vendor_data: Vendor creation data - current_user: User creating the vendor + vendor_data: Vendor creation data (must include company_id) + current_user: User creating the vendor (must be company owner or admin) Returns: Created vendor object Raises: VendorAlreadyExistsException: If vendor code already exists - MaxVendorsReachedException: If user has reached maximum vendors + UnauthorizedVendorAccessException: If user is not company owner InvalidVendorDataException: If vendor data is invalid """ - try: - # Validate vendor data - self._validate_vendor_data(vendor_data) + from models.database.company import Company - # Check user's vendor limit (if applicable) - self._check_vendor_limit(db, current_user) + try: + # Validate company_id is provided + if not hasattr(vendor_data, 'company_id') or not vendor_data.company_id: + raise InvalidVendorDataException( + "company_id is required to create a vendor", field="company_id" + ) + + # Get company and verify ownership + company = db.query(Company).filter(Company.id == vendor_data.company_id).first() + if not company: + raise InvalidVendorDataException( + f"Company with ID {vendor_data.company_id} not found", field="company_id" + ) + + # Check if user is company owner or admin + if current_user.role != "admin" and company.owner_user_id != current_user.id: + raise UnauthorizedVendorAccessException( + f"company-{vendor_data.company_id}", current_user.id + ) # Normalize vendor code to uppercase normalized_vendor_code = vendor_data.vendor_code.upper() @@ -70,13 +92,16 @@ class VendorService: if self._vendor_code_exists(db, normalized_vendor_code): raise VendorAlreadyExistsException(normalized_vendor_code) - # Create vendor with uppercase code - vendor_dict = vendor_data.model_dump() - vendor_dict["vendor_code"] = normalized_vendor_code # Store as uppercase - + # Create vendor linked to company new_vendor = Vendor( - **vendor_dict, - owner_user_id=current_user.id, + company_id=company.id, + vendor_code=normalized_vendor_code, + subdomain=vendor_data.subdomain.lower(), + name=vendor_data.name, + description=vendor_data.description, + letzshop_csv_url_fr=vendor_data.letzshop_csv_url_fr, + letzshop_csv_url_en=vendor_data.letzshop_csv_url_en, + letzshop_csv_url_de=vendor_data.letzshop_csv_url_de, is_active=True, is_verified=(current_user.role == "admin"), ) @@ -86,21 +111,21 @@ class VendorService: db.refresh(new_vendor) logger.info( - f"New vendor created: {new_vendor.vendor_code} by {current_user.username}" + f"New vendor created: {new_vendor.vendor_code} under company {company.name} by {current_user.username}" ) return new_vendor except ( VendorAlreadyExistsException, - MaxVendorsReachedException, + UnauthorizedVendorAccessException, InvalidVendorDataException, ): db.rollback() raise # Re-raise custom exceptions except Exception as e: db.rollback() - logger.error(f"Error creating vendor : {str(e)}") - raise ValidationException("Failed to create vendor ") + logger.error(f"Error creating vendor: {str(e)}") + raise ValidationException("Failed to create vendor") def get_vendors( self, @@ -130,11 +155,19 @@ class VendorService: # Non-admin users can only see active and verified vendors, plus their own if current_user.role != "admin": + # Get vendor IDs the user owns through companies + from models.database.company import Company + + owned_vendor_ids = ( + db.query(Vendor.id) + .join(Company) + .filter(Company.owner_user_id == current_user.id) + .subquery() + ) query = query.filter( (Vendor.is_active == True) & ( - (Vendor.is_verified == True) - | (Vendor.owner_user_id == current_user.id) + (Vendor.is_verified == True) | (Vendor.id.in_(owned_vendor_ids)) ) ) else: @@ -305,38 +338,6 @@ class VendorService: raise ValidationException("Failed to retrieve vendor products") # Private helper methods - def _validate_vendor_data(self, vendor_data: VendorCreate) -> None: - """Validate vendor creation data.""" - if not vendor_data.vendor_code or not vendor_data.vendor_code.strip(): - raise InvalidVendorDataException( - "Vendor code is required", field="vendor_code" - ) - - if not vendor_data.vendor_name or not vendor_data.vendor_name.strip(): - raise InvalidVendorDataException("Vendor name is required", field="name") - - # Validate vendor code format (alphanumeric, underscores, hyphens) - import re - - if not re.match(r"^[A-Za-z0-9_-]+$", vendor_data.vendor_code): - raise InvalidVendorDataException( - "Vendor code can only contain letters, numbers, underscores, and hyphens", - field="vendor_code", - ) - - def _check_vendor_limit(self, db: Session, user: User) -> None: - """Check if user has reached maximum vendor limit.""" - if user.role == "admin": - return # Admins have no limit - - user_vendor_count = ( - db.query(Vendor).filter(Vendor.owner_user_id == user.id).count() - ) - max_vendors = 5 # Configure this as needed - - if user_vendor_count >= max_vendors: - raise MaxVendorsReachedException(max_vendors, user.id) - def _vendor_code_exists(self, db: Session, vendor_code: str) -> bool: """Check if vendor code already exists (case-insensitive).""" return ( @@ -375,16 +376,20 @@ class VendorService: def _can_access_vendor(self, vendor: Vendor, user: User) -> bool: """Check if user can access vendor.""" - # Admins and owners can always access - if user.role == "admin" or vendor.owner_user_id == user.id: + # Admins can always access + if user.role == "admin": + return True + + # Company owners can access their vendors + if vendor.company and vendor.company.owner_user_id == user.id: return True # Others can only access active and verified vendors return vendor.is_active and vendor.is_verified def _is_vendor_owner(self, vendor: Vendor, user: User) -> bool: - """Check if user is vendor owner.""" - return vendor.owner_user_id == user.id + """Check if user is vendor owner (via company ownership).""" + return vendor.company and vendor.company.owner_user_id == user.id # Create service instance following the same pattern as other services