Files
orion/app/services/order_service.py
Samir Boulahtit e0a0da85f8 fix: validate products before creating order to follow architecture rules
Restructured create_letzshop_order to follow the "validate first, then write"
architecture pattern:

Phase 1 (Read-only validation):
- Check if order already exists
- Parse all inventory units and collect GTINs
- Batch query all products by GTIN (single query instead of N queries)
- Validate all products exist - raise ValidationException BEFORE any writes

Phase 2 (Database writes):
- Only after all validation passes, create customer, order, and items

This ensures if validation fails, no database modifications happen, so the
endpoint/task simply doesn't commit - no rollback needed.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-19 21:59:18 +01:00

1072 lines
35 KiB
Python

# app/services/order_service.py
"""
Unified order service for all sales channels.
This service handles:
- Order creation (direct and marketplace)
- Order status management
- Order retrieval and filtering
- Customer creation for marketplace imports
- Order item management
All orders use snapshotted customer and address data.
"""
import logging
import random
import string
from datetime import UTC, datetime
from typing import Any
from sqlalchemy import and_, func, or_
from sqlalchemy.orm import Session
from app.exceptions import (
CustomerNotFoundException,
InsufficientInventoryException,
OrderNotFoundException,
ValidationException,
)
from models.database.customer import Customer
from models.database.order import Order, OrderItem
from models.database.product import Product
from models.database.vendor import Vendor
from models.schema.order import (
AddressSnapshot,
CustomerSnapshot,
OrderCreate,
OrderItemCreate,
OrderUpdate,
)
logger = logging.getLogger(__name__)
class OrderService:
"""Unified service for order operations across all channels."""
# =========================================================================
# Order Number Generation
# =========================================================================
def _generate_order_number(self, db: Session, vendor_id: int) -> str:
"""
Generate unique order number.
Format: ORD-{VENDOR_ID}-{TIMESTAMP}-{RANDOM}
Example: ORD-1-20250110-A1B2C3
"""
timestamp = datetime.now(UTC).strftime("%Y%m%d")
random_suffix = "".join(
random.choices(string.ascii_uppercase + string.digits, k=6)
)
order_number = f"ORD-{vendor_id}-{timestamp}-{random_suffix}"
# Ensure uniqueness
while db.query(Order).filter(Order.order_number == order_number).first():
random_suffix = "".join(
random.choices(string.ascii_uppercase + string.digits, k=6)
)
order_number = f"ORD-{vendor_id}-{timestamp}-{random_suffix}"
return order_number
# =========================================================================
# Customer Management
# =========================================================================
def find_or_create_customer(
self,
db: Session,
vendor_id: int,
email: str,
first_name: str,
last_name: str,
phone: str | None = None,
is_active: bool = False,
) -> Customer:
"""
Find existing customer by email or create new one.
For marketplace imports, customers are created as inactive until
they register on the storefront.
Args:
db: Database session
vendor_id: Vendor ID
email: Customer email
first_name: Customer first name
last_name: Customer last name
phone: Customer phone (optional)
is_active: Whether customer is active (default: False for imports)
Returns:
Customer record (existing or newly created)
"""
# Look for existing customer by email within vendor scope
customer = (
db.query(Customer)
.filter(
and_(
Customer.vendor_id == vendor_id,
Customer.email == email,
)
)
.first()
)
if customer:
return customer
# Generate a unique customer number
timestamp = datetime.now(UTC).strftime("%Y%m%d%H%M%S")
random_suffix = "".join(random.choices(string.digits, k=4))
customer_number = f"CUST-{vendor_id}-{timestamp}-{random_suffix}"
# Create new customer
customer = Customer(
vendor_id=vendor_id,
email=email,
first_name=first_name,
last_name=last_name,
phone=phone,
customer_number=customer_number,
hashed_password="", # No password for imported customers
is_active=is_active,
)
db.add(customer)
db.flush()
logger.info(
f"Created {'active' if is_active else 'inactive'} customer "
f"{customer.id} for vendor {vendor_id}: {email}"
)
return customer
# =========================================================================
# Order Creation
# =========================================================================
def create_order(
self,
db: Session,
vendor_id: int,
order_data: OrderCreate,
) -> Order:
"""
Create a new direct order.
Args:
db: Database session
vendor_id: Vendor ID
order_data: Order creation data
Returns:
Created Order object
Raises:
ValidationException: If order data is invalid
InsufficientInventoryException: If not enough inventory
"""
try:
# Get or create customer
if order_data.customer_id:
customer = (
db.query(Customer)
.filter(
and_(
Customer.id == order_data.customer_id,
Customer.vendor_id == vendor_id,
)
)
.first()
)
if not customer:
raise CustomerNotFoundException(str(order_data.customer_id))
else:
# Create customer from snapshot
customer = self.find_or_create_customer(
db=db,
vendor_id=vendor_id,
email=order_data.customer.email,
first_name=order_data.customer.first_name,
last_name=order_data.customer.last_name,
phone=order_data.customer.phone,
is_active=True, # Direct orders = active customers
)
# Calculate order totals and validate products
subtotal = 0.0
order_items_data = []
for item_data in order_data.items:
product = (
db.query(Product)
.filter(
and_(
Product.id == item_data.product_id,
Product.vendor_id == vendor_id,
Product.is_active == True,
)
)
.first()
)
if not product:
raise ValidationException(
f"Product {item_data.product_id} not found"
)
# Check inventory
if product.available_inventory < item_data.quantity:
raise InsufficientInventoryException(
product_id=product.id,
requested=item_data.quantity,
available=product.available_inventory,
)
# Calculate item total
unit_price = product.sale_price if product.sale_price else product.price
if not unit_price:
raise ValidationException(f"Product {product.id} has no price")
item_total = unit_price * item_data.quantity
subtotal += item_total
order_items_data.append(
{
"product_id": product.id,
"product_name": product.marketplace_product.title
if product.marketplace_product
else product.product_id,
"product_sku": product.product_id,
"gtin": product.gtin,
"gtin_type": product.gtin_type,
"quantity": item_data.quantity,
"unit_price": unit_price,
"total_price": item_total,
}
)
# Calculate totals
tax_amount = 0.0 # TODO: Implement tax calculation
shipping_amount = 5.99 if subtotal < 50 else 0.0
discount_amount = 0.0
total_amount = subtotal + tax_amount + shipping_amount - discount_amount
# Use billing address or shipping address
billing = order_data.billing_address or order_data.shipping_address
# Generate order number
order_number = self._generate_order_number(db, vendor_id)
# Create order with snapshots
order = Order(
vendor_id=vendor_id,
customer_id=customer.id,
order_number=order_number,
channel="direct",
status="pending",
# Financials
subtotal=subtotal,
tax_amount=tax_amount,
shipping_amount=shipping_amount,
discount_amount=discount_amount,
total_amount=total_amount,
currency="EUR",
# Customer snapshot
customer_first_name=order_data.customer.first_name,
customer_last_name=order_data.customer.last_name,
customer_email=order_data.customer.email,
customer_phone=order_data.customer.phone,
customer_locale=order_data.customer.locale,
# Shipping address snapshot
ship_first_name=order_data.shipping_address.first_name,
ship_last_name=order_data.shipping_address.last_name,
ship_company=order_data.shipping_address.company,
ship_address_line_1=order_data.shipping_address.address_line_1,
ship_address_line_2=order_data.shipping_address.address_line_2,
ship_city=order_data.shipping_address.city,
ship_postal_code=order_data.shipping_address.postal_code,
ship_country_iso=order_data.shipping_address.country_iso,
# Billing address snapshot
bill_first_name=billing.first_name,
bill_last_name=billing.last_name,
bill_company=billing.company,
bill_address_line_1=billing.address_line_1,
bill_address_line_2=billing.address_line_2,
bill_city=billing.city,
bill_postal_code=billing.postal_code,
bill_country_iso=billing.country_iso,
# Other
shipping_method=order_data.shipping_method,
customer_notes=order_data.customer_notes,
order_date=datetime.now(UTC),
)
db.add(order)
db.flush()
# Create order items
for item_data in order_items_data:
order_item = OrderItem(order_id=order.id, **item_data)
db.add(order_item)
db.flush()
db.refresh(order)
logger.info(
f"Order {order.order_number} created for vendor {vendor_id}, "
f"total: EUR {total_amount:.2f}"
)
return order
except (
ValidationException,
InsufficientInventoryException,
CustomerNotFoundException,
):
raise
except Exception as e:
logger.error(f"Error creating order: {str(e)}")
raise ValidationException(f"Failed to create order: {str(e)}")
def create_letzshop_order(
self,
db: Session,
vendor_id: int,
shipment_data: dict[str, Any],
) -> Order:
"""
Create an order from Letzshop shipment data.
Validates all products exist BEFORE creating any database records.
This ensures we don't leave the session in an inconsistent state
if validation fails.
Args:
db: Database session
vendor_id: Vendor ID
shipment_data: Raw shipment data from Letzshop API
Returns:
Created Order object
Raises:
ValidationException: If product not found by GTIN
"""
order_data = shipment_data.get("order", {})
# Generate order number using Letzshop order number
letzshop_order_number = order_data.get("number", "")
order_number = f"LS-{vendor_id}-{letzshop_order_number}"
# Check if order already exists (read-only, safe to do first)
existing = (
db.query(Order)
.filter(Order.order_number == order_number)
.first()
)
if existing:
return existing
# =====================================================================
# PHASE 1: Parse and validate all data BEFORE any database writes
# =====================================================================
# Parse inventory units
inventory_units = shipment_data.get("inventoryUnits", [])
if isinstance(inventory_units, dict):
inventory_units = inventory_units.get("nodes", [])
# Collect all GTINs and validate products exist
gtins = set()
for unit in inventory_units:
variant = unit.get("variant", {}) or {}
trade_id = variant.get("tradeId", {}) or {}
gtin = trade_id.get("number")
if gtin:
gtins.add(gtin)
# Batch query all products by GTIN
products_by_gtin: dict[str, Product] = {}
if gtins:
products = (
db.query(Product)
.filter(
and_(
Product.vendor_id == vendor_id,
Product.gtin.in_(gtins),
)
)
.all()
)
products_by_gtin = {p.gtin: p for p in products if p.gtin}
# Validate all products exist BEFORE creating anything
missing_gtins = gtins - set(products_by_gtin.keys())
if missing_gtins:
missing_gtin = next(iter(missing_gtins))
logger.error(
f"Product not found for GTIN {missing_gtin} in vendor {vendor_id}. "
f"Order: {order_number}"
)
raise ValidationException(
f"Product not found for GTIN {missing_gtin}. "
f"Please ensure the product catalog is in sync."
)
# Parse address data
ship_address = order_data.get("shipAddress", {}) or {}
bill_address = order_data.get("billAddress", {}) or {}
ship_country = ship_address.get("country", {}) or {}
bill_country = bill_address.get("country", {}) or {}
# Extract customer info
customer_email = order_data.get("email", "")
ship_first_name = ship_address.get("firstName", "") or ""
ship_last_name = ship_address.get("lastName", "") or ""
# Parse order date
order_date = datetime.now(UTC)
completed_at_str = order_data.get("completedAt")
if completed_at_str:
try:
if completed_at_str.endswith("Z"):
completed_at_str = completed_at_str[:-1] + "+00:00"
order_date = datetime.fromisoformat(completed_at_str)
except (ValueError, TypeError):
pass
# Parse total amount
total_str = order_data.get("total", "0")
try:
# Handle format like "99.99 EUR"
total_amount = float(str(total_str).split()[0])
except (ValueError, IndexError):
total_amount = 0.0
# Map Letzshop state to status
letzshop_state = shipment_data.get("state", "unconfirmed")
status_mapping = {
"unconfirmed": "pending",
"confirmed": "processing",
"declined": "cancelled",
}
status = status_mapping.get(letzshop_state, "pending")
# =====================================================================
# PHASE 2: All validation passed - now create database records
# =====================================================================
# Find or create customer (inactive)
customer = self.find_or_create_customer(
db=db,
vendor_id=vendor_id,
email=customer_email,
first_name=ship_first_name,
last_name=ship_last_name,
is_active=False,
)
# Create order
order = Order(
vendor_id=vendor_id,
customer_id=customer.id,
order_number=order_number,
channel="letzshop",
# External references
external_order_id=order_data.get("id"),
external_shipment_id=shipment_data.get("id"),
external_order_number=letzshop_order_number,
external_data=shipment_data,
# Status
status=status,
# Financials
total_amount=total_amount,
currency="EUR",
# Customer snapshot
customer_first_name=ship_first_name,
customer_last_name=ship_last_name,
customer_email=customer_email,
customer_locale=order_data.get("locale"),
# Shipping address snapshot
ship_first_name=ship_first_name,
ship_last_name=ship_last_name,
ship_company=ship_address.get("company"),
ship_address_line_1=ship_address.get("streetName", "") or "",
ship_address_line_2=ship_address.get("streetNumber"),
ship_city=ship_address.get("city", "") or "",
ship_postal_code=ship_address.get("postalCode", "") or "",
ship_country_iso=ship_country.get("iso", "") or "",
# Billing address snapshot
bill_first_name=bill_address.get("firstName", "") or ship_first_name,
bill_last_name=bill_address.get("lastName", "") or ship_last_name,
bill_company=bill_address.get("company"),
bill_address_line_1=bill_address.get("streetName", "") or "",
bill_address_line_2=bill_address.get("streetNumber"),
bill_city=bill_address.get("city", "") or "",
bill_postal_code=bill_address.get("postalCode", "") or "",
bill_country_iso=bill_country.get("iso", "") or "",
# Dates
order_date=order_date,
confirmed_at=datetime.now(UTC) if status == "processing" else None,
cancelled_at=datetime.now(UTC) if status == "cancelled" else None,
)
db.add(order)
db.flush()
# Create order items from inventory units
for unit in inventory_units:
variant = unit.get("variant", {}) or {}
product_info = variant.get("product", {}) or {}
trade_id = variant.get("tradeId", {}) or {}
product_name_dict = product_info.get("name", {}) or {}
gtin = trade_id.get("number")
gtin_type = trade_id.get("parser")
# Get product from pre-validated map
product = products_by_gtin.get(gtin)
# Get product name
product_name = (
product_name_dict.get("en")
or product_name_dict.get("fr")
or product_name_dict.get("de")
or str(product_name_dict)
)
# Get price
unit_price = 0.0
price_str = variant.get("price", "0")
try:
unit_price = float(str(price_str).split()[0])
except (ValueError, IndexError):
pass
# Map item state
item_state = unit.get("state") # unconfirmed, confirmed_available, etc.
order_item = OrderItem(
order_id=order.id,
product_id=product.id if product else None,
product_name=product_name,
product_sku=variant.get("sku"),
gtin=gtin,
gtin_type=gtin_type,
quantity=1, # Letzshop uses individual inventory units
unit_price=unit_price,
total_price=unit_price,
external_item_id=unit.get("id"),
external_variant_id=variant.get("id"),
item_state=item_state,
)
db.add(order_item)
db.flush()
db.refresh(order)
logger.info(
f"Letzshop order {order.order_number} created for vendor {vendor_id}, "
f"status: {status}, items: {len(inventory_units)}"
)
return order
# =========================================================================
# Order Retrieval
# =========================================================================
def get_order(self, db: Session, vendor_id: int, order_id: int) -> Order:
"""Get order by ID within vendor scope."""
order = (
db.query(Order)
.filter(and_(Order.id == order_id, Order.vendor_id == vendor_id))
.first()
)
if not order:
raise OrderNotFoundException(str(order_id))
return order
def get_order_by_external_shipment_id(
self,
db: Session,
vendor_id: int,
shipment_id: str,
) -> Order | None:
"""Get order by external shipment ID (for Letzshop)."""
return (
db.query(Order)
.filter(
and_(
Order.vendor_id == vendor_id,
Order.external_shipment_id == shipment_id,
)
)
.first()
)
def get_vendor_orders(
self,
db: Session,
vendor_id: int,
skip: int = 0,
limit: int = 50,
status: str | None = None,
channel: str | None = None,
search: str | None = None,
) -> tuple[list[Order], int]:
"""
Get orders for vendor with filtering.
Args:
db: Database session
vendor_id: Vendor ID
skip: Pagination offset
limit: Pagination limit
status: Filter by status
channel: Filter by channel (direct, letzshop)
search: Search by order number, customer name, or email
Returns:
Tuple of (orders, total_count)
"""
query = db.query(Order).filter(Order.vendor_id == vendor_id)
if status:
query = query.filter(Order.status == status)
if channel:
query = query.filter(Order.channel == channel)
if search:
search_term = f"%{search}%"
query = query.filter(
or_(
Order.order_number.ilike(search_term),
Order.external_order_number.ilike(search_term),
Order.customer_email.ilike(search_term),
Order.customer_first_name.ilike(search_term),
Order.customer_last_name.ilike(search_term),
)
)
# Order by most recent first
query = query.order_by(Order.order_date.desc())
total = query.count()
orders = query.offset(skip).limit(limit).all()
return orders, total
def get_order_stats(self, db: Session, vendor_id: int) -> dict[str, int]:
"""
Get order counts by status for a vendor.
Returns:
Dict with counts for each status.
"""
status_counts = (
db.query(Order.status, func.count(Order.id).label("count"))
.filter(Order.vendor_id == vendor_id)
.group_by(Order.status)
.all()
)
stats = {
"pending": 0,
"processing": 0,
"shipped": 0,
"delivered": 0,
"cancelled": 0,
"refunded": 0,
"total": 0,
}
for status, count in status_counts:
if status in stats:
stats[status] = count
stats["total"] += count
# Also count by channel
channel_counts = (
db.query(Order.channel, func.count(Order.id).label("count"))
.filter(Order.vendor_id == vendor_id)
.group_by(Order.channel)
.all()
)
for channel, count in channel_counts:
stats[f"{channel}_orders"] = count
return stats
# =========================================================================
# Order Updates
# =========================================================================
def update_order_status(
self,
db: Session,
vendor_id: int,
order_id: int,
order_update: OrderUpdate,
) -> Order:
"""
Update order status and tracking information.
Args:
db: Database session
vendor_id: Vendor ID
order_id: Order ID
order_update: Update data
Returns:
Updated Order object
"""
order = self.get_order(db, vendor_id, order_id)
now = datetime.now(UTC)
if order_update.status:
old_status = order.status
order.status = order_update.status
# Update timestamps based on status
if order_update.status == "processing" and not order.confirmed_at:
order.confirmed_at = now
elif order_update.status == "shipped" and not order.shipped_at:
order.shipped_at = now
elif order_update.status == "delivered" and not order.delivered_at:
order.delivered_at = now
elif order_update.status == "cancelled" and not order.cancelled_at:
order.cancelled_at = now
if order_update.tracking_number:
order.tracking_number = order_update.tracking_number
if order_update.tracking_provider:
order.tracking_provider = order_update.tracking_provider
if order_update.internal_notes:
order.internal_notes = order_update.internal_notes
order.updated_at = now
db.flush()
db.refresh(order)
logger.info(f"Order {order.order_number} updated: status={order.status}")
return order
def set_order_tracking(
self,
db: Session,
vendor_id: int,
order_id: int,
tracking_number: str,
tracking_provider: str,
) -> Order:
"""
Set tracking information and mark as shipped.
Args:
db: Database session
vendor_id: Vendor ID
order_id: Order ID
tracking_number: Tracking number
tracking_provider: Shipping provider
Returns:
Updated Order object
"""
order = self.get_order(db, vendor_id, order_id)
now = datetime.now(UTC)
order.tracking_number = tracking_number
order.tracking_provider = tracking_provider
order.status = "shipped"
order.shipped_at = now
order.updated_at = now
db.flush()
db.refresh(order)
logger.info(
f"Order {order.order_number} shipped: "
f"{tracking_provider} {tracking_number}"
)
return order
def update_item_state(
self,
db: Session,
vendor_id: int,
order_id: int,
item_id: int,
state: str,
) -> OrderItem:
"""
Update the state of an order item (for marketplace confirmation).
Args:
db: Database session
vendor_id: Vendor ID
order_id: Order ID
item_id: Item ID
state: New state (confirmed_available, confirmed_unavailable)
Returns:
Updated OrderItem object
"""
order = self.get_order(db, vendor_id, order_id)
item = (
db.query(OrderItem)
.filter(
and_(
OrderItem.id == item_id,
OrderItem.order_id == order.id,
)
)
.first()
)
if not item:
raise ValidationException(f"Order item {item_id} not found")
item.item_state = state
item.updated_at = datetime.now(UTC)
# Check if all items are processed
all_items = db.query(OrderItem).filter(OrderItem.order_id == order.id).all()
all_confirmed = all(
i.item_state in ("confirmed_available", "confirmed_unavailable")
for i in all_items
)
if all_confirmed:
has_available = any(
i.item_state == "confirmed_available" for i in all_items
)
all_unavailable = all(
i.item_state == "confirmed_unavailable" for i in all_items
)
now = datetime.now(UTC)
if all_unavailable:
order.status = "cancelled"
order.cancelled_at = now
elif has_available:
order.status = "processing"
order.confirmed_at = now
order.updated_at = now
db.flush()
db.refresh(item)
return item
# =========================================================================
# Admin Methods (cross-vendor)
# =========================================================================
def get_all_orders_admin(
self,
db: Session,
skip: int = 0,
limit: int = 50,
vendor_id: int | None = None,
status: str | None = None,
channel: str | None = None,
search: str | None = None,
) -> tuple[list[dict], int]:
"""
Get orders across all vendors for admin.
Args:
db: Database session
skip: Pagination offset
limit: Pagination limit
vendor_id: Filter by vendor
status: Filter by status
channel: Filter by channel
search: Search by order number or customer
Returns:
Tuple of (orders with vendor info, total_count)
"""
query = db.query(Order).join(Vendor)
if vendor_id:
query = query.filter(Order.vendor_id == vendor_id)
if status:
query = query.filter(Order.status == status)
if channel:
query = query.filter(Order.channel == channel)
if search:
search_term = f"%{search}%"
query = query.filter(
or_(
Order.order_number.ilike(search_term),
Order.external_order_number.ilike(search_term),
Order.customer_email.ilike(search_term),
Order.customer_first_name.ilike(search_term),
Order.customer_last_name.ilike(search_term),
)
)
query = query.order_by(Order.order_date.desc())
total = query.count()
orders = query.offset(skip).limit(limit).all()
result = []
for order in orders:
item_count = len(order.items) if order.items else 0
result.append(
{
"id": order.id,
"vendor_id": order.vendor_id,
"vendor_name": order.vendor.name if order.vendor else None,
"vendor_code": order.vendor.vendor_code if order.vendor else None,
"customer_id": order.customer_id,
"customer_full_name": order.customer_full_name,
"customer_email": order.customer_email,
"order_number": order.order_number,
"channel": order.channel,
"status": order.status,
"external_order_number": order.external_order_number,
"external_shipment_id": order.external_shipment_id,
"subtotal": order.subtotal,
"tax_amount": order.tax_amount,
"shipping_amount": order.shipping_amount,
"discount_amount": order.discount_amount,
"total_amount": order.total_amount,
"currency": order.currency,
"ship_country_iso": order.ship_country_iso,
"tracking_number": order.tracking_number,
"tracking_provider": order.tracking_provider,
"item_count": item_count,
"order_date": order.order_date,
"confirmed_at": order.confirmed_at,
"shipped_at": order.shipped_at,
"delivered_at": order.delivered_at,
"cancelled_at": order.cancelled_at,
"created_at": order.created_at,
"updated_at": order.updated_at,
}
)
return result, total
def get_order_stats_admin(self, db: Session) -> dict:
"""Get platform-wide order statistics."""
# Get status counts
status_counts = (
db.query(Order.status, func.count(Order.id))
.group_by(Order.status)
.all()
)
stats = {
"total_orders": 0,
"pending_orders": 0,
"processing_orders": 0,
"shipped_orders": 0,
"delivered_orders": 0,
"cancelled_orders": 0,
"refunded_orders": 0,
"total_revenue": 0.0,
"direct_orders": 0,
"letzshop_orders": 0,
"vendors_with_orders": 0,
}
for status, count in status_counts:
stats["total_orders"] += count
key = f"{status}_orders"
if key in stats:
stats[key] = count
# Get channel counts
channel_counts = (
db.query(Order.channel, func.count(Order.id))
.group_by(Order.channel)
.all()
)
for channel, count in channel_counts:
key = f"{channel}_orders"
if key in stats:
stats[key] = count
# Get total revenue (from delivered orders)
revenue = (
db.query(func.sum(Order.total_amount))
.filter(Order.status == "delivered")
.scalar()
)
stats["total_revenue"] = float(revenue) if revenue else 0.0
# Count vendors with orders
vendors_count = (
db.query(func.count(func.distinct(Order.vendor_id))).scalar() or 0
)
stats["vendors_with_orders"] = vendors_count
return stats
def get_order_by_id_admin(self, db: Session, order_id: int) -> Order:
"""Get order by ID without vendor scope (admin only)."""
order = db.query(Order).filter(Order.id == order_id).first()
if not order:
raise OrderNotFoundException(str(order_id))
return order
def get_vendors_with_orders_admin(self, db: Session) -> list[dict]:
"""Get list of vendors that have orders (admin only)."""
from models.database.vendor import Vendor
# Query vendors with order counts
results = (
db.query(
Vendor.id,
Vendor.name,
Vendor.vendor_code,
func.count(Order.id).label("order_count"),
)
.join(Order, Order.vendor_id == Vendor.id)
.group_by(Vendor.id, Vendor.name, Vendor.vendor_code)
.order_by(func.count(Order.id).desc())
.all()
)
return [
{
"id": row.id,
"name": row.name,
"vendor_code": row.vendor_code,
"order_count": row.order_count,
}
for row in results
]
# Create service instance
order_service = OrderService()