# Source listing: orion/main.py (Python, 1392 lines, 49 KiB)

from fastapi import FastAPI, HTTPException, Query, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field, validator
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from sqlalchemy import create_engine, Column, Integer, String, DateTime, text, ForeignKey, Index, func
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from contextlib import asynccontextmanager
import pandas as pd
import requests
from io import StringIO, BytesIO
import logging
import asyncio
import time
from functools import wraps
import os
from dotenv import load_dotenv
# Import utility modules
from utils.data_processing import GTINProcessor, PriceProcessor
from utils.csv_processor import CSVProcessor
from utils.database import get_db_engine, get_session_local
from models.database_models import Base, Product, Stock, User, MarketplaceImportJob, Shop, ShopProduct
from models.api_models import *
from middleware.rate_limiter import RateLimiter
from middleware.auth import AuthManager
# Read variables from a .env file into the environment before they are used below.
load_dotenv()

# Root logging configuration and this module's logger.
logging.basicConfig(
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Shared processor instances used by the route handlers.
gtin_processor = GTINProcessor()
price_processor = PriceProcessor()
csv_processor = CSVProcessor()

# Database engine and session factory. DATABASE_URL falls back to a local
# PostgreSQL URL when the environment variable is not set.
DATABASE_URL = os.environ.get(
    "DATABASE_URL",
    "postgresql://user:password@localhost/ecommerce",
)
engine = get_db_engine(DATABASE_URL)
SessionLocal = get_session_local(engine)

# Module-level rate limiter and authentication manager.
rate_limiter = RateLimiter()
auth_manager = AuthManager()
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Run startup work before the app serves requests and log on shutdown.

    Startup creates the ORM tables, asks the auth manager to create the
    default admin user, and issues a series of ``CREATE INDEX IF NOT EXISTS``
    statements. A failure while creating the admin user or the indexes is
    logged and does not abort startup; the first failing index statement
    stops the remaining ones from running.
    """
    logger.info("Starting up ecommerce API with marketplace import support")

    Base.metadata.create_all(bind=engine)

    # Default admin user; the session is closed whether or not this succeeds.
    session = SessionLocal()
    try:
        auth_manager.create_default_admin_user(session)
    except Exception as e:
        logger.error(f"Failed to create default admin user: {e}")
    finally:
        session.close()

    # Executed in this order inside a single connection.
    index_statements = (
        # User indexes
        "CREATE INDEX IF NOT EXISTS idx_user_email ON users(email)",
        "CREATE INDEX IF NOT EXISTS idx_user_username ON users(username)",
        "CREATE INDEX IF NOT EXISTS idx_user_role ON users(role)",
        # Product indexes (including marketplace indexes)
        "CREATE INDEX IF NOT EXISTS idx_product_gtin ON products(gtin)",
        "CREATE INDEX IF NOT EXISTS idx_product_brand ON products(brand)",
        "CREATE INDEX IF NOT EXISTS idx_product_category ON products(google_product_category)",
        "CREATE INDEX IF NOT EXISTS idx_product_availability ON products(availability)",
        "CREATE INDEX IF NOT EXISTS idx_product_marketplace ON products(marketplace)",
        "CREATE INDEX IF NOT EXISTS idx_product_shop_name ON products(shop_name)",
        "CREATE INDEX IF NOT EXISTS idx_product_marketplace_shop ON products(marketplace, shop_name)",
        # Stock indexes
        "CREATE INDEX IF NOT EXISTS idx_stock_gtin_location ON stock(gtin, location)",
        "CREATE INDEX IF NOT EXISTS idx_stock_location ON stock(location)",
        # Marketplace import job indexes
        "CREATE INDEX IF NOT EXISTS idx_marketplace_import_marketplace ON marketplace_import_jobs(marketplace)",
        "CREATE INDEX IF NOT EXISTS idx_marketplace_import_shop_name ON marketplace_import_jobs(shop_name)",
        "CREATE INDEX IF NOT EXISTS idx_marketplace_import_user_id ON marketplace_import_jobs(user_id)",
    )
    with engine.connect() as conn:
        try:
            for statement in index_statements:
                conn.execute(text(statement))
            conn.commit()
            logger.info("Database indexes created successfully")
        except Exception as e:
            logger.warning(f"Index creation warning: {e}")

    yield

    logger.info("Shutting down ecommerce API")
# Application instance; startup and shutdown work is delegated to `lifespan`.
app = FastAPI(
    lifespan=lifespan,
    version="2.2.0",
    title="Ecommerce Backend API with Marketplace Support",
    description=(
        "Advanced product management system with JWT authentication, marketplace-aware CSV "
        "import/export and stock management"
    ),
)

# CORS is fully open here: any origin, method and header, with credentials.
# NOTE(review): the FastAPI CORS documentation advises against combining
# wildcard origins with allow_credentials=True - confirm the intended
# production origins before deploying.
app.add_middleware(
    CORSMiddleware,
    allow_headers=["*"],
    allow_methods=["*"],
    allow_credentials=True,
    allow_origins=["*"],  # Configure appropriately for production
)

# Bearer-token scheme used by the authentication dependencies.
security = HTTPBearer()
# Database session dependency
def get_db():
    """Yield a database session and always close it afterwards.

    When an exception is raised into this generator while the session is in
    use, the session is rolled back and the exception is re-raised.
    ``HTTPException`` is a deliberate API response (404, 403, ...) rather
    than a database failure, so it is rolled back and re-raised without
    being logged as a database error.

    Yields:
        The session created by ``SessionLocal()``.
    """
    db = SessionLocal()
    try:
        yield db
    except HTTPException:
        # Discard uncommitted work, but keep the error log free of false alarms.
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        logger.error(f"Database error: {e}")
        raise
    finally:
        db.close()
# Authentication dependencies
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db)):
    """Resolve the user for the request's bearer credentials via the auth manager."""
    user = auth_manager.get_current_user(db, credentials)
    return user
def get_current_admin_user(current_user: User = Depends(get_current_user)):
    """Pass the authenticated user through the auth manager's admin check."""
    admin_user = auth_manager.require_admin(current_user)
    return admin_user
def get_user_shop(shop_code: str, current_user: User = Depends(get_current_user), db: Session = Depends(get_db)):
    """Load a shop by its upper-cased code and check the caller may manage it.

    Raises:
        HTTPException: 404 when no shop has that code; 403 when the caller is
            neither an admin nor the shop's owner.
    """
    code = shop_code.upper()
    shop = db.query(Shop).filter(Shop.shop_code == code).first()
    if not shop:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Admins may access every shop; everyone else only the shops they own.
    if not (current_user.role == "admin" or shop.owner_id == current_user.id):
        raise HTTPException(status_code=403, detail="Access denied to this shop")

    return shop
# Rate limiting decorator
def rate_limit(max_requests: int = 100, window_seconds: int = 3600):
    """Build a decorator that rate-limits an async endpoint.

    Requests are counted per client. When the wrapped endpoint is called
    with a ``current_user`` keyword argument that has a non-None ``id``, the
    limiter is keyed on that id, so one user cannot exhaust the quota of
    everyone else. Calls without such a user share the ``"anonymous"`` key.

    Args:
        max_requests: Maximum number of requests passed to the limiter.
        window_seconds: Window length, in seconds, passed to the limiter.

    Returns:
        A decorator for ``async`` callables.

    Raises:
        HTTPException: 429 from the wrapper when the limiter rejects the call.
    """
    def decorator(endpoint):
        @wraps(endpoint)
        async def wrapper(*args, **kwargs):
            # Prefer the authenticated user's id as the limiter key.
            user_id = getattr(kwargs.get("current_user"), "id", None)
            client_id = f"user:{user_id}" if user_id is not None else "anonymous"
            if not rate_limiter.allow_request(client_id, max_requests, window_seconds):
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded"
                )
            return await endpoint(*args, **kwargs)
        return wrapper
    return decorator
# Authentication Routes
@app.post("/register", response_model=UserResponse)
def register_user(user_data: UserRegister, db: Session = Depends(get_db)):
    """Create a user account with the default "user" role.

    Raises:
        HTTPException: 400 when the email or the username is already in use.
    """
    # Each uniqueness check pairs a filter condition with its error message;
    # the email check runs first.
    uniqueness_checks = (
        (User.email == user_data.email, "Email already registered"),
        (User.username == user_data.username, "Username already taken"),
    )
    for condition, message in uniqueness_checks:
        if db.query(User).filter(condition).first():
            raise HTTPException(status_code=400, detail=message)

    # Only the hash produced by the auth manager is stored.
    new_user = User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=auth_manager.hash_password(user_data.password),
        role="user",  # Default role
        is_active=True
    )
    db.add(new_user)
    db.commit()
    db.refresh(new_user)

    logger.info(f"New user registered: {new_user.username}")
    return new_user
@app.post("/login", response_model=LoginResponse)
def login_user(user_credentials: UserLogin, db: Session = Depends(get_db)):
    """Authenticate by username and password and return an access token.

    Raises:
        HTTPException: 401 when the auth manager does not return a user.
    """
    user = auth_manager.authenticate_user(
        db,
        user_credentials.username,
        user_credentials.password,
    )
    if not user:
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password"
        )

    # The auth manager returns the token together with its type and lifetime.
    token_data = auth_manager.create_access_token(user)
    logger.info(f"User logged in: {user.username}")

    response = LoginResponse(
        access_token=token_data["access_token"],
        token_type=token_data["token_type"],
        expires_in=token_data["expires_in"],
        user=UserResponse.model_validate(user)
    )
    return response
@app.get("/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Return the authenticated user as a ``UserResponse``."""
    profile = UserResponse.model_validate(current_user)
    return profile
# Public Routes (no authentication required)
@app.get("/")
def root():
    """Return a static description of the API: status, features, marketplaces."""
    features = [
        "JWT Authentication",
        "Marketplace-aware product import",
        "Multi-shop product management",
        "Stock management with location tracking"
    ]
    marketplaces = ["Letzshop", "Amazon", "eBay", "Etsy", "Shopify", "Other"]

    info = {
        "message": "Ecommerce Backend API v2.2 with Marketplace Support",
        "status": "operational",
        "features": features,
        "supported_marketplaces": marketplaces,
        "auth_required": "Most endpoints require Bearer token authentication"
    }
    return info
@app.get("/health")
def health_check(db: Session = Depends(get_db)):
    """Report service health by running ``SELECT 1`` against the database.

    Raises:
        HTTPException: 503 when the probe query fails.
    """
    try:
        # Cheapest possible round trip to the database.
        db.execute(text("SELECT 1"))
        report = {"status": "healthy", "timestamp": datetime.utcnow()}
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        raise HTTPException(status_code=503, detail="Service unhealthy")
    return report
# Marketplace Import Routes (Protected)
@app.post("/import-from-marketplace", response_model=MarketplaceImportJobResponse)
@rate_limit(max_requests=10, window_seconds=3600)  # Limit marketplace imports
async def import_products_from_marketplace(
        request: MarketplaceImportRequest,
        background_tasks: BackgroundTasks,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Queue a marketplace CSV import as a background task (Protected).

    A ``MarketplaceImportJob`` row is stored with status "pending" and the
    actual processing is handed to ``process_marketplace_import``.

    Raises:
        HTTPException: 404 when the shop does not exist; 403 when the caller
            is neither an admin nor the shop's owner.
    """
    logger.info(
        f"Starting marketplace import: {request.marketplace} -> {request.shop_code} by user {current_user.username}")

    # NOTE(review): the shop code is matched as given here, while other
    # lookups in this file upper-case it first - confirm which is intended.
    shop = db.query(Shop).filter(Shop.shop_code == request.shop_code).first()
    if not shop:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Admins may import for any shop; other users only for shops they own.
    may_import = current_user.role == "admin" or shop.owner_id == current_user.id
    if not may_import:
        raise HTTPException(status_code=403, detail="Access denied to this shop")

    # NOTE(review): the job is created with shop_code, but the status
    # endpoints read job.shop_name - verify against the model definition.
    import_job = MarketplaceImportJob(
        status="pending",
        source_url=request.url,
        marketplace=request.marketplace,
        shop_code=request.shop_code,
        user_id=current_user.id,
        created_at=datetime.utcnow()
    )
    db.add(import_job)
    db.commit()
    db.refresh(import_job)

    # A missing or zero batch size falls back to 1000 rows per batch.
    effective_batch_size = request.batch_size or 1000
    background_tasks.add_task(
        process_marketplace_import,
        import_job.id,
        request.url,
        request.marketplace,
        request.shop_code,
        effective_batch_size
    )

    status_hint = f"Marketplace import started from {request.marketplace}. Check status with /marketplace-import-status/{import_job.id}"
    return MarketplaceImportJobResponse(
        job_id=import_job.id,
        status="pending",
        marketplace=request.marketplace,
        shop_code=request.shop_code,
        message=status_hint
    )
async def process_marketplace_import(job_id: int, url: str, marketplace: str, shop_name: str,
                                     batch_size: int = 1000):
    """Background task that runs a marketplace CSV import and records the result.

    The job row moves from "processing" to "completed",
    "completed_with_errors" or "failed". The task opens its own session and
    always closes it.

    Args:
        job_id: Primary key of the ``MarketplaceImportJob`` to update.
        url: Location of the CSV, passed to the CSV processor.
        marketplace: Marketplace name, passed to the CSV processor.
        shop_name: Shop identifier, passed to the CSV processor.
        batch_size: Batch size, passed to the CSV processor.
    """
    db = SessionLocal()
    # Bound before the try block so the error handler can tell "job not
    # loaded yet" apart from "job loaded, processing failed".
    job = None
    try:
        job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first()
        if not job:
            logger.error(f"Marketplace import job {job_id} not found")
            return

        job.status = "processing"
        job.started_at = datetime.utcnow()
        db.commit()
        logger.info(f"Processing marketplace import: Job {job_id}, Marketplace: {marketplace}, Shop: {shop_name}")

        # Process CSV with marketplace and shop information
        result = await csv_processor.process_marketplace_csv_from_url(
            url, marketplace, shop_name, batch_size, db
        )

        # Record the outcome; row-level errors downgrade the final status.
        error_total = result.get("errors", 0)
        job.status = "completed"
        job.completed_at = datetime.utcnow()
        job.imported_count = result["imported"]
        job.updated_count = result["updated"]
        job.error_count = error_total
        job.total_processed = result["total_processed"]
        if error_total > 0:
            job.status = "completed_with_errors"
            job.error_message = f"{result['errors']} rows had errors"
        db.commit()
        logger.info(
            f"Marketplace import job {job_id} completed successfully - Imported: {result['imported']}, "
            f"Updated: {result['updated']}")
    except Exception as e:
        logger.error(f"Marketplace import job {job_id} failed: {e}")
        # A failed statement leaves the session unusable until it is rolled
        # back, so reset it before trying to persist the failure.
        db.rollback()
        if job is not None:
            try:
                job.status = "failed"
                job.completed_at = datetime.utcnow()
                job.error_message = str(e)
                db.commit()
            except Exception as persist_error:
                db.rollback()
                logger.error(
                    f"Could not record failure for marketplace import job {job_id}: {persist_error}")
    finally:
        db.close()
@app.get("/marketplace-import-status/{job_id}", response_model=MarketplaceImportJobResponse)
def get_marketplace_import_status(
        job_id: int,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Return the current state of one marketplace import job (Protected).

    Raises:
        HTTPException: 404 when the job does not exist; 403 when a non-admin
            asks for a job created by another user.
    """
    job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first()
    if not job:
        raise HTTPException(status_code=404, detail="Marketplace import job not found")

    # Admins see every job; other users only their own.
    if not (current_user.role == "admin" or job.user_id == current_user.id):
        raise HTTPException(status_code=403, detail="Access denied to this import job")

    # Counters that are still unset on the row are reported as 0.
    fields = dict(
        job_id=job.id,
        status=job.status,
        marketplace=job.marketplace,
        shop_name=job.shop_name,
        imported=job.imported_count or 0,
        updated=job.updated_count or 0,
        total_processed=job.total_processed or 0,
        error_count=job.error_count or 0,
        error_message=job.error_message,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at,
    )
    return MarketplaceImportJobResponse(**fields)
@app.get("/marketplace-import-jobs", response_model=List[MarketplaceImportJobResponse])
def get_marketplace_import_jobs(
        marketplace: Optional[str] = Query(None, description="Filter by marketplace"),
        shop_name: Optional[str] = Query(None, description="Filter by shop name"),
        skip: int = Query(0, ge=0),
        limit: int = Query(50, ge=1, le=100),
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """List marketplace import jobs, newest first, with optional filters (Protected).

    Non-admin users only receive the jobs they created. The marketplace and
    shop-name filters are case-insensitive substring matches.
    """
    def to_response(job):
        # Counters that are still unset on the row are reported as 0.
        return MarketplaceImportJobResponse(
            job_id=job.id,
            status=job.status,
            marketplace=job.marketplace,
            shop_name=job.shop_name,
            imported=job.imported_count or 0,
            updated=job.updated_count or 0,
            total_processed=job.total_processed or 0,
            error_count=job.error_count or 0,
            error_message=job.error_message,
            created_at=job.created_at,
            started_at=job.started_at,
            completed_at=job.completed_at
        )

    query = db.query(MarketplaceImportJob)

    # Restrict visibility before applying the caller's filters.
    if current_user.role != "admin":
        query = query.filter(MarketplaceImportJob.user_id == current_user.id)
    if marketplace:
        query = query.filter(MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%"))
    if shop_name:
        query = query.filter(MarketplaceImportJob.shop_name.ilike(f"%{shop_name}%"))

    newest_first = query.order_by(MarketplaceImportJob.created_at.desc())
    jobs = newest_first.offset(skip).limit(limit).all()
    return list(map(to_response, jobs))
# Enhanced Product Routes with Marketplace Support
@app.get("/products", response_model=ProductListResponse)
def get_products(
        skip: int = Query(0, ge=0),
        limit: int = Query(100, ge=1, le=1000),
        brand: Optional[str] = Query(None),
        category: Optional[str] = Query(None),
        availability: Optional[str] = Query(None),
        marketplace: Optional[str] = Query(None, description="Filter by marketplace"),
        shop_name: Optional[str] = Query(None, description="Filter by shop name"),
        search: Optional[str] = Query(None),
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Get products with advanced filtering including marketplace and shop (Protected).

    ``brand``, ``category``, ``marketplace`` and ``shop_name`` are
    case-insensitive substring filters; ``availability`` must match exactly;
    ``search`` matches title, description, marketplace or shop name.

    Returns:
        A ``ProductListResponse`` with the requested page, the total number of
        matching products and the ``skip``/``limit`` that were applied.
    """
    query = db.query(Product)

    # Apply filters
    if brand:
        query = query.filter(Product.brand.ilike(f"%{brand}%"))
    if category:
        query = query.filter(Product.google_product_category.ilike(f"%{category}%"))
    if availability:
        query = query.filter(Product.availability == availability)
    if marketplace:
        query = query.filter(Product.marketplace.ilike(f"%{marketplace}%"))
    if shop_name:
        query = query.filter(Product.shop_name.ilike(f"%{shop_name}%"))
    if search:
        # Search in title, description, marketplace and shop name
        search_term = f"%{search}%"
        query = query.filter(
            (Product.title.ilike(search_term)) |
            (Product.description.ilike(search_term)) |
            (Product.marketplace.ilike(search_term)) |
            (Product.shop_name.ilike(search_term))
        )

    # Count before ordering and pagination so the total covers every match.
    total = query.count()

    # OFFSET/LIMIT without ORDER BY may return rows in a different order on
    # each call, so pages could overlap or skip rows; order by primary key.
    products = query.order_by(Product.id).offset(skip).limit(limit).all()

    return ProductListResponse(
        products=products,
        total=total,
        skip=skip,
        limit=limit
    )
@app.post("/products", response_model=ProductResponse)
def create_product(
        product: ProductCreate,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Create a product after normalising its GTIN and price (Protected).

    The marketplace defaults to "Letzshop" when none is given. A price that
    the price processor cannot parse is stored as submitted.

    Raises:
        HTTPException: 400 when the product ID already exists or the GTIN
            cannot be normalised.
    """
    duplicate = db.query(Product).filter(Product.product_id == product.product_id).first()
    if duplicate:
        raise HTTPException(status_code=400, detail="Product with this ID already exists")

    # GTIN: store the normalised form, reject anything the processor refuses.
    if product.gtin:
        normalized_gtin = gtin_processor.normalize(product.gtin)
        if not normalized_gtin:
            raise HTTPException(status_code=400, detail="Invalid GTIN format")
        product.gtin = normalized_gtin

    # Price: split into amount and currency when parsing succeeds.
    if product.price:
        parsed_price, currency = price_processor.parse_price_currency(product.price)
        if parsed_price:
            product.price = parsed_price
            product.currency = currency

    if not product.marketplace:
        product.marketplace = "Letzshop"

    payload = product.dict()
    db_product = Product(**payload)
    db.add(db_product)
    db.commit()
    db.refresh(db_product)

    logger.info(
        f"Created product {db_product.product_id} for marketplace {db_product.marketplace}, shop {db_product.shop_name}")
    return db_product
@app.get("/products/{product_id}", response_model=ProductDetailResponse)
def get_product(product_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Return one product together with a stock summary for its GTIN (Protected).

    ``stock_info`` is ``None`` when the product has no GTIN or no stock rows.

    Raises:
        HTTPException: 404 when the product does not exist.
    """
    product = db.query(Product).filter(Product.product_id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    stock_info = None
    if product.gtin:
        stock_entries = db.query(Stock).filter(Stock.gtin == product.gtin).all()
        if stock_entries:
            # One pass builds both the total and the per-location breakdown.
            total_quantity = 0
            locations = []
            for entry in stock_entries:
                total_quantity += entry.quantity
                locations.append(
                    StockLocationResponse(location=entry.location, quantity=entry.quantity)
                )
            stock_info = StockSummaryResponse(
                gtin=product.gtin,
                total_quantity=total_quantity,
                locations=locations
            )

    return ProductDetailResponse(product=product, stock_info=stock_info)
@app.put("/products/{product_id}", response_model=ProductResponse)
def update_product(
        product_id: str,
        product_update: ProductUpdate,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Apply the fields set in the request to an existing product (Protected).

    Only fields explicitly present in the request are written. A GTIN is
    normalised first; a price is split into amount and currency when the
    price processor can parse it.

    Raises:
        HTTPException: 404 when the product does not exist; 400 when the new
            GTIN cannot be normalised.
    """
    product = db.query(Product).filter(Product.product_id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    update_data = product_update.dict(exclude_unset=True)

    new_gtin = update_data.get("gtin")
    if new_gtin:
        normalized_gtin = gtin_processor.normalize(new_gtin)
        if not normalized_gtin:
            raise HTTPException(status_code=400, detail="Invalid GTIN format")
        update_data["gtin"] = normalized_gtin

    new_price = update_data.get("price")
    if new_price:
        parsed_price, currency = price_processor.parse_price_currency(new_price)
        if parsed_price:
            update_data["price"] = parsed_price
            update_data["currency"] = currency

    for field_name in update_data:
        setattr(product, field_name, update_data[field_name])
    product.updated_at = datetime.utcnow()

    db.commit()
    db.refresh(product)
    return product
@app.delete("/products/{product_id}")
def delete_product(
        product_id: str,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Delete a product and every stock row that carries its GTIN (Protected).

    Raises:
        HTTPException: 404 when the product does not exist.
    """
    product = db.query(Product).filter(Product.product_id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    # NOTE(review): stock is keyed by GTIN only, so this also removes stock
    # for any other product sharing the GTIN - confirm that is intended.
    gtin = product.gtin
    if gtin:
        db.query(Stock).filter(Stock.gtin == gtin).delete()

    db.delete(product)
    db.commit()
    return {"message": "Product and associated stock deleted successfully"}
# Stock Management Routes (Protected)
@app.post("/stock", response_model=StockResponse)
def set_stock(stock: StockCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Set exact stock quantity for a GTIN at a specific location (replaces existing quantity).

    The location is stored stripped and upper-cased. An existing row for the
    same GTIN and location is overwritten; otherwise a new row is created.

    Raises:
        HTTPException: 400 when the GTIN contains no digits.
    """
    def normalize_gtin(gtin_value):
        # Keep only the digits before any decimal point (e.g. "123.0").
        if not gtin_value:
            return None
        gtin_str = str(gtin_value).strip()
        if '.' in gtin_str:
            gtin_str = gtin_str.split('.')[0]
        gtin_clean = ''.join(filter(str.isdigit, gtin_str))
        if len(gtin_clean) in [8, 12, 13, 14]:
            # Only the 8-digit form is shorter than 12, so only it is padded.
            return gtin_clean.zfill(12)
        return gtin_clean if gtin_clean else None

    normalized_gtin = normalize_gtin(stock.gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    location = stock.location.strip().upper()

    # Check if stock entry already exists for this GTIN and location
    existing_stock = db.query(Stock).filter(
        Stock.gtin == normalized_gtin,
        Stock.location == location
    ).first()

    if existing_stock:
        # Update existing stock (SET to exact quantity)
        old_quantity = existing_stock.quantity
        existing_stock.quantity = stock.quantity
        existing_stock.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(existing_stock)
        # The old and new quantity need a separator: without one, 10 and 25
        # were logged as "1025".
        logger.info(
            f"Updated stock for GTIN {normalized_gtin} at {stock.location}: {old_quantity} -> {stock.quantity}")
        return existing_stock

    # Create new stock entry
    new_stock = Stock(
        gtin=normalized_gtin,
        location=location,
        quantity=stock.quantity
    )
    db.add(new_stock)
    db.commit()
    db.refresh(new_stock)
    logger.info(f"Created new stock for GTIN {normalized_gtin} at {stock.location}: {stock.quantity}")
    return new_stock
@app.post("/stock/add", response_model=StockResponse)
def add_stock(stock: StockAdd, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Increase the stock of a GTIN at a location, creating the row if needed.

    The location is matched and stored stripped and upper-cased.

    Raises:
        HTTPException: 400 when the GTIN contains no digits.
    """
    def normalize_gtin(gtin_value):
        # Digits before any decimal point; the 8-digit form is padded to 12.
        if not gtin_value:
            return None
        integer_part = str(gtin_value).strip().partition('.')[0]
        gtin_clean = ''.join(ch for ch in integer_part if ch.isdigit())
        if len(gtin_clean) in (8, 12, 13, 14):
            return gtin_clean.zfill(12)
        return gtin_clean or None

    normalized_gtin = normalize_gtin(stock.gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    location_key = stock.location.strip().upper()
    existing_stock = db.query(Stock).filter(
        Stock.gtin == normalized_gtin,
        Stock.location == location_key
    ).first()

    if not existing_stock:
        # Nothing to add to yet: the quantity becomes the initial stock.
        new_stock = Stock(
            gtin=normalized_gtin,
            location=location_key,
            quantity=stock.quantity
        )
        db.add(new_stock)
        db.commit()
        db.refresh(new_stock)
        logger.info(f"Created new stock for GTIN {normalized_gtin} at {stock.location}: {stock.quantity}")
        return new_stock

    old_quantity = existing_stock.quantity
    existing_stock.quantity = old_quantity + stock.quantity
    existing_stock.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(existing_stock)
    logger.info(
        f"Added stock for GTIN {normalized_gtin} at {stock.location}: {old_quantity} + {stock.quantity} = {existing_stock.quantity}")
    return existing_stock
@app.post("/stock/remove", response_model=StockResponse)
def remove_stock(stock: StockAdd, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Decrease the stock of a GTIN at a location.

    Raises:
        HTTPException: 400 when the GTIN contains no digits or the requested
            quantity exceeds what is available; 404 when there is no stock
            row for that GTIN and location.
    """
    def normalize_gtin(gtin_value):
        # Digits before any decimal point; the 8-digit form is padded to 12.
        if not gtin_value:
            return None
        integer_part = str(gtin_value).strip().partition('.')[0]
        gtin_clean = ''.join(ch for ch in integer_part if ch.isdigit())
        if len(gtin_clean) in (8, 12, 13, 14):
            return gtin_clean.zfill(12)
        return gtin_clean or None

    normalized_gtin = normalize_gtin(stock.gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    location_key = stock.location.strip().upper()
    existing_stock = db.query(Stock).filter(
        Stock.gtin == normalized_gtin,
        Stock.location == location_key
    ).first()
    if not existing_stock:
        raise HTTPException(
            status_code=404,
            detail=f"No stock found for GTIN {normalized_gtin} at location {stock.location}"
        )

    # Never let the stored quantity drop below zero.
    if existing_stock.quantity < stock.quantity:
        raise HTTPException(
            status_code=400,
            detail=f"Insufficient stock. Available: {existing_stock.quantity}, Requested to remove: {stock.quantity}"
        )

    old_quantity = existing_stock.quantity
    existing_stock.quantity = old_quantity - stock.quantity
    existing_stock.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(existing_stock)
    logger.info(
        f"Removed stock for GTIN {normalized_gtin} at {stock.location}: {old_quantity} - {stock.quantity} = {existing_stock.quantity}")
    return existing_stock
@app.get("/stock/{gtin}", response_model=StockSummaryResponse)
def get_stock_by_gtin(gtin: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Return the per-location stock and the total quantity for one GTIN.

    The title of the first product found with the same GTIN is included when
    there is one.

    Raises:
        HTTPException: 400 when the GTIN contains no digits; 404 when no
            stock rows exist for it.
    """
    def normalize_gtin(gtin_value):
        # Digits before any decimal point; the 8-digit form is padded to 12.
        if not gtin_value:
            return None
        integer_part = str(gtin_value).strip().partition('.')[0]
        gtin_clean = ''.join(ch for ch in integer_part if ch.isdigit())
        if len(gtin_clean) in (8, 12, 13, 14):
            return gtin_clean.zfill(12)
        return gtin_clean or None

    normalized_gtin = normalize_gtin(gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    stock_entries = db.query(Stock).filter(Stock.gtin == normalized_gtin).all()
    if not stock_entries:
        raise HTTPException(status_code=404, detail=f"No stock found for GTIN: {gtin}")

    locations = [
        StockLocationResponse(location=entry.location, quantity=entry.quantity)
        for entry in stock_entries
    ]
    total_quantity = sum(entry.quantity for entry in stock_entries)

    # The product title is optional context for the caller.
    product = db.query(Product).filter(Product.gtin == normalized_gtin).first()
    product_title = None
    if product:
        product_title = product.title

    return StockSummaryResponse(
        gtin=normalized_gtin,
        total_quantity=total_quantity,
        locations=locations,
        product_title=product_title
    )
@app.get("/stock/{gtin}/total")
def get_total_stock(gtin: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Return the summed quantity and the number of stock rows for one GTIN.

    A GTIN without stock rows yields a total of 0 rather than an error.

    Raises:
        HTTPException: 400 when the GTIN contains no digits.
    """
    def normalize_gtin(gtin_value):
        # Digits before any decimal point; the 8-digit form is padded to 12.
        if not gtin_value:
            return None
        integer_part = str(gtin_value).strip().partition('.')[0]
        gtin_clean = ''.join(ch for ch in integer_part if ch.isdigit())
        if len(gtin_clean) in (8, 12, 13, 14):
            return gtin_clean.zfill(12)
        return gtin_clean or None

    normalized_gtin = normalize_gtin(gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    total_stock = db.query(Stock).filter(Stock.gtin == normalized_gtin).all()
    total_quantity = 0
    for entry in total_stock:
        total_quantity += entry.quantity

    # The product title is optional context for the caller.
    product = db.query(Product).filter(Product.gtin == normalized_gtin).first()
    product_title = product.title if product else None

    summary = {
        "gtin": normalized_gtin,
        "total_quantity": total_quantity,
        "product_title": product_title,
        "locations_count": len(total_stock)
    }
    return summary
@app.get("/stock", response_model=List[StockResponse])
def get_all_stock(
        skip: int = Query(0, ge=0),
        limit: int = Query(100, ge=1, le=1000),
        location: Optional[str] = Query(None, description="Filter by location"),
        gtin: Optional[str] = Query(None, description="Filter by GTIN"),
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Get all stock entries with optional filtering.

    ``location`` is a case-insensitive substring filter. ``gtin`` is
    normalised and matched exactly; a value without any digits is ignored
    rather than rejected. Results are ordered by stock id so that pages are
    stable.
    """
    def normalize_gtin(gtin_value):
        # Digits before any decimal point; the 8-digit form is padded to 12.
        if not gtin_value:
            return None
        gtin_str = str(gtin_value).strip()
        if '.' in gtin_str:
            gtin_str = gtin_str.split('.')[0]
        gtin_clean = ''.join(filter(str.isdigit, gtin_str))
        if len(gtin_clean) in [8, 12, 13, 14]:
            return gtin_clean.zfill(12)
        return gtin_clean if gtin_clean else None

    query = db.query(Stock)

    if location:
        query = query.filter(Stock.location.ilike(f"%{location}%"))

    if gtin:
        normalized_gtin = normalize_gtin(gtin)
        if normalized_gtin:
            query = query.filter(Stock.gtin == normalized_gtin)

    # OFFSET/LIMIT without ORDER BY may return rows in a different order on
    # each call, so pages could overlap or skip rows; order by primary key.
    stock_entries = query.order_by(Stock.id).offset(skip).limit(limit).all()
    return stock_entries
@app.put("/stock/{stock_id}", response_model=StockResponse)
def update_stock(stock_id: int, stock_update: StockUpdate, db: Session = Depends(get_db),
                 current_user: User = Depends(get_current_user)):
    """Overwrite the quantity of one stock row identified by its id.

    Raises:
        HTTPException: 404 when the stock row does not exist.
    """
    stock_entry = db.query(Stock).filter(Stock.id == stock_id).first()
    if stock_entry:
        stock_entry.quantity = stock_update.quantity
        stock_entry.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(stock_entry)
        return stock_entry

    raise HTTPException(status_code=404, detail="Stock entry not found")
@app.delete("/stock/{stock_id}")
def delete_stock(stock_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Remove one stock row identified by its id.

    Raises:
        HTTPException: 404 when the stock row does not exist.
    """
    stock_entry = db.query(Stock).filter(Stock.id == stock_id).first()
    if stock_entry:
        db.delete(stock_entry)
        db.commit()
        return {"message": "Stock entry deleted successfully"}

    raise HTTPException(status_code=404, detail="Stock entry not found")
# Shop Management Routes
@app.post("/shops", response_model=ShopResponse)
def create_shop(
        shop_data: ShopCreate,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Create a shop owned by the caller (Protected).

    The shop starts active; it is marked verified only when an admin
    creates it.

    Raises:
        HTTPException: 400 when the shop code is already taken.
    """
    # NOTE(review): the code is compared and stored as submitted, while the
    # shop lookups in this file upper-case it - confirm ShopCreate normalises.
    code_taken = db.query(Shop).filter(Shop.shop_code == shop_data.shop_code).first()
    if code_taken:
        raise HTTPException(status_code=400, detail="Shop code already exists")

    shop_fields = shop_data.dict()
    created_by_admin = (current_user.role == "admin")
    new_shop = Shop(
        **shop_fields,
        owner_id=current_user.id,
        is_active=True,
        is_verified=created_by_admin  # Auto-verify if admin creates shop
    )
    db.add(new_shop)
    db.commit()
    db.refresh(new_shop)

    logger.info(f"New shop created: {new_shop.shop_code} by {current_user.username}")
    return new_shop
@app.get("/shops", response_model=ShopListResponse)
def get_shops(
        skip: int = Query(0, ge=0),
        limit: int = Query(100, ge=1, le=1000),
        active_only: bool = Query(True),
        verified_only: bool = Query(False),
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Get shops with filtering (Protected).

    Non-admin users always get active shops that are either verified or
    owned by them; ``active_only`` and ``verified_only`` only apply to
    admins. Results are ordered by shop id so that pages are stable.

    Returns:
        A ``ShopListResponse`` with the requested page, the total number of
        matching shops and the ``skip``/``limit`` that were applied.
    """
    query = db.query(Shop)

    if current_user.role != "admin":
        # Non-admin users can only see active and verified shops, plus their own
        query = query.filter(
            (Shop.is_active == True) &
            ((Shop.is_verified == True) | (Shop.owner_id == current_user.id))
        )
    else:
        # Admin can apply filters
        if active_only:
            query = query.filter(Shop.is_active == True)
        if verified_only:
            query = query.filter(Shop.is_verified == True)

    # Count before ordering and pagination so the total covers every match.
    total = query.count()

    # OFFSET/LIMIT without ORDER BY may return rows in a different order on
    # each call, so pages could overlap or skip rows; order by primary key.
    shops = query.order_by(Shop.id).offset(skip).limit(limit).all()

    return ShopListResponse(
        shops=shops,
        total=total,
        skip=skip,
        limit=limit
    )
@app.get("/shops/{shop_code}", response_model=ShopResponse)
def get_shop(shop_code: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Return one shop looked up by its upper-cased code (Protected).

    Raises:
        HTTPException: 404 when the shop does not exist, or when a non-admin
            asks for a shop that is inactive, or unverified and not their own.
    """
    code = shop_code.upper()
    shop = db.query(Shop).filter(Shop.shop_code == code).first()
    if not shop:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Hidden shops answer with 404, the same as a shop that does not exist.
    if current_user.role != "admin":
        visible = shop.is_active and (shop.is_verified or shop.owner_id == current_user.id)
        if not visible:
            raise HTTPException(status_code=404, detail="Shop not found")

    return shop
# Shop Product Management
@app.post("/shops/{shop_code}/products", response_model=ShopProductResponse)
def add_product_to_shop(
    shop_code: str,
    shop_product: ShopProductCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Add existing product to shop catalog with shop-specific settings (Protected)

    Looks the product up by its marketplace ``product_id``, rejects the
    request if the product is already linked to the shop, and otherwise
    stores a new shop-product row carrying the remaining request fields.

    Raises:
        HTTPException 404: the product does not exist in the catalog.
        HTTPException 400: the product is already in this shop's catalog.
    """
    # Get and verify shop
    # NOTE(review): get_user_shop is defined elsewhere; presumably it raises
    # when the shop is missing or not manageable by this user - confirm.
    shop = get_user_shop(shop_code, current_user, db)

    # Check if product exists
    product = db.query(Product).filter(Product.product_id == shop_product.product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found in marketplace catalog")

    # Check if product already in shop
    existing_shop_product = db.query(ShopProduct).filter(
        ShopProduct.shop_id == shop.id,
        ShopProduct.product_id == product.id
    ).first()
    if existing_shop_product:
        raise HTTPException(status_code=400, detail="Product already in shop catalog")

    # Create shop-product association. The request's product_id is the
    # marketplace identifier, so it is excluded and replaced by the internal
    # Product.id. model_dump() is the Pydantic v2 spelling of the deprecated
    # .dict(); this function already relies on v2 via model_validate below.
    new_shop_product = ShopProduct(
        shop_id=shop.id,
        product_id=product.id,
        **shop_product.model_dump(exclude={'product_id'})
    )
    db.add(new_shop_product)
    db.commit()
    db.refresh(new_shop_product)

    # Return with product details
    response = ShopProductResponse.model_validate(new_shop_product)
    response.product = product
    return response
@app.get("/shops/{shop_code}/products")
def get_shop_products(
    shop_code: str,
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    active_only: bool = Query(True),
    featured_only: bool = Query(False),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Get products in shop catalog (Protected)

    Admins and the shop owner can list any shop. Other authenticated users
    can only list shops that are both active and verified; otherwise the
    shop is reported as 404.

    Returns a dict with the page of products, the total match count, the
    applied ``skip``/``limit``, and the shop itself.
    """
    # Look the shop up by its upper-cased code
    shop = db.query(Shop).filter(Shop.shop_code == shop_code.upper()).first()
    if not shop:
        raise HTTPException(status_code=404, detail="Shop not found")

    # Non-owners can only see active verified shops
    if current_user.role != "admin" and shop.owner_id != current_user.id:
        if not shop.is_active or not shop.is_verified:
            raise HTTPException(status_code=404, detail="Shop not found")

    # Query shop products
    query = db.query(ShopProduct).filter(ShopProduct.shop_id == shop.id)
    if active_only:
        query = query.filter(ShopProduct.is_active == True)
    if featured_only:
        query = query.filter(ShopProduct.is_featured == True)

    total = query.count()

    # OFFSET/LIMIT needs an ORDER BY for pages to be stable. Within a single
    # shop the add-product endpoint rejects duplicate product links, so
    # product_id gives a deterministic order here.
    shop_products = (
        query.order_by(ShopProduct.product_id).offset(skip).limit(limit).all()
    )

    # Format response
    products = []
    for sp in shop_products:
        product_response = ShopProductResponse.model_validate(sp)
        product_response.product = sp.product
        products.append(product_response)

    return {
        "products": products,
        "total": total,
        "skip": skip,
        "limit": limit,
        "shop": ShopResponse.model_validate(shop)
    }
# Enhanced Statistics with Marketplace Support
@app.get("/stats", response_model=StatsResponse)
def get_stats(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get comprehensive statistics with marketplace data (Protected)

    Reports the product count, the number of distinct non-empty brands,
    categories, marketplaces and shop names, plus the number of stock
    entries and the summed stock quantity (0 when there is no stock).
    """

    def distinct_non_empty(column):
        # Number of distinct values of `column`, ignoring NULL and "".
        return (
            db.query(column)
            .filter(column.isnot(None), column != "")
            .distinct()
            .count()
        )

    # Keyword arguments are evaluated top to bottom, so the queries run in
    # the order they are listed here.
    return StatsResponse(
        total_products=db.query(Product).count(),
        unique_brands=distinct_non_empty(Product.brand),
        unique_categories=distinct_non_empty(Product.google_product_category),
        # Marketplace statistics
        unique_marketplaces=distinct_non_empty(Product.marketplace),
        unique_shops=distinct_non_empty(Product.shop_name),
        # Stock statistics
        total_stock_entries=db.query(Stock).count(),
        total_inventory_quantity=db.query(func.sum(Stock.quantity)).scalar() or 0
    )
@app.get("/marketplace-stats", response_model=List[MarketplaceStatsResponse])
def get_marketplace_stats(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get statistics broken down by marketplace (Protected)

    Produces one entry per non-NULL marketplace value with its product
    count and its number of distinct shop names and brands.
    """
    # One aggregate row per marketplace
    aggregate_columns = (
        Product.marketplace,
        func.count(Product.id).label('total_products'),
        func.count(func.distinct(Product.shop_name)).label('unique_shops'),
        func.count(func.distinct(Product.brand)).label('unique_brands'),
    )
    rows = (
        db.query(*aggregate_columns)
        .filter(Product.marketplace.isnot(None))
        .group_by(Product.marketplace)
        .all()
    )

    # Each row unpacks in the same order as aggregate_columns
    return [
        MarketplaceStatsResponse(
            marketplace=name,
            total_products=product_total,
            unique_shops=shop_total,
            unique_brands=brand_total
        )
        for name, product_total, shop_total, brand_total in rows
    ]
# Export with streaming for large datasets (Protected)
@app.get("/export-csv")
async def export_csv(
    marketplace: Optional[str] = Query(None, description="Filter by marketplace"),
    shop_name: Optional[str] = Query(None, description="Filter by shop name"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Export products as CSV with streaming and marketplace filtering (Protected)

    ``marketplace`` and ``shop_name`` are optional case-insensitive substring
    filters. Products are read in batches of 1000 and streamed to the client
    one batch at a time, so the full export is never held in memory.

    Every data field is quoted, and embedded quotes are escaped by the csv
    module, so values containing quotes, commas or newlines stay in their
    own column.
    """
    # Imported locally so this handler does not depend on a file-level import.
    import csv
    import re

    batch_size = 1000

    def generate_csv():
        # Stream CSV generation for memory efficiency
        yield "product_id,title,description,link,image_link,availability,price,currency,brand,gtin,marketplace,shop_name\n"

        # The filters do not change between batches, so build the query once.
        query = db.query(Product)
        if marketplace:
            query = query.filter(Product.marketplace.ilike(f"%{marketplace}%"))
        if shop_name:
            query = query.filter(Product.shop_name.ilike(f"%{shop_name}%"))

        # Without ORDER BY the database may return rows in a different order
        # for each OFFSET window, which would skip or duplicate products.
        query = query.order_by(Product.id)

        buffer = StringIO()
        writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")

        offset = 0
        while True:
            products = query.offset(offset).limit(batch_size).all()
            if not products:
                break

            for product in products:
                # csv.writer doubles embedded quotes; the previous hand-built
                # f-string row emitted them raw and corrupted the file.
                writer.writerow([
                    product.product_id,
                    product.title or "",
                    product.description or "",
                    product.link or "",
                    product.image_link or "",
                    product.availability or "",
                    product.price or "",
                    product.currency or "",
                    product.brand or "",
                    product.gtin or "",
                    product.marketplace or "",
                    product.shop_name or "",
                ])

            # Emit the batch, then reuse the buffer for the next one.
            yield buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)

            offset += batch_size

    def safe_filename_part(value):
        # The filters are user input that ends up in a response header, so
        # keep only characters that are safe in a header and a file name.
        return re.sub(r"[^A-Za-z0-9._-]+", "_", value)

    filename = "products_export"
    if marketplace:
        filename += f"_{safe_filename_part(marketplace)}"
    if shop_name:
        filename += f"_{safe_filename_part(shop_name)}"
    filename += ".csv"

    return StreamingResponse(
        generate_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": f'attachment; filename="{filename}"'}
    )
# Admin-only routes
@app.get("/admin/users", response_model=List[UserResponse])
def get_all_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user)
):
    """Get all users (Admin only)

    Returns one page of users, selected by ``skip`` and ``limit`` and
    ordered by user id. Access is gated by the get_current_admin_user
    dependency.
    """
    # ORDER BY makes the OFFSET/LIMIT pages stable; without it the database
    # may return rows in any order and pages can overlap or miss users.
    users = db.query(User).order_by(User.id).offset(skip).limit(limit).all()
    return [UserResponse.model_validate(user) for user in users]
@app.put("/admin/users/{user_id}/status")
def toggle_user_status(
    user_id: int,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user)
):
    """Toggle user active status (Admin only)

    Flips ``is_active`` on the given user and stamps ``updated_at``.
    Responds with 404 when the user does not exist and 400 when an admin
    targets their own account.
    """
    target = db.query(User).filter(User.id == user_id).first()

    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # Admins must not be able to lock themselves out
    if target.id == current_admin.id:
        raise HTTPException(status_code=400, detail="Cannot deactivate your own account")

    target.is_active = not target.is_active
    target.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(target)

    if target.is_active:
        outcome = "activated"
    else:
        outcome = "deactivated"
    return {"message": f"User {target.username} has been {outcome}"}
@app.get("/admin/shops", response_model=ShopListResponse)
def get_all_shops_admin(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user)
):
    """Get all shops with admin view (Admin only)

    Returns one page of shops with no activity or verification filter
    applied, ordered by shop id, together with the overall shop count.
    """
    total = db.query(Shop).count()

    # ORDER BY makes the OFFSET/LIMIT pages stable; without it the database
    # may return rows in any order and pages can overlap or miss shops.
    shops = db.query(Shop).order_by(Shop.id).offset(skip).limit(limit).all()

    return ShopListResponse(
        shops=shops,
        total=total,
        skip=skip,
        limit=limit
    )
@app.put("/admin/shops/{shop_id}/verify")
def verify_shop(
    shop_id: int,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user)
):
    """Verify/unverify shop (Admin only)

    Flips ``is_verified`` on the given shop and stamps ``updated_at``.
    Responds with 404 when the shop does not exist.
    """
    target = db.query(Shop).filter(Shop.id == shop_id).first()

    if not target:
        raise HTTPException(status_code=404, detail="Shop not found")

    target.is_verified = not target.is_verified
    target.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(target)

    if target.is_verified:
        outcome = "verified"
    else:
        outcome = "unverified"
    return {"message": f"Shop {target.shop_code} has been {outcome}"}
@app.put("/admin/shops/{shop_id}/status")
def toggle_shop_status(
    shop_id: int,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user)
):
    """Toggle shop active status (Admin only)

    Flips ``is_active`` on the given shop and stamps ``updated_at``.
    Responds with 404 when the shop does not exist.
    """
    target = db.query(Shop).filter(Shop.id == shop_id).first()

    if not target:
        raise HTTPException(status_code=404, detail="Shop not found")

    target.is_active = not target.is_active
    target.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(target)

    if target.is_active:
        outcome = "activated"
    else:
        outcome = "deactivated"
    return {"message": f"Shop {target.shop_code} has been {outcome}"}
@app.get("/admin/marketplace-import-jobs", response_model=List[MarketplaceImportJobResponse])
def get_all_marketplace_import_jobs(
    marketplace: Optional[str] = Query(None),
    shop_name: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user)
):
    """Get all marketplace import jobs (Admin only)

    ``marketplace`` and ``shop_name`` are case-insensitive substring
    filters and ``status`` is an exact match. Jobs are returned newest
    first, paged by ``skip`` and ``limit``. Missing counters are reported
    as 0.
    """

    def to_response(job):
        # Map one job row onto the API model, defaulting empty counters to 0.
        return MarketplaceImportJobResponse(
            job_id=job.id,
            status=job.status,
            marketplace=job.marketplace,
            shop_name=job.shop_name,
            imported=job.imported_count or 0,
            updated=job.updated_count or 0,
            total_processed=job.total_processed or 0,
            error_count=job.error_count or 0,
            error_message=job.error_message,
            created_at=job.created_at,
            started_at=job.started_at,
            completed_at=job.completed_at
        )

    # Collect only the filters the caller actually supplied
    conditions = []
    if marketplace:
        conditions.append(MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%"))
    if shop_name:
        conditions.append(MarketplaceImportJob.shop_name.ilike(f"%{shop_name}%"))
    if status:
        conditions.append(MarketplaceImportJob.status == status)

    job_query = db.query(MarketplaceImportJob)
    if conditions:
        job_query = job_query.filter(*conditions)

    # Newest jobs first, then apply pagination
    newest_first = job_query.order_by(MarketplaceImportJob.created_at.desc())
    page = newest_first.offset(skip).limit(limit).all()

    return [to_response(job) for job in page]
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn when this file is run
    # directly. Auto-reload is switched on.
    import uvicorn

    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("main:app", **server_options)