Files
orion/app/modules/cart/services/cart_service.py
Samir Boulahtit 86e85a98b8
Some checks failed
CI / ruff (push) Successful in 9s
CI / docs (push) Has been cancelled
CI / deploy (push) Has been cancelled
CI / validate (push) Has been cancelled
CI / dependency-scanning (push) Has been cancelled
CI / pytest (push) Has been cancelled
refactor(arch): eliminate all cross-module model imports in service layer
Enforce MOD-025/MOD-026 rules: zero top-level cross-module model imports
remain in any service file. All 66 files migrated using deferred import
patterns (method-body, _get_model() helpers, instance-cached self._Model)
and new cross-module service methods in tenancy. Documentation updated
with Pattern 6 (deferred imports), migration plan marked complete, and
violations status reflects 84→0 service-layer violations.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-02-27 06:13:15 +01:00

447 lines
13 KiB
Python

# app/modules/cart/services/cart_service.py
"""
Shopping cart service.
This module provides:
- Session-based cart management
- Cart item operations (add, update, remove)
- Cart total calculations
All monetary calculations use integer cents internally for precision.
See docs/architecture/money-handling.md for details.
"""
import logging
from sqlalchemy import and_
from sqlalchemy.orm import Session
from app.modules.cart.exceptions import (
CartItemNotFoundException,
InsufficientInventoryForCartException,
InvalidCartQuantityException,
)
from app.modules.cart.models.cart import CartItem
from app.modules.catalog.exceptions import ProductNotFoundException
from app.utils.money import cents_to_euros
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class CartService:
    """Service for managing session-based shopping carts.

    A cart is the set of ``CartItem`` rows sharing a ``store_id`` and
    ``session_id``; each product appears at most once per cart as far as this
    service is concerned (adding an existing product increases its quantity).

    Totals are accumulated in integer cents and converted to euros only when
    building the response returned by :meth:`get_cart`.

    Mutating methods call ``db.flush()`` / ``db.delete()`` but never commit;
    committing is left to the caller.
    """

    # Smallest quantity accepted by add_to_cart / update_cart_item.
    _MIN_QUANTITY = 1

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _find_cart_item(db: Session, store_id: int, session_id: str, product_id: int):
        """Return the cart row for this store/session/product, or None."""
        return (
            db.query(CartItem)
            .filter(
                and_(
                    CartItem.store_id == store_id,
                    CartItem.session_id == session_id,
                    CartItem.product_id == product_id,
                )
            )
            .first()
        )

    @staticmethod
    def _get_active_product(db: Session, store_id: int, product_id: int):
        """Fetch a product of the given store and require it to be active.

        Raises:
            ProductNotFoundException: If the catalog lookup fails, returns
                nothing, or the product is not active.
        """
        # Function-scope import keeps this cross-module service dependency out
        # of the module's top-level imports.
        from app.modules.catalog.services.product_service import product_service

        try:
            product = product_service.get_product(db, store_id, product_id)
        except ProductNotFoundException:
            product = None

        # Inactive products are reported exactly like missing ones.
        if product is None or not product.is_active:
            logger.error(
                "[CART_SERVICE] Product not found",
                extra={"product_id": product_id, "store_id": store_id},
            )
            raise ProductNotFoundException(product_id=product_id, store_id=store_id)
        return product

    @staticmethod
    def _product_title(product):
        """Return the marketplace title, or the product ID as a string.

        ``marketplace_product`` is treated as optional, matching how
        :meth:`get_cart` reads it.
        """
        marketplace_product = product.marketplace_product
        if marketplace_product:
            return marketplace_product.title
        return str(product.id)

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def get_cart(self, db: Session, store_id: int, session_id: str) -> dict:
        """
        Get cart contents for a session.

        Args:
            db: Database session
            store_id: Store ID
            session_id: Session ID

        Returns:
            Dict with ``store_id``, ``session_id``, ``items``, ``subtotal`` and
            ``total``. Each item carries ``product_id``, ``product_name``,
            ``quantity``, ``price``, ``line_total`` and ``image_url``.
            ``total`` currently equals ``subtotal``.
        """
        logger.info(
            "[CART_SERVICE] get_cart called",
            extra={
                "store_id": store_id,
                "session_id": session_id,
            },
        )

        # Fetch cart items from database
        cart_items = (
            db.query(CartItem)
            .filter(
                and_(CartItem.store_id == store_id, CartItem.session_id == session_id)
            )
            .all()
        )
        logger.info(
            "[CART_SERVICE] Found %d items in database",
            len(cart_items),
            extra={"item_count": len(cart_items)},
        )

        # Build response - calculate totals in cents, return euros
        items = []
        subtotal_cents = 0
        for cart_item in cart_items:
            product = cart_item.product
            marketplace_product = product.marketplace_product
            line_total_cents = cart_item.line_total_cents
            items.append(
                {
                    "product_id": product.id,
                    # Fall back to the ID when there is no marketplace record.
                    "product_name": (
                        marketplace_product.get_title("en")
                        if marketplace_product
                        else str(product.id)
                    ),
                    "quantity": cart_item.quantity,
                    "price": cart_item.price_at_add,  # Returns euros via property
                    "line_total": cents_to_euros(line_total_cents),
                    "image_url": (
                        marketplace_product.image_link
                        if marketplace_product
                        else None
                    ),
                }
            )
            subtotal_cents += line_total_cents

        # Convert to euros for API response
        subtotal = cents_to_euros(subtotal_cents)
        cart_data = {
            "store_id": store_id,
            "session_id": session_id,
            "items": items,
            "subtotal": subtotal,
            "total": subtotal,  # Could add tax/shipping later
        }
        logger.info(
            "[CART_SERVICE] get_cart returning: %d items, total: %s",
            len(items),
            subtotal,
            extra={"cart": cart_data},
        )
        return cart_data

    def add_to_cart(
        self,
        db: Session,
        store_id: int,
        session_id: str,
        product_id: int,
        quantity: int = 1,
    ) -> dict:
        """
        Add product to cart.

        If the product is already in the cart its quantity is increased by
        ``quantity``; otherwise a new cart item is created with the current
        price (sale price if set, else regular price) stored in cents.

        Args:
            db: Database session
            store_id: Store ID
            session_id: Session ID
            product_id: Product ID
            quantity: Quantity to add (must be >= 1)

        Returns:
            Dict with ``message``, ``product_id`` and the resulting ``quantity``
            of that product in the cart.

        Raises:
            InvalidCartQuantityException: If quantity < 1
            ProductNotFoundException: If product not found or inactive
            InsufficientInventoryForCartException: If not enough inventory
        """
        logger.info(
            "[CART_SERVICE] add_to_cart called",
            extra={
                "store_id": store_id,
                "session_id": session_id,
                "product_id": product_id,
                "quantity": quantity,
            },
        )

        # Same lower bound as update_cart_item: adding zero or a negative
        # amount would otherwise create or leave a non-positive cart row.
        if quantity < self._MIN_QUANTITY:
            raise InvalidCartQuantityException(
                quantity=quantity, min_quantity=self._MIN_QUANTITY
            )

        # Verify product exists, belongs to store and is active
        product = self._get_active_product(db, store_id, product_id)
        product_title = self._product_title(product)
        available = product.available_inventory
        logger.info(
            "[CART_SERVICE] Product found: %s",
            product_title,
            extra={
                "product_id": product_id,
                "product_name": product_title,
                "available_inventory": available,
            },
        )

        # Get current price in cents (use sale_price if available, otherwise regular price)
        current_price_cents = product.sale_price_cents or product.price_cents or 0

        # Check if item already exists in cart
        existing_item = self._find_cart_item(db, store_id, session_id, product_id)

        if existing_item:
            # Update quantity
            new_quantity = existing_item.quantity + quantity

            # Check inventory for new total quantity
            if available < new_quantity:
                logger.warning(
                    "[CART_SERVICE] Insufficient inventory for update",
                    extra={
                        "product_id": product_id,
                        "current_in_cart": existing_item.quantity,
                        "adding": quantity,
                        "requested_total": new_quantity,
                        "available": available,
                    },
                )
                raise InsufficientInventoryForCartException(
                    product_id=product_id,
                    product_name=product_title,
                    requested=new_quantity,
                    available=available,
                )

            existing_item.quantity = new_quantity
            db.flush()
            db.refresh(existing_item)
            logger.info(
                "[CART_SERVICE] Updated existing cart item",
                extra={"cart_item_id": existing_item.id, "new_quantity": new_quantity},
            )
            return {
                "message": "Product quantity updated in cart",
                "product_id": product_id,
                "quantity": new_quantity,
            }

        # Check inventory for new item
        if available < quantity:
            logger.warning(
                "[CART_SERVICE] Insufficient inventory",
                extra={
                    "product_id": product_id,
                    "requested": quantity,
                    "available": available,
                },
            )
            raise InsufficientInventoryForCartException(
                product_id=product_id,
                product_name=product_title,
                requested=quantity,
                available=available,
            )

        # Create new cart item (price stored in cents)
        cart_item = CartItem(
            store_id=store_id,
            session_id=session_id,
            product_id=product_id,
            quantity=quantity,
            price_at_add_cents=current_price_cents,
        )
        db.add(cart_item)
        db.flush()
        db.refresh(cart_item)
        logger.info(
            "[CART_SERVICE] Created new cart item",
            extra={
                "cart_item_id": cart_item.id,
                "quantity": quantity,
                "price_cents": current_price_cents,
            },
        )
        return {
            "message": "Product added to cart",
            "product_id": product_id,
            "quantity": quantity,
        }

    def update_cart_item(
        self,
        db: Session,
        store_id: int,
        session_id: str,
        product_id: int,
        quantity: int,
    ) -> dict:
        """
        Update quantity of item in cart.

        Args:
            db: Database session
            store_id: Store ID
            session_id: Session ID
            product_id: Product ID
            quantity: New quantity (must be >= 1)

        Returns:
            Dict with ``message``, ``product_id`` and the new ``quantity``.

        Raises:
            InvalidCartQuantityException: If quantity < 1
            CartItemNotFoundException: If the product is not in the cart
            ProductNotFoundException: If product not found or inactive
            InsufficientInventoryForCartException: If not enough inventory
        """
        if quantity < self._MIN_QUANTITY:
            raise InvalidCartQuantityException(
                quantity=quantity, min_quantity=self._MIN_QUANTITY
            )

        # Find cart item
        cart_item = self._find_cart_item(db, store_id, session_id, product_id)
        if not cart_item:
            raise CartItemNotFoundException(
                product_id=product_id, session_id=session_id
            )

        # Verify product still exists and is active
        product = self._get_active_product(db, store_id, product_id)

        # Check inventory
        available = product.available_inventory
        if available < quantity:
            raise InsufficientInventoryForCartException(
                product_id=product_id,
                product_name=self._product_title(product),
                requested=quantity,
                available=available,
            )

        # Update quantity
        cart_item.quantity = quantity
        db.flush()
        db.refresh(cart_item)
        logger.info(
            "[CART_SERVICE] Updated cart item quantity",
            extra={
                "cart_item_id": cart_item.id,
                "product_id": product_id,
                "new_quantity": quantity,
            },
        )
        return {
            "message": "Cart updated",
            "product_id": product_id,
            "quantity": quantity,
        }

    def remove_from_cart(
        self, db: Session, store_id: int, session_id: str, product_id: int
    ) -> dict:
        """
        Remove item from cart.

        Args:
            db: Database session
            store_id: Store ID
            session_id: Session ID
            product_id: Product ID

        Returns:
            Dict with ``message`` and the removed ``product_id``.

        Raises:
            CartItemNotFoundException: If product not in cart
        """
        # Find and delete cart item
        cart_item = self._find_cart_item(db, store_id, session_id, product_id)
        if not cart_item:
            raise CartItemNotFoundException(
                product_id=product_id, session_id=session_id
            )

        db.delete(cart_item)
        logger.info(
            "[CART_SERVICE] Removed item from cart",
            extra={
                "cart_item_id": cart_item.id,
                "product_id": product_id,
                "session_id": session_id,
            },
        )
        return {"message": "Item removed from cart", "product_id": product_id}

    def clear_cart(self, db: Session, store_id: int, session_id: str) -> dict:
        """
        Clear all items from cart.

        Args:
            db: Database session
            store_id: Store ID
            session_id: Session ID

        Returns:
            Dict with ``message`` and ``items_removed`` (the count returned by
            the bulk delete).
        """
        # Delete all cart items for this session
        deleted_count = (
            db.query(CartItem)
            .filter(
                and_(CartItem.store_id == store_id, CartItem.session_id == session_id)
            )
            .delete()
        )
        logger.info(
            "[CART_SERVICE] Cleared cart",
            extra={
                "session_id": session_id,
                "store_id": store_id,
                "items_removed": deleted_count,
            },
        )
        return {"message": "Cart cleared", "items_removed": deleted_count}
# Create the module-level CartService instance (built once, at import time).
cart_service = CartService()