1133 lines
40 KiB
Python
1133 lines
40 KiB
Python
from fastapi import FastAPI, HTTPException, Query, Depends, BackgroundTasks
|
|
from fastapi.responses import StreamingResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from pydantic import BaseModel, Field, validator
|
|
from typing import Optional, List, Dict, Any
|
|
from datetime import datetime, timedelta
|
|
from sqlalchemy import create_engine, Column, Integer, String, DateTime, text, ForeignKey, Index, func
|
|
from sqlalchemy.ext.declarative import declarative_base
|
|
from sqlalchemy.orm import sessionmaker, Session, relationship
|
|
from contextlib import asynccontextmanager
|
|
import pandas as pd
|
|
import requests
|
|
from io import StringIO, BytesIO
|
|
import logging
|
|
import asyncio
|
|
import time
|
|
from functools import wraps
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
# Load environment variables
|
|
load_dotenv()
|
|
|
|
# Import utility modules
|
|
from utils.data_processing import GTINProcessor, PriceProcessor
|
|
from utils.csv_processor import CSVProcessor
|
|
from utils.database import get_db_engine, get_session_local
|
|
from models.database_models import Base, Product, Stock, User, MarketplaceImportJob
|
|
from models.api_models import *
|
|
from middleware.rate_limiter import RateLimiter
|
|
from middleware.auth import AuthManager
|
|
|
|
# Configure logging
|
|
# Root logging configuration: INFO and above, with timestamp, logger name and level.
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Shared helper instances used by the route handlers below.
gtin_processor = GTINProcessor()
price_processor = PriceProcessor()
csv_processor = CSVProcessor()

# Database wiring: the URL is read from the environment, with a local fallback.
# NOTE(review): the fallback URL embeds placeholder credentials - confirm it is
# never relied on outside local development.
DATABASE_URL = os.environ.get("DATABASE_URL", "postgresql://user:password@localhost/ecommerce")
engine = get_db_engine(DATABASE_URL)
SessionLocal = get_session_local(engine)

# Request throttling and authentication helpers.
rate_limiter = RateLimiter()
auth_manager = AuthManager()
|
|
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan events.

    Before the application starts serving, this creates the ORM tables,
    asks the auth manager to create the default admin user, and issues a
    series of ``CREATE INDEX IF NOT EXISTS`` statements. Failures while
    creating the admin user or the indexes are logged and do not abort
    startup. After the ``yield`` a shutdown message is logged.
    """
    logger.info("Starting up ecommerce API with marketplace import support")

    # Create tables
    Base.metadata.create_all(bind=engine)

    # Create default admin user (best effort: errors are logged only)
    session = SessionLocal()
    try:
        auth_manager.create_default_admin_user(session)
    except Exception as exc:
        logger.error(f"Failed to create default admin user: {exc}")
    finally:
        session.close()

    # Index DDL, executed in this order inside a single try block; the first
    # failing statement stops the remaining ones and skips the commit.
    index_statements = (
        # User indexes
        "CREATE INDEX IF NOT EXISTS idx_user_email ON users(email)",
        "CREATE INDEX IF NOT EXISTS idx_user_username ON users(username)",
        "CREATE INDEX IF NOT EXISTS idx_user_role ON users(role)",
        # Product indexes (including marketplace indexes)
        "CREATE INDEX IF NOT EXISTS idx_product_gtin ON products(gtin)",
        "CREATE INDEX IF NOT EXISTS idx_product_brand ON products(brand)",
        "CREATE INDEX IF NOT EXISTS idx_product_category ON products(google_product_category)",
        "CREATE INDEX IF NOT EXISTS idx_product_availability ON products(availability)",
        "CREATE INDEX IF NOT EXISTS idx_product_marketplace ON products(marketplace)",
        "CREATE INDEX IF NOT EXISTS idx_product_shop_name ON products(shop_name)",
        "CREATE INDEX IF NOT EXISTS idx_product_marketplace_shop ON products(marketplace, shop_name)",
        # Stock indexes
        "CREATE INDEX IF NOT EXISTS idx_stock_gtin_location ON stock(gtin, location)",
        "CREATE INDEX IF NOT EXISTS idx_stock_location ON stock(location)",
        # Marketplace import job indexes
        "CREATE INDEX IF NOT EXISTS idx_marketplace_import_marketplace ON marketplace_import_jobs(marketplace)",
        "CREATE INDEX IF NOT EXISTS idx_marketplace_import_shop_name ON marketplace_import_jobs(shop_name)",
        "CREATE INDEX IF NOT EXISTS idx_marketplace_import_user_id ON marketplace_import_jobs(user_id)",
    )

    with engine.connect() as connection:
        try:
            for statement in index_statements:
                connection.execute(text(statement))
            connection.commit()
            logger.info("Database indexes created successfully")
        except Exception as exc:
            logger.warning(f"Index creation warning: {exc}")

    yield

    # Shutdown
    logger.info("Shutting down ecommerce API")
|
|
|
|
|
|
# FastAPI app with lifespan
|
|
app = FastAPI(
    title="Ecommerce Backend API with Marketplace Support",
    description="Advanced product management system with JWT authentication, marketplace-aware CSV import/export and stock management",
    version="2.2.0",
    lifespan=lifespan
)

# Allowed CORS origins are read from the CORS_ALLOW_ORIGINS environment
# variable (comma-separated list). When the variable is unset or empty the
# previous behaviour is kept: every origin ("*") is allowed.
# NOTE(review): a wildcard origin combined with allow_credentials=True is very
# permissive - set CORS_ALLOW_ORIGINS explicitly in production.
_cors_allow_origins = [
    origin.strip()
    for origin in os.getenv("CORS_ALLOW_ORIGINS", "*").split(",")
    if origin.strip()
] or ["*"]

# Add CORS middleware
app.add_middleware(
    CORSMiddleware,
    allow_origins=_cors_allow_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Security: bearer-token scheme used by the authentication dependencies
security = HTTPBearer()
|
|
|
|
|
|
# Database dependency with connection pooling
|
|
def get_db():
    """Yield a database session for the duration of one request.

    If an exception propagates into the generator, the session is rolled
    back, the error is logged and the exception is re-raised. The session
    is closed in every case.
    """
    session = SessionLocal()
    try:
        yield session
    except Exception as exc:
        session.rollback()
        logger.error(f"Database error: {exc}")
        raise
    finally:
        session.close()
|
|
|
|
|
|
# Authentication dependencies
|
|
def get_current_user(
        credentials: HTTPAuthorizationCredentials = Depends(security),
        db: Session = Depends(get_db),
):
    """Get current authenticated user.

    Delegates to ``auth_manager.get_current_user`` with the request's
    database session and bearer credentials and returns its result.
    """
    authenticated_user = auth_manager.get_current_user(db, credentials)
    return authenticated_user
|
|
|
|
|
|
def get_current_admin_user(current_user: User = Depends(get_current_user)):
    """Require admin user.

    Passes the authenticated user to ``auth_manager.require_admin`` and
    returns whatever that call returns.
    """
    admin_user = auth_manager.require_admin(current_user)
    return admin_user
|
|
|
|
|
|
# Rate limiting decorator
|
|
def rate_limit(max_requests: int = 100, window_seconds: int = 3600):
    """Build a decorator that rate-limits an async endpoint.

    The wrapped coroutine is only awaited when
    ``rate_limiter.allow_request`` accepts the call; otherwise an
    ``HTTPException`` with status 429 is raised.

    The rate-limit bucket is chosen per caller: when the endpoint is invoked
    with a ``current_user`` keyword argument that has a non-``None`` ``id``
    attribute, the bucket key is ``"user:<id>"``. Calls without such an
    argument fall back to the shared ``"anonymous"`` bucket. Previously
    every call used ``"anonymous"``, so all users shared one quota.

    Args:
        max_requests: Maximum number of calls passed to the limiter.
        window_seconds: Length of the limiting window passed to the limiter.

    Returns:
        A decorator for ``async`` functions.

    Raises:
        HTTPException: 429, raised by the wrapper when the limiter refuses
            the request.
    """

    def resolve_client_id(call_kwargs) -> str:
        # Endpoints receive their dependencies as keyword arguments, so the
        # authenticated user (if the endpoint declares one) is found here.
        user = call_kwargs.get("current_user")
        user_id = getattr(user, "id", None)
        if user_id is not None:
            return f"user:{user_id}"
        return "anonymous"

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            client_id = resolve_client_id(kwargs)

            if not rate_limiter.allow_request(client_id, max_requests, window_seconds):
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded"
                )

            return await func(*args, **kwargs)

        return wrapper

    return decorator
|
|
|
|
|
|
# Authentication Routes
|
|
@app.post("/register", response_model=UserResponse)
def register_user(user_data: UserRegister, db: Session = Depends(get_db)):
    """Register a new user.

    Rejects the request with HTTP 400 when the email is already registered
    or, failing that, when the username is already taken. Otherwise the
    password is hashed, an active user with role ``"user"`` is stored, and
    the persisted user is returned.
    """
    # Email is checked first, then username; the first match aborts.
    uniqueness_checks = (
        (User.email == user_data.email, "Email already registered"),
        (User.username == user_data.username, "Username already taken"),
    )
    for condition, message in uniqueness_checks:
        if db.query(User).filter(condition).first():
            raise HTTPException(status_code=400, detail=message)

    account = User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=auth_manager.hash_password(user_data.password),
        role="user",  # Default role
        is_active=True
    )

    db.add(account)
    db.commit()
    db.refresh(account)

    logger.info(f"New user registered: {account.username}")
    return account
|
|
|
|
|
|
@app.post("/login", response_model=LoginResponse)
def login_user(user_credentials: UserLogin, db: Session = Depends(get_db)):
    """Login user and return JWT token.

    Authenticates through ``auth_manager.authenticate_user``; a falsy
    result yields HTTP 401. On success an access token is created and
    returned together with the user's details.
    """
    account = auth_manager.authenticate_user(
        db, user_credentials.username, user_credentials.password
    )
    if not account:
        raise HTTPException(status_code=401, detail="Incorrect username or password")

    # Create access token
    token = auth_manager.create_access_token(account)

    logger.info(f"User logged in: {account.username}")

    return LoginResponse(
        access_token=token["access_token"],
        token_type=token["token_type"],
        expires_in=token["expires_in"],
        user=UserResponse.model_validate(account)
    )
|
|
|
|
|
|
@app.get("/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Get current user information.

    Returns the authenticated user validated into a ``UserResponse``.
    """
    profile = UserResponse.model_validate(current_user)
    return profile
|
|
|
|
|
|
# Public Routes (no authentication required)
|
|
@app.get("/")
def root():
    """Return a static description of the API: status, features and marketplaces."""
    feature_list = [
        "JWT Authentication",
        "Marketplace-aware product import",
        "Multi-shop product management",
        "Stock management with location tracking"
    ]
    marketplaces = ["Letzshop", "Amazon", "eBay", "Etsy", "Shopify", "Other"]

    payload = {
        "message": "Ecommerce Backend API v2.2 with Marketplace Support",
        "status": "operational",
        "features": feature_list,
        "supported_marketplaces": marketplaces,
        "auth_required": "Most endpoints require Bearer token authentication"
    }
    return payload
|
|
|
|
|
|
@app.get("/health")
def health_check(db: Session = Depends(get_db)):
    """Health check endpoint.

    Runs ``SELECT 1`` against the database. On success returns a
    ``healthy`` status with the current UTC timestamp; on any exception
    logs it and responds with HTTP 503.
    """
    try:
        # Test database connection
        db.execute(text("SELECT 1"))
    except Exception as exc:
        logger.error(f"Health check failed: {exc}")
        raise HTTPException(status_code=503, detail="Service unhealthy")
    else:
        return {"status": "healthy", "timestamp": datetime.utcnow()}
|
|
|
|
|
|
# Marketplace Import Routes (Protected)
|
|
@app.post("/import-from-marketplace", response_model=MarketplaceImportJobResponse)
@rate_limit(max_requests=10, window_seconds=3600)  # Limit marketplace imports
async def import_products_from_marketplace(
        request: MarketplaceImportRequest,
        background_tasks: BackgroundTasks,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Import products from marketplace CSV with background processing (Protected).

    Persists a ``pending`` import-job row owned by the current user,
    schedules ``process_marketplace_import`` as a background task (batch
    size defaults to 1000 when the request gives none), and returns the
    job id with a pointer to the status endpoint.

    NOTE(review): ``request.url`` is handed to the CSV processor as-is;
    confirm that URL validation happens downstream.
    """
    logger.info(
        f"Starting marketplace import: {request.marketplace} -> {request.shop_name} by user {current_user.username}")

    # Create marketplace import job record
    job_record = MarketplaceImportJob(
        status="pending",
        source_url=request.url,
        marketplace=request.marketplace,
        shop_name=request.shop_name,
        user_id=current_user.id,
        created_at=datetime.utcnow()
    )
    db.add(job_record)
    db.commit()
    db.refresh(job_record)

    # Process in background
    effective_batch_size = request.batch_size or 1000
    background_tasks.add_task(
        process_marketplace_import,
        job_record.id,
        request.url,
        request.marketplace,
        request.shop_name,
        effective_batch_size
    )

    status_hint = (
        f"Marketplace import started from {request.marketplace}. "
        f"Check status with /marketplace-import-status/{job_record.id}"
    )
    return MarketplaceImportJobResponse(
        job_id=job_record.id,
        status="pending",
        marketplace=request.marketplace,
        shop_name=request.shop_name,
        message=status_hint
    )
|
|
|
|
|
|
async def process_marketplace_import(job_id: int, url: str, marketplace: str, shop_name: str, batch_size: int = 1000):
    """Background task to process marketplace CSV import with batching.

    Opens its own database session, marks the job ``processing``, runs
    ``csv_processor.process_marketplace_csv_from_url`` and stores the
    resulting counters on the job. The final status is ``completed``, or
    ``completed_with_errors`` when the result reports a positive
    ``errors`` count. Any exception marks the job ``failed`` with the
    exception text as the error message.

    Args:
        job_id: Primary key of the ``MarketplaceImportJob`` row to update.
        url: Source URL of the CSV, forwarded to the CSV processor.
        marketplace: Marketplace name, forwarded to the CSV processor.
        shop_name: Shop name, forwarded to the CSV processor.
        batch_size: Batch size, forwarded to the CSV processor.
    """
    db = SessionLocal()
    # Bound before the try block so the except branch can tell whether the
    # job row was ever loaded (the initial query itself may raise).
    job = None

    try:
        # Update job status
        job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first()
        if not job:
            logger.error(f"Marketplace import job {job_id} not found")
            return

        job.status = "processing"
        job.started_at = datetime.utcnow()
        db.commit()

        logger.info(f"Processing marketplace import: Job {job_id}, Marketplace: {marketplace}, Shop: {shop_name}")

        # Process CSV with marketplace and shop information
        result = await csv_processor.process_marketplace_csv_from_url(
            url, marketplace, shop_name, batch_size, db
        )

        # Update job with results
        error_count = result.get("errors", 0)
        job.status = "completed"
        job.completed_at = datetime.utcnow()
        job.imported_count = result["imported"]
        job.updated_count = result["updated"]
        job.error_count = error_count
        job.total_processed = result["total_processed"]

        if error_count > 0:
            job.status = "completed_with_errors"
            job.error_message = f"{error_count} rows had errors"

        db.commit()
        logger.info(
            f"Marketplace import job {job_id} completed successfully - Imported: {result['imported']}, Updated: {result['updated']}")

    except Exception as e:
        logger.error(f"Marketplace import job {job_id} failed: {e}")
        # The session may hold a failed or partial transaction from the
        # import; discard it before trying to record the failure.
        db.rollback()
        if job is not None:
            try:
                job.status = "failed"
                job.completed_at = datetime.utcnow()
                job.error_message = str(e)
                db.commit()
            except Exception as status_error:
                # Recording the failure is best effort; never let it raise
                # out of the background task and hide the original error.
                logger.error(f"Could not record failure for marketplace import job {job_id}: {status_error}")
                db.rollback()

    finally:
        db.close()
|
|
|
|
|
|
@app.get("/marketplace-import-status/{job_id}", response_model=MarketplaceImportJobResponse)
def get_marketplace_import_status(
        job_id: int,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Get status of marketplace import job (Protected).

    Responds with HTTP 404 when no job has the given id and HTTP 403 when
    a non-admin user asks for a job owned by someone else. Missing
    counters are reported as 0.
    """
    job = db.query(MarketplaceImportJob).filter(MarketplaceImportJob.id == job_id).first()
    if not job:
        raise HTTPException(status_code=404, detail="Marketplace import job not found")

    # Users can only see their own jobs, admins can see all
    is_admin = current_user.role == "admin"
    if not is_admin and job.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Access denied to this import job")

    response_fields = {
        "job_id": job.id,
        "status": job.status,
        "marketplace": job.marketplace,
        "shop_name": job.shop_name,
        "imported": job.imported_count or 0,
        "updated": job.updated_count or 0,
        "total_processed": job.total_processed or 0,
        "error_count": job.error_count or 0,
        "error_message": job.error_message,
        "created_at": job.created_at,
        "started_at": job.started_at,
        "completed_at": job.completed_at,
    }
    return MarketplaceImportJobResponse(**response_fields)
|
|
|
|
|
|
@app.get("/marketplace-import-jobs", response_model=List[MarketplaceImportJobResponse])
def get_marketplace_import_jobs(
        marketplace: Optional[str] = Query(None, description="Filter by marketplace"),
        shop_name: Optional[str] = Query(None, description="Filter by shop name"),
        skip: int = Query(0, ge=0),
        limit: int = Query(50, ge=1, le=100),
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Get marketplace import jobs with filtering (Protected).

    Non-admin users only see their own jobs. The optional ``marketplace``
    and ``shop_name`` filters are case-insensitive substring matches.
    Results are ordered newest first and paginated with ``skip``/``limit``.
    """
    query = db.query(MarketplaceImportJob)

    # Users can only see their own jobs, admins can see all
    if current_user.role != "admin":
        query = query.filter(MarketplaceImportJob.user_id == current_user.id)

    # Apply the optional substring filters
    substring_filters = (
        (MarketplaceImportJob.marketplace, marketplace),
        (MarketplaceImportJob.shop_name, shop_name),
    )
    for column, needle in substring_filters:
        if needle:
            query = query.filter(column.ilike(f"%{needle}%"))

    # Order by creation date (newest first) and apply pagination
    newest_first = query.order_by(MarketplaceImportJob.created_at.desc())
    jobs = newest_first.offset(skip).limit(limit).all()

    responses = []
    for job in jobs:
        responses.append(
            MarketplaceImportJobResponse(
                job_id=job.id,
                status=job.status,
                marketplace=job.marketplace,
                shop_name=job.shop_name,
                imported=job.imported_count or 0,
                updated=job.updated_count or 0,
                total_processed=job.total_processed or 0,
                error_count=job.error_count or 0,
                error_message=job.error_message,
                created_at=job.created_at,
                started_at=job.started_at,
                completed_at=job.completed_at
            )
        )
    return responses
|
|
|
|
|
|
# Enhanced Product Routes with Marketplace Support
|
|
@app.get("/products", response_model=ProductListResponse)
def get_products(
        skip: int = Query(0, ge=0),
        limit: int = Query(100, ge=1, le=1000),
        brand: Optional[str] = Query(None),
        category: Optional[str] = Query(None),
        availability: Optional[str] = Query(None),
        marketplace: Optional[str] = Query(None, description="Filter by marketplace"),
        shop_name: Optional[str] = Query(None, description="Filter by shop name"),
        search: Optional[str] = Query(None),
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Get products with advanced filtering including marketplace and shop (Protected).

    ``brand``, ``category``, ``marketplace`` and ``shop_name`` are
    case-insensitive substring filters; ``availability`` is an exact
    match; ``search`` matches title, description, marketplace or shop
    name. The response carries the total number of matching products plus
    one page of them.

    The page is ordered by ``product_id`` so that successive
    ``skip``/``limit`` requests return stable, non-overlapping pages;
    without an ORDER BY the database may return rows in any order.
    """
    query = db.query(Product)

    # Apply filters
    if brand:
        query = query.filter(Product.brand.ilike(f"%{brand}%"))
    if category:
        query = query.filter(Product.google_product_category.ilike(f"%{category}%"))
    if availability:
        query = query.filter(Product.availability == availability)
    if marketplace:
        query = query.filter(Product.marketplace.ilike(f"%{marketplace}%"))
    if shop_name:
        query = query.filter(Product.shop_name.ilike(f"%{shop_name}%"))
    if search:
        # Search in title, description, marketplace and shop name
        search_term = f"%{search}%"
        query = query.filter(
            (Product.title.ilike(search_term)) |
            (Product.description.ilike(search_term)) |
            (Product.marketplace.ilike(search_term)) |
            (Product.shop_name.ilike(search_term))
        )

    # Get total count for pagination (ordering is irrelevant for the count)
    total = query.count()

    # Apply a deterministic order, then pagination
    products = query.order_by(Product.product_id).offset(skip).limit(limit).all()

    return ProductListResponse(
        products=products,
        total=total,
        skip=skip,
        limit=limit
    )
|
|
|
|
|
|
@app.post("/products", response_model=ProductResponse)
def create_product(
        product: ProductCreate,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Create a new product with validation and marketplace support (Protected).

    Responds with HTTP 400 when a product with the same ``product_id``
    exists or when the supplied GTIN cannot be normalized. A supplied
    price is parsed into price and currency when parsing yields a truthy
    price. A missing marketplace defaults to ``"Letzshop"``.
    """
    # Check if product_id already exists
    existing = db.query(Product).filter(Product.product_id == product.product_id).first()
    if existing:
        raise HTTPException(status_code=400, detail="Product with this ID already exists")

    # Process and validate GTIN if provided
    if product.gtin:
        normalized_gtin = gtin_processor.normalize(product.gtin)
        if not normalized_gtin:
            raise HTTPException(status_code=400, detail="Invalid GTIN format")
        product.gtin = normalized_gtin

    # Process price if provided
    if product.price:
        parsed_price, currency = price_processor.parse_price_currency(product.price)
        if parsed_price:
            product.price = parsed_price
            product.currency = currency

    # Set default marketplace if not provided
    if not product.marketplace:
        product.marketplace = "Letzshop"

    # model_dump() is the Pydantic v2 replacement for the deprecated .dict();
    # this module already relies on v2 (model_validate).
    db_product = Product(**product.model_dump())
    db.add(db_product)
    db.commit()
    db.refresh(db_product)

    logger.info(
        f"Created product {db_product.product_id} for marketplace {db_product.marketplace}, shop {db_product.shop_name}")
    return db_product
|
|
|
|
|
|
@app.get("/products/{product_id}", response_model=ProductDetailResponse)
def get_product(product_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get product with stock information (Protected).

    Responds with HTTP 404 when the product does not exist. When the
    product has a GTIN and stock rows exist for it, the response includes
    the total quantity and the per-location breakdown; otherwise
    ``stock_info`` is ``None``.
    """
    product = db.query(Product).filter(Product.product_id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    # Stock rows are only looked up when the product carries a GTIN
    entries = db.query(Stock).filter(Stock.gtin == product.gtin).all() if product.gtin else []

    stock_info = None
    if entries:
        per_location = []
        quantity_sum = 0
        for entry in entries:
            quantity_sum += entry.quantity
            per_location.append(
                StockLocationResponse(location=entry.location, quantity=entry.quantity)
            )
        stock_info = StockSummaryResponse(
            gtin=product.gtin,
            total_quantity=quantity_sum,
            locations=per_location
        )

    return ProductDetailResponse(product=product, stock_info=stock_info)
|
|
|
|
|
|
@app.put("/products/{product_id}", response_model=ProductResponse)
def update_product(
        product_id: str,
        product_update: ProductUpdate,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Update product with validation and marketplace support (Protected).

    Only fields explicitly present in the request are applied. A supplied
    GTIN is normalized (HTTP 400 when that fails) and a supplied price is
    parsed into price and currency when parsing yields a truthy price.
    Responds with HTTP 404 when the product does not exist.
    """
    product = db.query(Product).filter(Product.product_id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    # Only the fields the client actually sent. model_dump() is the
    # Pydantic v2 replacement for the deprecated .dict(); this module
    # already relies on v2 (model_validate).
    update_data = product_update.model_dump(exclude_unset=True)

    # Validate GTIN if being updated
    if update_data.get("gtin"):
        normalized_gtin = gtin_processor.normalize(update_data["gtin"])
        if not normalized_gtin:
            raise HTTPException(status_code=400, detail="Invalid GTIN format")
        update_data["gtin"] = normalized_gtin

    # Process price if being updated
    if update_data.get("price"):
        parsed_price, currency = price_processor.parse_price_currency(update_data["price"])
        if parsed_price:
            update_data["price"] = parsed_price
            update_data["currency"] = currency

    for key, value in update_data.items():
        setattr(product, key, value)

    product.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(product)

    return product
|
|
|
|
|
|
@app.delete("/products/{product_id}")
def delete_product(
        product_id: str,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Delete product and associated stock (Protected).

    Responds with HTTP 404 when the product does not exist. When the
    product has a GTIN, every stock row with that GTIN is deleted as well.

    NOTE(review): stock is removed by GTIN alone - if several products
    (e.g. in different marketplaces) can share a GTIN, their stock goes
    too. Confirm this is intended.
    """
    target = db.query(Product).filter(Product.product_id == product_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="Product not found")

    # Delete associated stock entries if GTIN exists
    gtin = target.gtin
    if gtin:
        db.query(Stock).filter(Stock.gtin == gtin).delete()

    db.delete(target)
    db.commit()

    return {"message": "Product and associated stock deleted successfully"}
|
|
|
|
|
|
# Stock Management Routes (Protected)
|
|
|
|
@app.post("/stock", response_model=StockResponse)
def set_stock(stock: StockCreate, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Set exact stock quantity for a GTIN at a specific location (replaces existing quantity).

    The GTIN is reduced to its digits and the location is stripped and
    upper-cased before lookup. An existing row gets its quantity
    overwritten; otherwise a new row is created. Responds with HTTP 400
    when the GTIN contains no digits.
    """

    def normalize_gtin(gtin_value):
        # Keep only the digits before any '.'; values of length 8, 12, 13 or
        # 14 are zero-padded to 12 (13-digit values are left as they are).
        if not gtin_value:
            return None
        integer_part = str(gtin_value).strip().split('.')[0]
        digits = ''.join(ch for ch in integer_part if ch.isdigit())
        if len(digits) in (8, 12, 13, 14):
            width = 13 if len(digits) == 13 else 12
            return digits.zfill(width)
        return digits or None

    normalized_gtin = normalize_gtin(stock.gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    location_key = stock.location.strip().upper()

    # Check if stock entry already exists for this GTIN and location
    entry = db.query(Stock).filter(
        Stock.gtin == normalized_gtin,
        Stock.location == location_key
    ).first()

    if not entry:
        # Create new stock entry
        entry = Stock(
            gtin=normalized_gtin,
            location=location_key,
            quantity=stock.quantity
        )
        db.add(entry)
        db.commit()
        db.refresh(entry)
        logger.info(f"Created new stock for GTIN {normalized_gtin} at {stock.location}: {stock.quantity}")
        return entry

    # Update existing stock (SET to exact quantity)
    previous_quantity = entry.quantity
    entry.quantity = stock.quantity
    entry.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(entry)
    logger.info(f"Updated stock for GTIN {normalized_gtin} at {stock.location}: {previous_quantity} → {stock.quantity}")
    return entry
|
|
|
|
|
|
@app.post("/stock/add", response_model=StockResponse)
def add_stock(stock: StockAdd, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Add quantity to existing stock for a GTIN at a specific location (adds to existing quantity).

    The GTIN is reduced to its digits and the location is stripped and
    upper-cased before lookup. An existing row has the quantity added to
    it; otherwise a new row is created with the given quantity. Responds
    with HTTP 400 when the GTIN contains no digits.
    """

    def normalize_gtin(gtin_value):
        # Keep only the digits before any '.'; values of length 8, 12, 13 or
        # 14 are zero-padded to 12 (13-digit values are left as they are).
        if not gtin_value:
            return None
        integer_part = str(gtin_value).strip().split('.')[0]
        digits = ''.join(ch for ch in integer_part if ch.isdigit())
        if len(digits) in (8, 12, 13, 14):
            width = 13 if len(digits) == 13 else 12
            return digits.zfill(width)
        return digits or None

    normalized_gtin = normalize_gtin(stock.gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    location_key = stock.location.strip().upper()

    # Check if stock entry already exists for this GTIN and location
    entry = db.query(Stock).filter(
        Stock.gtin == normalized_gtin,
        Stock.location == location_key
    ).first()

    if not entry:
        # Create new stock entry with the quantity
        entry = Stock(
            gtin=normalized_gtin,
            location=location_key,
            quantity=stock.quantity
        )
        db.add(entry)
        db.commit()
        db.refresh(entry)
        logger.info(f"Created new stock for GTIN {normalized_gtin} at {stock.location}: {stock.quantity}")
        return entry

    # Add to existing stock
    previous_quantity = entry.quantity
    entry.quantity += stock.quantity
    entry.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(entry)
    logger.info(
        f"Added stock for GTIN {normalized_gtin} at {stock.location}: {previous_quantity} + {stock.quantity} = {entry.quantity}")
    return entry
|
|
|
|
|
|
@app.post("/stock/remove", response_model=StockResponse)
def remove_stock(stock: StockAdd, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Remove quantity from existing stock for a GTIN at a specific location.

    Responds with HTTP 400 when the GTIN contains no digits, HTTP 404 when
    no stock row exists for the GTIN/location pair, and HTTP 400 when the
    row holds less than the quantity to remove.
    """

    def normalize_gtin(gtin_value):
        # Keep only the digits before any '.'; values of length 8, 12, 13 or
        # 14 are zero-padded to 12 (13-digit values are left as they are).
        if not gtin_value:
            return None
        integer_part = str(gtin_value).strip().split('.')[0]
        digits = ''.join(ch for ch in integer_part if ch.isdigit())
        if len(digits) in (8, 12, 13, 14):
            width = 13 if len(digits) == 13 else 12
            return digits.zfill(width)
        return digits or None

    normalized_gtin = normalize_gtin(stock.gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    # Find existing stock entry
    location_key = stock.location.strip().upper()
    entry = db.query(Stock).filter(
        Stock.gtin == normalized_gtin,
        Stock.location == location_key
    ).first()

    if not entry:
        raise HTTPException(
            status_code=404,
            detail=f"No stock found for GTIN {normalized_gtin} at location {stock.location}"
        )

    # Check if we have enough stock to remove
    if entry.quantity < stock.quantity:
        raise HTTPException(
            status_code=400,
            detail=f"Insufficient stock. Available: {entry.quantity}, Requested to remove: {stock.quantity}"
        )

    # Remove from existing stock
    previous_quantity = entry.quantity
    entry.quantity -= stock.quantity
    entry.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(entry)
    logger.info(
        f"Removed stock for GTIN {normalized_gtin} at {stock.location}: {previous_quantity} - {stock.quantity} = {entry.quantity}")
    return entry
|
|
|
|
|
|
@app.get("/stock/{gtin}", response_model=StockSummaryResponse)
def get_stock_by_gtin(gtin: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get all stock locations and total quantity for a specific GTIN.

    Responds with HTTP 400 when the GTIN contains no digits and HTTP 404
    when no stock rows exist for it. The title of the first product with
    the same GTIN, if any, is included for reference.
    """

    def normalize_gtin(gtin_value):
        # Keep only the digits before any '.'; values of length 8, 12, 13 or
        # 14 are zero-padded to 12 (13-digit values are left as they are).
        if not gtin_value:
            return None
        integer_part = str(gtin_value).strip().split('.')[0]
        digits = ''.join(ch for ch in integer_part if ch.isdigit())
        if len(digits) in (8, 12, 13, 14):
            width = 13 if len(digits) == 13 else 12
            return digits.zfill(width)
        return digits or None

    normalized_gtin = normalize_gtin(gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    # Get all stock entries for this GTIN
    entries = db.query(Stock).filter(Stock.gtin == normalized_gtin).all()
    if not entries:
        raise HTTPException(status_code=404, detail=f"No stock found for GTIN: {gtin}")

    # Total quantity and per-location breakdown
    quantity_sum = sum(entry.quantity for entry in entries)
    per_location = [
        StockLocationResponse(location=entry.location, quantity=entry.quantity)
        for entry in entries
    ]

    # Try to get product title for reference
    matching_product = db.query(Product).filter(Product.gtin == normalized_gtin).first()
    title = matching_product.title if matching_product else None

    return StockSummaryResponse(
        gtin=normalized_gtin,
        total_quantity=quantity_sum,
        locations=per_location,
        product_title=title
    )
|
|
|
|
|
|
@app.get("/stock/{gtin}/total")
def get_total_stock(gtin: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get total quantity in stock for a specific GTIN.

    Responds with HTTP 400 when the GTIN contains no digits. A GTIN
    without stock rows yields a total of 0 and a location count of 0.
    """

    def normalize_gtin(gtin_value):
        # Keep only the digits before any '.'; values of length 8, 12, 13 or
        # 14 are zero-padded to 12 (13-digit values are left as they are).
        if not gtin_value:
            return None
        integer_part = str(gtin_value).strip().split('.')[0]
        digits = ''.join(ch for ch in integer_part if ch.isdigit())
        if len(digits) in (8, 12, 13, 14):
            width = 13 if len(digits) == 13 else 12
            return digits.zfill(width)
        return digits or None

    normalized_gtin = normalize_gtin(gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    # Calculate total stock across every location
    entries = db.query(Stock).filter(Stock.gtin == normalized_gtin).all()
    quantity_sum = 0
    for entry in entries:
        quantity_sum += entry.quantity

    # Get product info for context
    matching_product = db.query(Product).filter(Product.gtin == normalized_gtin).first()
    title = matching_product.title if matching_product else None

    return {
        "gtin": normalized_gtin,
        "total_quantity": quantity_sum,
        "product_title": title,
        "locations_count": len(entries)
    }
|
|
|
|
|
|
@app.get("/stock", response_model=List[StockResponse])
def get_all_stock(
        skip: int = Query(0, ge=0),
        limit: int = Query(100, ge=1, le=1000),
        location: Optional[str] = Query(None, description="Filter by location"),
        gtin: Optional[str] = Query(None, description="Filter by GTIN"),
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Get all stock entries with optional filtering.

    ``location`` is a case-insensitive substring filter. ``gtin`` is
    normalized to digits and matched exactly; a value with no digits is
    ignored rather than rejected.

    Rows are ordered by ``Stock.id`` so that successive ``skip``/``limit``
    requests return stable, non-overlapping pages; without an ORDER BY the
    database may return rows in any order.
    """
    query = db.query(Stock)

    if location:
        query = query.filter(Stock.location.ilike(f"%{location}%"))

    if gtin:
        # Normalize GTIN for search
        def normalize_gtin(gtin_value):
            # Keep only the digits before any '.'; values of length 8, 12,
            # 13 or 14 are zero-padded to 12 (13-digit values are unchanged).
            if not gtin_value:
                return None
            gtin_str = str(gtin_value).strip()
            if '.' in gtin_str:
                gtin_str = gtin_str.split('.')[0]
            gtin_clean = ''.join(filter(str.isdigit, gtin_str))
            if len(gtin_clean) in [8, 12, 13, 14]:
                return gtin_clean.zfill(13) if len(gtin_clean) == 13 else gtin_clean.zfill(12)
            return gtin_clean if gtin_clean else None

        normalized_gtin = normalize_gtin(gtin)
        if normalized_gtin:
            query = query.filter(Stock.gtin == normalized_gtin)

    # Deterministic order first, then pagination
    stock_entries = query.order_by(Stock.id).offset(skip).limit(limit).all()
    return stock_entries
|
|
|
|
|
|
@app.put("/stock/{stock_id}", response_model=StockResponse)
def update_stock(
        stock_id: int,
        stock_update: StockUpdate,
        db: Session = Depends(get_db),
        current_user: User = Depends(get_current_user)
):
    """Update stock quantity for a specific stock entry.

    Responds with HTTP 404 when no stock row has the given id; otherwise
    overwrites the quantity, stamps ``updated_at`` and returns the row.
    """
    entry = db.query(Stock).filter(Stock.id == stock_id).first()
    if entry is None:
        raise HTTPException(status_code=404, detail="Stock entry not found")

    entry.quantity = stock_update.quantity
    entry.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(entry)
    return entry
|
|
|
|
|
|
@app.delete("/stock/{stock_id}")
def delete_stock(
    stock_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Remove one stock entry.

    Returns:
        A confirmation message once the deletion has been committed.

    Raises:
        HTTPException: 404 when no stock entry has the given ``stock_id``.
    """
    entry = db.query(Stock).filter_by(id=stock_id).first()
    if not entry:
        raise HTTPException(status_code=404, detail="Stock entry not found")

    db.delete(entry)
    db.commit()

    return {"message": "Stock entry deleted successfully"}
|
|
|
|
# Enhanced Statistics with Marketplace Support
|
|
@app.get("/stats", response_model=StatsResponse)
def get_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return catalogue-wide counters for products and stock (Protected).

    The response carries the product count, the number of distinct non-empty
    brands, categories, marketplaces and shop names, the number of stock
    entries, and the summed stock quantity (0 when there is no stock).
    """

    def count_distinct_filled(column):
        # Distinct values of `column`, ignoring NULLs and empty strings.
        return (
            db.query(column)
            .filter(column.isnot(None), column != "")
            .distinct()
            .count()
        )

    product_total = db.query(Product).count()

    brand_total = count_distinct_filled(Product.brand)
    category_total = count_distinct_filled(Product.google_product_category)

    # Marketplace-related counters
    marketplace_total = count_distinct_filled(Product.marketplace)
    shop_total = count_distinct_filled(Product.shop_name)

    # Stock counters; SUM yields NULL on an empty table, hence the fallback.
    stock_entry_total = db.query(Stock).count()
    inventory_total = db.query(func.sum(Stock.quantity)).scalar() or 0

    return StatsResponse(
        total_products=product_total,
        unique_brands=brand_total,
        unique_categories=category_total,
        unique_marketplaces=marketplace_total,
        unique_shops=shop_total,
        total_stock_entries=stock_entry_total,
        total_inventory_quantity=inventory_total,
    )
|
|
|
|
|
|
@app.get("/marketplace-stats", response_model=List[MarketplaceStatsResponse])
def get_marketplace_stats(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user),
):
    """Return one statistics record per marketplace (Protected).

    Products whose marketplace is NULL are excluded. Each record holds the
    marketplace name, its product count, and its distinct shop and brand
    counts.
    """
    product_count = func.count(Product.id).label('total_products')
    shop_count = func.count(func.distinct(Product.shop_name)).label('unique_shops')
    brand_count = func.count(func.distinct(Product.brand)).label('unique_brands')

    # One aggregated row per marketplace value.
    rows = (
        db.query(Product.marketplace, product_count, shop_count, brand_count)
        .filter(Product.marketplace.isnot(None))
        .group_by(Product.marketplace)
        .all()
    )

    results = []
    for row in rows:
        results.append(
            MarketplaceStatsResponse(
                marketplace=row.marketplace,
                total_products=row.total_products,
                unique_shops=row.unique_shops,
                unique_brands=row.unique_brands,
            )
        )
    return results
|
|
|
|
|
|
# Export with streaming for large datasets (Protected)
|
|
@app.get("/export-csv")
async def export_csv(
    marketplace: Optional[str] = Query(None, description="Filter by marketplace"),
    shop_name: Optional[str] = Query(None, description="Filter by shop name"),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Export products as CSV with streaming and marketplace filtering (Protected).

    Args:
        marketplace: Case-insensitive substring matched against
            ``Product.marketplace``.
        shop_name: Case-insensitive substring matched against
            ``Product.shop_name``.
        db: Database session used to read the products.
        current_user: Authenticated user; only required for access control.

    Returns:
        A ``StreamingResponse`` of type ``text/csv``. The header line is
        unquoted; every data field is quoted, with embedded double quotes
        escaped by the ``csv`` module. Empty or missing values are written as
        empty fields.
    """
    # Local import: csv is not among the file's top-level imports.
    import csv

    columns = (
        "product_id", "title", "description", "link", "image_link",
        "availability", "price", "currency", "brand", "gtin",
        "marketplace", "shop_name",
    )
    batch_size = 1000

    # The filters do not change between batches, so build the query once.
    query = db.query(Product)
    if marketplace:
        query = query.filter(Product.marketplace.ilike(f"%{marketplace}%"))
    if shop_name:
        query = query.filter(Product.shop_name.ilike(f"%{shop_name}%"))
    # Paging with OFFSET needs a stable order, otherwise batches can repeat or
    # skip rows.
    query = query.order_by(Product.id)

    def generate_csv():
        # NOTE(review): this generator runs after the handler has returned;
        # confirm that get_db keeps the session open until streaming finishes.
        yield ",".join(columns) + "\n"

        buffer = StringIO()
        # csv.writer escapes embedded quotes, which hand-built rows did not.
        writer = csv.writer(buffer, quoting=csv.QUOTE_ALL, lineterminator="\n")
        offset = 0

        while True:
            products = query.offset(offset).limit(batch_size).all()
            if not products:
                break

            for product in products:
                writer.writerow(
                    [product.product_id]
                    + [getattr(product, name) or "" for name in columns[1:]]
                )

            # Emit one chunk per batch, then reuse the buffer.
            yield buffer.getvalue()
            buffer.seek(0)
            buffer.truncate(0)

            offset += batch_size

    def safe_filename_part(value):
        # Keep the header value a plain ASCII token: user input must not be
        # able to inject quotes, separators or line breaks into the header.
        return "".join(
            ch if (ch.isascii() and ch.isalnum()) or ch in "-_." else "_"
            for ch in value
        )

    filename = "products_export"
    if marketplace:
        filename += f"_{safe_filename_part(marketplace)}"
    if shop_name:
        filename += f"_{safe_filename_part(shop_name)}"
    filename += ".csv"

    return StreamingResponse(
        generate_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename={filename}"}
    )
|
|
|
|
|
|
# Admin-only routes
|
|
@app.get("/admin/users", response_model=List[UserResponse])
def get_all_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user)
):
    """Get all users (Admin only).

    Args:
        skip: Number of users to skip (pagination offset).
        limit: Maximum number of users to return.
        db: Database session.
        current_admin: Authenticated admin; only required for access control.

    Returns:
        One page of users as ``UserResponse`` objects, ordered by ``User.id``.
    """
    # OFFSET/LIMIT without ORDER BY does not guarantee a stable row order, so
    # paging through users could repeat or miss accounts.
    users = db.query(User).order_by(User.id).offset(skip).limit(limit).all()
    return [UserResponse.model_validate(user) for user in users]
|
|
|
|
|
|
@app.put("/admin/users/{user_id}/status")
def toggle_user_status(
    user_id: int,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user),
):
    """Flip a user's ``is_active`` flag (Admin only).

    Returns:
        A message stating whether the user was activated or deactivated.

    Raises:
        HTTPException: 404 when the user does not exist; 400 when the admin
            targets their own account.
    """
    target = db.query(User).filter_by(id=user_id).first()
    if not target:
        raise HTTPException(status_code=404, detail="User not found")

    # Admins may not change the status of their own account.
    if target.id == current_admin.id:
        raise HTTPException(status_code=400, detail="Cannot deactivate your own account")

    target.is_active = not target.is_active
    target.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(target)

    if target.is_active:
        status = "activated"
    else:
        status = "deactivated"
    return {"message": f"User {target.username} has been {status}"}
|
|
|
|
|
|
@app.get("/admin/marketplace-import-jobs", response_model=List[MarketplaceImportJobResponse])
def get_all_marketplace_import_jobs(
    marketplace: Optional[str] = Query(None),
    shop_name: Optional[str] = Query(None),
    status: Optional[str] = Query(None),
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=100),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user),
):
    """List marketplace import jobs, newest first (Admin only).

    ``marketplace`` and ``shop_name`` are case-insensitive substring filters;
    ``status`` must match exactly. ``skip`` and ``limit`` page through the
    result. Missing counters are reported as 0.
    """

    def to_response(job):
        # Map one ORM job row onto the API response model.
        return MarketplaceImportJobResponse(
            job_id=job.id,
            status=job.status,
            marketplace=job.marketplace,
            shop_name=job.shop_name,
            imported=job.imported_count or 0,
            updated=job.updated_count or 0,
            total_processed=job.total_processed or 0,
            error_count=job.error_count or 0,
            error_message=job.error_message,
            created_at=job.created_at,
            started_at=job.started_at,
            completed_at=job.completed_at,
        )

    job_query = db.query(MarketplaceImportJob)

    # Optional filters
    if marketplace:
        job_query = job_query.filter(
            MarketplaceImportJob.marketplace.ilike(f"%{marketplace}%")
        )
    if shop_name:
        job_query = job_query.filter(
            MarketplaceImportJob.shop_name.ilike(f"%{shop_name}%")
        )
    if status:
        job_query = job_query.filter(MarketplaceImportJob.status == status)

    # Newest jobs first, then pagination.
    newest_first = job_query.order_by(MarketplaceImportJob.created_at.desc())
    page = newest_first.offset(skip).limit(limit).all()

    return [to_response(job) for job in page]
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: serve the app with uvicorn on all interfaces,
    # port 8000, with auto-reload enabled.
    import uvicorn

    server_options = {
        "host": "0.0.0.0",
        "port": 8000,
        "reload": True,
        "log_level": "info",
    }
    uvicorn.run("main:app", **server_options)