Files
orion/app/services/order_service.py
Samir Boulahtit 238c1ec9b8 refactor: modernize code quality tooling with Ruff
- Replace black, isort, and flake8 with Ruff (all-in-one linter and formatter)
- Add comprehensive pyproject.toml configuration
- Simplify Makefile code quality targets
- Configure exclusions for venv/.venv in pyproject.toml
- Auto-fix 1,359 linting issues across codebase

Benefits:
- Much faster builds (Ruff is written in Rust)
- Single tool replaces multiple tools
- More comprehensive rule set (UP, B, C4, SIM, PIE, RET, Q)
- All configuration centralized in pyproject.toml
- Better import sorting and formatting consistency

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-11-28 19:37:38 +01:00

375 lines
12 KiB
Python

# app/services/order_service.py
"""
Order service for order management.
This module provides:
- Order creation from cart
- Order status management
- Order retrieval and filtering
"""
import logging
import random
import string
from datetime import UTC, datetime
from sqlalchemy import and_
from sqlalchemy.orm import Session
from app.exceptions import (
CustomerNotFoundException,
InsufficientInventoryException,
OrderNotFoundException,
ValidationException,
)
from models.database.customer import Customer, CustomerAddress
from models.database.order import Order, OrderItem
from models.database.product import Product
from models.schema.order import OrderAddressCreate, OrderCreate, OrderUpdate
logger = logging.getLogger(__name__)
class OrderService:
"""Service for order operations."""
def _generate_order_number(self, db: Session, vendor_id: int) -> str:
"""
Generate unique order number.
Format: ORD-{VENDOR_ID}-{TIMESTAMP}-{RANDOM}
Example: ORD-1-20250110-A1B2C3
"""
timestamp = datetime.now(UTC).strftime("%Y%m%d")
random_suffix = "".join(
random.choices(string.ascii_uppercase + string.digits, k=6)
)
order_number = f"ORD-{vendor_id}-{timestamp}-{random_suffix}"
# Ensure uniqueness
while db.query(Order).filter(Order.order_number == order_number).first():
random_suffix = "".join(
random.choices(string.ascii_uppercase + string.digits, k=6)
)
order_number = f"ORD-{vendor_id}-{timestamp}-{random_suffix}"
return order_number
def _create_customer_address(
self,
db: Session,
vendor_id: int,
customer_id: int,
address_data: OrderAddressCreate,
address_type: str,
) -> CustomerAddress:
"""Create a customer address for order."""
address = CustomerAddress(
vendor_id=vendor_id,
customer_id=customer_id,
address_type=address_type,
first_name=address_data.first_name,
last_name=address_data.last_name,
company=address_data.company,
address_line_1=address_data.address_line_1,
address_line_2=address_data.address_line_2,
city=address_data.city,
postal_code=address_data.postal_code,
country=address_data.country,
is_default=False,
)
db.add(address)
db.flush() # Get ID without committing
return address
def create_order(
self, db: Session, vendor_id: int, order_data: OrderCreate
) -> Order:
"""
Create a new order.
Args:
db: Database session
vendor_id: Vendor ID
order_data: Order creation data
Returns:
Created Order object
Raises:
ValidationException: If order data is invalid
InsufficientInventoryException: If not enough inventory
"""
try:
# Validate customer exists if provided
customer_id = order_data.customer_id
if customer_id:
customer = (
db.query(Customer)
.filter(
and_(
Customer.id == customer_id, Customer.vendor_id == vendor_id
)
)
.first()
)
if not customer:
raise CustomerNotFoundException(str(customer_id))
else:
# Guest checkout - create guest customer
# TODO: Implement guest customer creation
raise ValidationException("Guest checkout not yet implemented")
# Create shipping address
shipping_address = self._create_customer_address(
db=db,
vendor_id=vendor_id,
customer_id=customer_id,
address_data=order_data.shipping_address,
address_type="shipping",
)
# Create billing address (use shipping if not provided)
if order_data.billing_address:
billing_address = self._create_customer_address(
db=db,
vendor_id=vendor_id,
customer_id=customer_id,
address_data=order_data.billing_address,
address_type="billing",
)
else:
billing_address = shipping_address
# Calculate order totals
subtotal = 0.0
order_items_data = []
for item_data in order_data.items:
# Get product
product = (
db.query(Product)
.filter(
and_(
Product.id == item_data.product_id,
Product.vendor_id == vendor_id,
Product.is_active == True,
)
)
.first()
)
if not product:
raise ValidationException(
f"Product {item_data.product_id} not found"
)
# Check inventory
if product.available_inventory < item_data.quantity:
raise InsufficientInventoryException(
product_id=product.id,
requested=item_data.quantity,
available=product.available_inventory,
)
# Calculate item total
unit_price = product.sale_price if product.sale_price else product.price
if not unit_price:
raise ValidationException(f"Product {product.id} has no price")
item_total = unit_price * item_data.quantity
subtotal += item_total
order_items_data.append(
{
"product_id": product.id,
"product_name": product.marketplace_product.title,
"product_sku": product.product_id,
"quantity": item_data.quantity,
"unit_price": unit_price,
"total_price": item_total,
}
)
# Calculate tax and shipping (simple implementation)
tax_amount = 0.0 # TODO: Implement tax calculation
shipping_amount = 5.99 if subtotal < 50 else 0.0 # Free shipping over €50
discount_amount = 0.0 # TODO: Implement discounts
total_amount = subtotal + tax_amount + shipping_amount - discount_amount
# Generate order number
order_number = self._generate_order_number(db, vendor_id)
# Create order
order = Order(
vendor_id=vendor_id,
customer_id=customer_id,
order_number=order_number,
status="pending",
subtotal=subtotal,
tax_amount=tax_amount,
shipping_amount=shipping_amount,
discount_amount=discount_amount,
total_amount=total_amount,
currency="EUR",
shipping_address_id=shipping_address.id,
billing_address_id=billing_address.id,
shipping_method=order_data.shipping_method,
customer_notes=order_data.customer_notes,
)
db.add(order)
db.flush() # Get order ID
# Create order items
for item_data in order_items_data:
order_item = OrderItem(order_id=order.id, **item_data)
db.add(order_item)
db.commit()
db.refresh(order)
logger.info(
f"Order {order.order_number} created for vendor {vendor_id}, "
f"total: €{total_amount:.2f}"
)
return order
except (
ValidationException,
InsufficientInventoryException,
CustomerNotFoundException,
):
db.rollback()
raise
except Exception as e:
db.rollback()
logger.error(f"Error creating order: {str(e)}")
raise ValidationException(f"Failed to create order: {str(e)}")
def get_order(self, db: Session, vendor_id: int, order_id: int) -> Order:
"""Get order by ID."""
order = (
db.query(Order)
.filter(and_(Order.id == order_id, Order.vendor_id == vendor_id))
.first()
)
if not order:
raise OrderNotFoundException(str(order_id))
return order
def get_vendor_orders(
self,
db: Session,
vendor_id: int,
skip: int = 0,
limit: int = 100,
status: str | None = None,
customer_id: int | None = None,
) -> tuple[list[Order], int]:
"""
Get orders for vendor with filtering.
Args:
db: Database session
vendor_id: Vendor ID
skip: Pagination offset
limit: Pagination limit
status: Filter by status
customer_id: Filter by customer
Returns:
Tuple of (orders, total_count)
"""
query = db.query(Order).filter(Order.vendor_id == vendor_id)
if status:
query = query.filter(Order.status == status)
if customer_id:
query = query.filter(Order.customer_id == customer_id)
# Order by most recent first
query = query.order_by(Order.created_at.desc())
total = query.count()
orders = query.offset(skip).limit(limit).all()
return orders, total
def get_customer_orders(
self,
db: Session,
vendor_id: int,
customer_id: int,
skip: int = 0,
limit: int = 100,
) -> tuple[list[Order], int]:
"""Get orders for a specific customer."""
return self.get_vendor_orders(
db=db, vendor_id=vendor_id, skip=skip, limit=limit, customer_id=customer_id
)
def update_order_status(
self, db: Session, vendor_id: int, order_id: int, order_update: OrderUpdate
) -> Order:
"""
Update order status and tracking information.
Args:
db: Database session
vendor_id: Vendor ID
order_id: Order ID
order_update: Update data
Returns:
Updated Order object
"""
try:
order = self.get_order(db, vendor_id, order_id)
# Update status with timestamps
if order_update.status:
order.status = order_update.status
# Update timestamp based on status
now = datetime.now(UTC)
if order_update.status == "shipped" and not order.shipped_at:
order.shipped_at = now
elif order_update.status == "delivered" and not order.delivered_at:
order.delivered_at = now
elif order_update.status == "cancelled" and not order.cancelled_at:
order.cancelled_at = now
# Update tracking number
if order_update.tracking_number:
order.tracking_number = order_update.tracking_number
# Update internal notes
if order_update.internal_notes:
order.internal_notes = order_update.internal_notes
order.updated_at = datetime.now(UTC)
db.commit()
db.refresh(order)
logger.info(f"Order {order.order_number} updated: status={order.status}")
return order
except OrderNotFoundException:
db.rollback()
raise
except Exception as e:
db.rollback()
logger.error(f"Error updating order: {str(e)}")
raise ValidationException(f"Failed to update order: {str(e)}")
# Create service instance
order_service = OrderService()