commit 9dd177bddc4b8b1d43a68b2ca3337989ae1f3c0d Author: Samir Boulahtit Date: Fri Sep 5 17:27:39 2025 +0200 Initial commit diff --git a/.env b/.env new file mode 100644 index 00000000..eaef3702 --- /dev/null +++ b/.env @@ -0,0 +1,24 @@ +# .env.example +# Database Configuration +# DATABASE_URL=postgresql://username:password@localhost:5432/ecommerce_db +# For development, you can use SQLite: +DATABASE_URL=sqlite:///./ecommerce.db + +# JWT Configuration +JWT_SECRET_KEY=your-super-secret-jwt-key-change-in-production +JWT_EXPIRE_HOURS=24 +JWT_EXPIRE_MINUTES=30 + +# API Configuration +API_HOST=0.0.0.0 +API_PORT=8000 +DEBUG=False + +# Rate Limiting +RATE_LIMIT_ENABLED=True +DEFAULT_RATE_LIMIT=100 +DEFAULT_WINDOW_SECONDS=3600 + +# Logging +LOG_LEVEL=INFO +LOG_FILE=app.log \ No newline at end of file diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..db98bb37 --- /dev/null +++ b/.env.example @@ -0,0 +1,23 @@ +# .env.example +# Database Configuration +DATABASE_URL=postgresql://username:password@localhost:5432/ecommerce_db +# For development, you can use SQLite: +# DATABASE_URL=sqlite:///./ecommerce.db + +# JWT Configuration +JWT_SECRET_KEY=your-super-secret-jwt-key-change-in-production +JWT_EXPIRE_HOURS=24 + +# API Configuration +API_HOST=0.0.0.0 +API_PORT=8000 +DEBUG=False + +# Rate Limiting +RATE_LIMIT_ENABLED=True +DEFAULT_RATE_LIMIT=100 +DEFAULT_WINDOW_SECONDS=3600 + +# Logging +LOG_LEVEL=INFO +LOG_FILE=app.log \ No newline at end of file diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 00000000..351c96db --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,4 @@ +# Default ignored files +/shelf/ +/workspace.xml + diff --git a/.idea/Letzshop-Import-v2.iml b/.idea/Letzshop-Import-v2.iml new file mode 100644 index 00000000..72ead01c --- /dev/null +++ b/.idea/Letzshop-Import-v2.iml @@ -0,0 +1,14 @@ + + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml 
b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 00000000..930c0173 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,14 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 00000000..105ce2da --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 00000000..5082f5ca --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 00000000..37b2c337 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 00000000..a93b0f67 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,35 @@ +# Dockerfile +FROM python:3.11-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + libpq-dev \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements and install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . 
+ +# Create logs directory +RUN mkdir -p logs + +# Create non-root user +RUN useradd -m -u 1000 appuser && chown -R appuser:appuser /app +USER appuser + +# Expose port +EXPOSE 8000 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD curl -f http://localhost:8000/health || exit 1 + +# Run the application +CMD ["python", "-m", "uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000"] diff --git a/Makefile b/Makefile new file mode 100644 index 00000000..ff729f34 --- /dev/null +++ b/Makefile @@ -0,0 +1,50 @@ +# Makefile +.PHONY: install dev test lint format docker-build docker-up docker-down migrate + +# Development setup +install: + pip install -r requirements.txt + +dev: + uvicorn main:app --reload --host 0.0.0.0 --port 8000 + +test: + pytest -v + +lint: + flake8 . --max-line-length=88 --extend-ignore=E203 + mypy . + +format: + black . + isort . + +# Database migrations +migrate-create: + alembic revision --autogenerate -m "$(message)" + +migrate-up: + alembic upgrade head + +migrate-down: + alembic downgrade -1 + +# Docker commands +docker-build: + docker-compose build + +docker-up: + docker-compose up -d + +docker-down: + docker-compose down + +docker-logs: + docker-compose logs -f api + +# Production deployment +deploy-staging: + docker-compose -f docker-compose.staging.yml up -d + +deploy-prod: + docker-compose -f docker-compose.prod.yml up -d diff --git a/README.md b/README.md new file mode 100644 index 00000000..d61b4111 --- /dev/null +++ b/README.md @@ -0,0 +1,500 @@ +# Ecommerce Backend API v2.0 + +A robust, production-ready FastAPI backend for ecommerce product catalog and inventory management with advanced CSV import capabilities. 
+ +## Key Improvements from v1 + +### Architecture Improvements +- **Modular Design**: Separated concerns into utility modules, middleware, and models +- **Database Optimization**: Added proper indexing strategy and foreign key relationships +- **Connection Pooling**: PostgreSQL support with connection pooling for production scalability +- **Background Processing**: Asynchronous CSV import with job tracking + +### Security Enhancements +- **JWT Authentication**: Token-based authentication with role-based access control +- **Rate Limiting**: Sliding window rate limiter to prevent API abuse +- **Input Validation**: Enhanced Pydantic models with comprehensive validation + +### Performance Optimizations +- **Batch Processing**: CSV imports processed in configurable batches +- **Database Indexes**: Strategic indexing for common query patterns +- **Streaming Export**: Memory-efficient CSV export for large datasets +- **Caching Ready**: Architecture supports Redis integration + +### Data Processing +- **Robust GTIN Handling**: Centralized GTIN normalization and validation +- **Multi-currency Support**: Advanced price parsing with currency extraction +- **International Content**: Multi-encoding CSV support for global data + +## Project Structure + +``` +ecommerce_api/ +├── main.py # FastAPI application entry point +├── models/ +│ ├── database_models.py # SQLAlchemy ORM models +│ └── api_models.py # Pydantic API models +├── utils/ +│ ├── data_processing.py # GTIN and price processing utilities +│ ├── csv_processor.py # CSV import/export handling +│ └── database.py # Database configuration +├── middleware/ +│ ├── auth.py # JWT authentication +│ ├── rate_limiter.py # Rate limiting implementation +│ └── logging_middleware.py # Request/response logging +├── config/ +│ └── settings.py # Application configuration +├── tests/ +│ └── test_utils.py # Unit tests +├── alembic/ # Database migrations +├── docker-compose.yml # Docker deployment +├── Dockerfile # Container definition +├── 
requirements.txt # Python dependencies +└── README.md # This file +``` + +## Quick Start + +### 1. Development Setup + +```bash +# Clone the repository +git clone +cd ecommerce-api + +# Set up virtual environment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Install dependencies +pip install -r requirements.txt + +# Set up environment variables +cp .env.example .env +# Edit .env with your database configuration +``` + +### 2. Database Setup + +**For SQLite (Development):** +```bash +# Your .env file should have: +# DATABASE_URL=sqlite:///./ecommerce.db + +# Initialize Alembic (only needed once) +alembic init alembic + +# Update alembic/env.py with the provided configuration (see below) + +# Create initial migration +alembic revision --autogenerate -m "Initial migration" + +# Apply migrations +alembic upgrade head +``` + +**For PostgreSQL (Production):** +```bash +# 1. Create PostgreSQL database +createdb ecommerce_db + +# 2. Update .env file: +# DATABASE_URL=postgresql://username:password@localhost:5432/ecommerce_db + +# 3. Initialize and run migrations +alembic init alembic +# Update alembic/env.py and alembic.ini (see configuration section) +alembic revision --autogenerate -m "Initial migration" +alembic upgrade head +``` + +**Important Alembic Configuration:** + +After running `alembic init alembic`, you must update two files: + +**1. 
Update `alembic/env.py`:** +```python +from logging.config import fileConfig +from sqlalchemy import engine_from_config, pool +from alembic import context +import os +import sys + +# Add your project directory to the Python path +sys.path.append(os.path.dirname(os.path.dirname(__file__))) + +from models.database_models import Base +from config.settings import settings + +# Alembic Config object +config = context.config + +# Override sqlalchemy.url with our settings +config.set_main_option("sqlalchemy.url", settings.database_url) + +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +target_metadata = Base.metadata + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode.""" + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + +def run_migrations_online() -> None: + """Run migrations in 'online' mode.""" + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() +``` + +**2. Update `alembic.ini` sqlalchemy.url line:** +```ini +# For SQLite: +sqlalchemy.url = sqlite:///./ecommerce.db + +# For PostgreSQL: +sqlalchemy.url = postgresql://username:password@localhost:5432/ecommerce_db +``` + +### 3. Configuration + +Edit `.env` file with your settings: + +```env +DATABASE_URL=postgresql://user:password@localhost:5432/ecommerce_db +JWT_SECRET_KEY=your-super-secret-key-change-in-production +API_HOST=0.0.0.0 +API_PORT=8000 +DEBUG=False +``` + +### 3. 
Run Development Server + +```bash +# Using make +make dev + +# Or directly +uvicorn main:app --reload --host 0.0.0.0 --port 8000 +``` + +### 4. Docker Deployment + +```bash +# Start all services +docker-compose up -d + +# View logs +docker-compose logs -f api + +# Stop services +docker-compose down +``` + +## API Endpoints + +### Authentication +- `POST /auth/login` - Get JWT token +- `POST /auth/refresh` - Refresh token + +### Products +- `GET /products` - List products with filtering and search +- `POST /products` - Create new product +- `GET /products/{product_id}` - Get product with stock info +- `PUT /products/{product_id}` - Update product +- `DELETE /products/{product_id}` - Delete product and associated stock + +### CSV Operations +- `POST /import-csv` - Start background CSV import +- `GET /import-status/{job_id}` - Check import job status +- `GET /export-csv` - Export products as CSV (streaming) + +### Stock Management +- `POST /stock` - Set exact stock quantity +- `POST /stock/add` - Add to existing stock +- `POST /stock/remove` - Remove from stock +- `GET /stock/{gtin}` - Get stock summary by GTIN +- `GET /stock/{gtin}/total` - Get total stock for GTIN +- `GET /stock` - List all stock entries with filtering +- `PUT /stock/{stock_id}` - Update stock entry +- `DELETE /stock/{stock_id}` - Delete stock entry + +### System +- `GET /` - API information +- `GET /health` - Health check +- `GET /stats` - System statistics + +## Advanced Features + +### Background CSV Import + +The API supports background processing of large CSV files: + +```python +# Start import +response = requests.post('/import-csv', json={ + 'url': 'https://example.com/products.csv', + 'batch_size': 1000 +}) + +job_id = response.json()['job_id'] + +# Check status +status = requests.get(f'/import-status/{job_id}') +``` + +### Rate Limiting + +Built-in rate limiting protects against API abuse: + +- Default: 100 requests per hour per client +- CSV imports: 10 per hour +- Configurable per 
endpoint + +### Search and Filtering + +Advanced product search capabilities: + +```bash +# Search in title and description +GET /products?search=laptop + +# Filter by brand and category +GET /products?brand=Apple&category=Electronics + +# Combine filters +GET /products?brand=Samsung&availability=in stock&search=phone +``` + +### Data Validation + +Comprehensive validation for all inputs: + +- GTIN format validation and normalization +- Price parsing with currency extraction +- Required field validation +- Type conversion and sanitization + +## Database Schema + +### Products Table +- Full product catalog with Google Shopping compatibility +- Indexed fields: `gtin`, `brand`, `google_product_category`, `availability` +- Timestamps for creation and updates + +### Stock Table +- Location-based inventory tracking +- GTIN-based product linking +- Unique constraint on GTIN+location combinations +- Composite indexes for efficient queries + +### Import Jobs Table +- Track background import operations +- Status monitoring and error handling +- Performance metrics + +## Development + +### Running Tests + +```bash +# All tests +make test + +# Specific test file +pytest tests/test_utils.py -v + +# With coverage +pytest --cov=. tests/ +``` + +### Code Quality + +```bash +# Format code +make format + +# Lint code +make lint + +# Type checking +mypy . 
+``` + +### Database Migrations + +**Creating Migrations:** +```bash +# After making changes to models/database_models.py, create a new migration +alembic revision --autogenerate -m "Description of changes" + +# Review the generated migration file in alembic/versions/ +# Edit if needed, then apply: +alembic upgrade head +``` + +**Common Migration Commands:** +```bash +# Check current migration status +alembic current + +# View migration history +alembic history + +# Upgrade to specific revision +alembic upgrade + +# Downgrade one step +alembic downgrade -1 + +# Downgrade to specific revision +alembic downgrade + +# Reset database (WARNING: destroys all data) +alembic downgrade base +alembic upgrade head +``` + +**Troubleshooting Alembic:** +```bash +# If you get template errors, reinitialize Alembic: +rm -rf alembic/ +alembic init alembic +# Then update alembic/env.py and alembic.ini as shown above + +# If migrations conflict, you may need to merge: +alembic merge -m "Merge migrations" +``` + +## Production Deployment + +### Environment Setup + +1. **Database**: PostgreSQL 13+ recommended +2. **Cache**: Redis for session storage and rate limiting +3. **Reverse Proxy**: Nginx for SSL termination and load balancing +4. **Monitoring**: Consider adding Prometheus metrics + +### Security Checklist + +- [ ] Change default JWT secret key +- [ ] Set up HTTPS/TLS +- [ ] Configure CORS appropriately +- [ ] Set up database connection limits +- [ ] Enable request logging +- [ ] Configure rate limiting per your needs +- [ ] Set up monitoring and alerting + +### Docker Production + +```yaml +# docker-compose.prod.yml +version: '3.8' +services: + api: + build: . 
+ environment: + - DEBUG=False + - DATABASE_URL=${DATABASE_URL} + - JWT_SECRET_KEY=${JWT_SECRET_KEY} + restart: unless-stopped + # Add your production configuration +``` + +## Performance Considerations + +### Database Optimization +- Use PostgreSQL for production workloads +- Monitor query performance with `EXPLAIN ANALYZE` +- Consider read replicas for read-heavy workloads +- Regular `VACUUM` and `ANALYZE` operations + +### CSV Import Performance +- Batch size affects memory usage vs. speed +- Larger batches = faster import but more memory +- Monitor import job status for optimization + +### API Response Times +- Database indexes are crucial for filtering +- Use pagination for large result sets +- Consider caching frequently accessed data + +## Troubleshooting + +### Common Issues + +1. **CSV Import Failures** + - Check encoding: try different separators + - Validate required columns: `product_id`, `title` + - Monitor import job status for specific errors + +2. **Database Connection Issues** + - Verify DATABASE_URL format + - Check connection limits + - Ensure database server is accessible + +3. **Authentication Problems** + - Verify JWT_SECRET_KEY is set + - Check token expiration settings + - Validate token format + +### Logging + +Logs are structured and include: +- Request/response times +- Error details with stack traces +- Import job progress +- Rate limiting events + +```bash +# View live logs +tail -f logs/app.log + +# Docker logs +docker-compose logs -f api +``` + +## Contributing + +1. Fork the repository +2. Create a feature branch +3. Make changes with tests +4. Run quality checks: `make lint test` +5. Submit a pull request + +## License + +This project is licensed under the MIT License - see the LICENSE file for details. + +## Support + +For issues and questions: +1. Check the troubleshooting section +2. Review existing GitHub issues +3. Create a new issue with detailed information +4. 
For security issues, contact maintainers directly \ No newline at end of file diff --git a/TODO b/TODO new file mode 100644 index 00000000..2b7c04f8 --- /dev/null +++ b/TODO @@ -0,0 +1,15 @@ +Best Practice Recommendation +For production applications, consider using database migrations (like Alembic) to manage schema changes including index +creation, rather than creating them programmatically at startup. This provides better control and tracking of database changes. + + +https://letzshop-google-shopping-product-exports-production.s3.eu-west-1.amazonaws.com/city_zones/strassen/vendors/wizard-cloud-marketing-s-a-r-l/google_shopping/products_fr.csv + +what are those? +INFO: ('192.168.1.125', 53912) - "WebSocket /" 403 +INFO: connection rejected (403 Forbidden) +INFO: connection closed +INFO: 192.168.1.125:53913 - "GET /loginMsg.js HTTP/1.1" 404 Not Found +INFO: 192.168.1.125:53914 - "GET /cgi/get.cgi?cmd=home_login HTTP/1.1" 404 Not Found +INFO: 192.168.1.125:53915 - "POST /boaform/admin/formTracert HTTP/1.1" 404 Not Found + diff --git a/alembic-env.py b/alembic-env.py new file mode 100644 index 00000000..13180d78 --- /dev/null +++ b/alembic-env.py @@ -0,0 +1,56 @@ +from logging.config import fileConfig +from sqlalchemy import engine_from_config, pool +from alembic import context +import os +import sys + +# Add your project directory to the Python path +sys.path.append(os.path.dirname(os.path.dirname(__file__))) + +from models.database_models import Base +from config.settings import settings + +# Alembic Config object +config = context.config + +# Override sqlalchemy.url with our settings +config.set_main_option("sqlalchemy.url", settings.database_url) + +if config.config_file_name is not None: + fileConfig(config.config_file_name) + +target_metadata = Base.metadata + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode.""" + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + 
literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + +def run_migrations_online() -> None: + """Run migrations in 'online' mode.""" + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() diff --git a/alembic.ini b/alembic.ini new file mode 100644 index 00000000..d7549228 --- /dev/null +++ b/alembic.ini @@ -0,0 +1,43 @@ +# alembic.ini +[alembic] +script_location = alembic +prepend_sys_path = . +version_path_separator = os +sqlalchemy.url = sqlite:///./ecommerce.db +# for PROD: sqlalchemy.url = postgresql://username:password@localhost:5432/ecommerce_db + +[post_write_hooks] + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARN +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARN +handlers = +qualname = sqlalchemy.engine + +[logger_alembic] +level = INFO +handlers = +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S diff --git a/alembic/README b/alembic/README new file mode 100644 index 00000000..98e4f9c4 --- /dev/null +++ b/alembic/README @@ -0,0 +1 @@ +Generic single-database configuration. 
from logging.config import fileConfig

from sqlalchemy import engine_from_config, pool

from alembic import context

import os
import sys

# Make the project root importable so the ORM metadata can be loaded.
sys.path.append(os.path.dirname(os.path.dirname(__file__)))

from models.database_models import Base
from config.settings import settings

# Alembic Config object; exposes values from alembic.ini.
config = context.config

# Prefer the application's configured database URL over the one in alembic.ini.
config.set_main_option("sqlalchemy.url", settings.database_url)

if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Metadata that `--autogenerate` diffs migrations against.
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Emit migration SQL without a live connection ('offline' mode)."""
    context.configure(
        url=config.get_main_option("sqlalchemy.url"),
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations against a live database connection ('online' mode)."""
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection, target_metadata=target_metadata
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
as sa +${imports if imports else ""} + +# revision identifiers, used by Alembic. +revision: str = ${repr(up_revision)} +down_revision: Union[str, None] = ${repr(down_revision)} +branch_labels: Union[str, Sequence[str], None] = ${repr(branch_labels)} +depends_on: Union[str, Sequence[str], None] = ${repr(depends_on)} + + +def upgrade() -> None: + ${upgrades if upgrades else "pass"} + + +def downgrade() -> None: + ${downgrades if downgrades else "pass"} diff --git a/auth_example.py b/auth_example.py new file mode 100644 index 00000000..9358dafe --- /dev/null +++ b/auth_example.py @@ -0,0 +1,130 @@ +# Authentication Usage Example +# This file demonstrates how to use the authentication endpoints + +import requests +import json + +# API Base URL +BASE_URL = "http://localhost:8000" + +def register_user(email, username, password): + """Register a new user""" + response = requests.post(f"{BASE_URL}/register", json={ + "email": email, + "username": username, + "password": password + }) + return response.json() + +def login_user(username, password): + """Login and get JWT token""" + response = requests.post(f"{BASE_URL}/login", json={ + "username": username, + "password": password + }) + if response.status_code == 200: + data = response.json() + return data["access_token"] + else: + print(f"Login failed: {response.json()}") + return None + +def get_user_info(token): + """Get current user info""" + headers = {"Authorization": f"Bearer {token}"} + response = requests.get(f"{BASE_URL}/me", headers=headers) + return response.json() + +def get_products(token, skip=0, limit=10): + """Get products (requires authentication)""" + headers = {"Authorization": f"Bearer {token}"} + response = requests.get(f"{BASE_URL}/products?skip={skip}&limit={limit}", headers=headers) + return response.json() + +def create_product(token, product_data): + """Create a new product (requires authentication)""" + headers = {"Authorization": f"Bearer {token}"} + response = 
requests.post(f"{BASE_URL}/products", json=product_data, headers=headers) + return response.json() + +# Example usage +if __name__ == "__main__": + # 1. Register a new user + print("1. Registering new user...") + try: + user_result = register_user("test@example.com", "testuser", "password123") + print(f"User registered: {user_result}") + except Exception as e: + print(f"Registration failed: {e}") + + # 2. Login with default admin user + print("\n2. Logging in as admin...") + admin_token = login_user("admin", "admin123") + if admin_token: + print(f"Admin login successful! Token: {admin_token[:50]}...") + + # 3. Get user info + print("\n3. Getting admin user info...") + user_info = get_user_info(admin_token) + print(f"User info: {user_info}") + + # 4. Create a sample product + print("\n4. Creating a sample product...") + sample_product = { + "product_id": "TEST001", + "title": "Test Product", + "description": "A test product for demonstration", + "price": "19.99", + "brand": "Test Brand", + "availability": "in stock" + } + + product_result = create_product(admin_token, sample_product) + print(f"Product created: {product_result}") + + # 5. Get products list + print("\n5. Getting products list...") + products = get_products(admin_token) + print(f"Products: {products}") + + # 6. Login with regular user + print("\n6. Logging in as regular user...") + user_token = login_user("testuser", "password123") + if user_token: + print(f"User login successful! 
Token: {user_token[:50]}...") + + # Regular users can also access protected endpoints + user_info = get_user_info(user_token) + print(f"Regular user info: {user_info}") + + products = get_products(user_token, limit=5) + print(f"Products accessible to regular user: {len(products.get('products', []))} products") + + print("\nAuthentication example completed!") + +# Example cURL commands: +""" +# Register a new user +curl -X POST "http://localhost:8000/register" \ + -H "Content-Type: application/json" \ + -d '{"email": "user@example.com", "username": "newuser", "password": "password123"}' + +# Login (get JWT token) +curl -X POST "http://localhost:8000/login" \ + -H "Content-Type: application/json" \ + -d '{"username": "admin", "password": "admin123"}' + +# Use token to access protected endpoint +curl -X GET "http://localhost:8000/me" \ + -H "Authorization: Bearer YOUR_JWT_TOKEN_HERE" + +# Get products (protected) +curl -X GET "http://localhost:8000/products" \ + -H "Authorization: Bearer YOUR_JWT_TOKEN_HERE" + +# Create product (protected) +curl -X POST "http://localhost:8000/products" \ + -H "Authorization: Bearer YOUR_JWT_TOKEN_HERE" \ + -H "Content-Type: application/json" \ + -d '{"product_id": "TEST001", "title": "Test Product", "price": "19.99"}' +""" \ No newline at end of file diff --git a/config/settings.py b/config/settings.py new file mode 100644 index 00000000..998df2db --- /dev/null +++ b/config/settings.py @@ -0,0 +1,31 @@ +# config/settings.py +from pydantic_settings import BaseSettings # This is the correct import for Pydantic v2 +from typing import Optional + + +class Settings(BaseSettings): + # Database + database_url: str = "sqlite:///./ecommerce.db" + + # JWT + jwt_secret_key: str = "change-this-in-production" + jwt_expire_hours: int = 24 + + # API + api_host: str = "0.0.0.0" + api_port: int = 8000 + debug: bool = False + + # Rate Limiting + rate_limit_enabled: bool = True + default_rate_limit: int = 100 + default_window_seconds: int = 3600 + + # 
Logging + log_level: str = "INFO" + log_file: Optional[str] = None + + model_config = {"env_file": ".env"} # Updated syntax for Pydantic v2 + + +settings = Settings() diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 00000000..f6ab4b43 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,57 @@ +# docker-compose.yml +version: '3.8' + +services: + db: + image: postgres:15 + restart: always + environment: + POSTGRES_DB: ecommerce_db + POSTGRES_USER: ecommerce_user + POSTGRES_PASSWORD: secure_password + volumes: + - postgres_data:/var/lib/postgresql/data + - ./init.sql:/docker-entrypoint-initdb.d/init.sql + ports: + - "5432:5432" + healthcheck: + test: ["CMD-SHELL", "pg_isready -U ecommerce_user -d ecommerce_db"] + interval: 30s + timeout: 10s + retries: 3 + + redis: + image: redis:7-alpine + restart: always + ports: + - "6379:6379" + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 30s + timeout: 10s + retries: 3 + + api: + build: . + restart: always + ports: + - "8000:8000" + environment: + DATABASE_URL: postgresql://ecommerce_user:secure_password@db:5432/ecommerce_db + JWT_SECRET_KEY: ${JWT_SECRET_KEY:-your-super-secret-key} + REDIS_URL: redis://redis:6379/0 + depends_on: + db: + condition: service_healthy + redis: + condition: service_healthy + volumes: + - ./logs:/app/logs + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8000/health"] + interval: 30s + timeout: 10s + retries: 3 + +volumes: + postgres_data: diff --git a/ecommerce.db b/ecommerce.db new file mode 100644 index 00000000..28d7063c Binary files /dev/null and b/ecommerce.db differ diff --git a/init.sql b/init.sql new file mode 100644 index 00000000..880a0d4e --- /dev/null +++ b/init.sql @@ -0,0 +1,10 @@ + + +# init.sql +-- Initial database setup +CREATE EXTENSION IF NOT EXISTS "uuid-ossp"; + +-- Create indexes for better performance +-- These will be managed by Alembic in production + + diff --git a/main.py b/main.py new file mode 100644 index 
from fastapi import FastAPI, HTTPException, Query, Depends, BackgroundTasks
from fastapi.responses import StreamingResponse
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from pydantic import BaseModel, Field, validator
from typing import Optional, List, Dict, Any
from datetime import datetime, timedelta
from sqlalchemy import create_engine, Column, Integer, String, DateTime, text, ForeignKey, Index
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session, relationship
from contextlib import asynccontextmanager
import pandas as pd
import requests
from io import StringIO, BytesIO
import logging
import asyncio
import time
from functools import wraps
import os
from dotenv import load_dotenv

# Pull configuration from a local .env file before anything reads os.environ.
load_dotenv()

# Project-local helpers and models.
from utils.data_processing import GTINProcessor, PriceProcessor
from utils.csv_processor import CSVProcessor
from utils.database import get_db_engine, get_session_local
from models.database_models import Base, Product, Stock, ImportJob, User
from models.api_models import *
from middleware.rate_limiter import RateLimiter
from middleware.auth import AuthManager

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Shared, stateless data-processing helpers.
gtin_processor = GTINProcessor()
price_processor = PriceProcessor()
csv_processor = CSVProcessor()

# Database wiring.
# NOTE(review): this fallback URL differs from the sqlite default in
# config/settings.py — confirm which default is actually intended.
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:password@localhost/ecommerce")
engine = get_db_engine(DATABASE_URL)
SessionLocal = get_session_local(engine)

rate_limiter = RateLimiter()
auth_manager = AuthManager()


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan events"""
    # ---- startup ----
    logger.info("Starting up ecommerce API with authentication")

    # Create any tables missing from the ORM metadata.
    Base.metadata.create_all(bind=engine)

    # Seed the default admin account; a failure is logged but not fatal.
    db = SessionLocal()
    try:
        auth_manager.create_default_admin_user(db)
    except Exception as e:
        logger.error(f"Failed to create default admin user: {e}")
    finally:
        db.close()

    # Best-effort index creation at startup (the repo TODO suggests moving
    # this into Alembic migrations for production).
    index_statements = (
        # User indexes
        "CREATE INDEX IF NOT EXISTS idx_user_email ON users(email)",
        "CREATE INDEX IF NOT EXISTS idx_user_username ON users(username)",
        "CREATE INDEX IF NOT EXISTS idx_user_role ON users(role)",
        # Product indexes
        "CREATE INDEX IF NOT EXISTS idx_product_gtin ON products(gtin)",
        "CREATE INDEX IF NOT EXISTS idx_product_brand ON products(brand)",
        "CREATE INDEX IF NOT EXISTS idx_product_category ON products(google_product_category)",
        "CREATE INDEX IF NOT EXISTS idx_product_availability ON products(availability)",
        # Stock indexes
        "CREATE INDEX IF NOT EXISTS idx_stock_gtin_location ON stock(gtin, location)",
        "CREATE INDEX IF NOT EXISTS idx_stock_location ON stock(location)",
    )
    with engine.connect() as conn:
        try:
            for stmt in index_statements:
                conn.execute(text(stmt))
            conn.commit()
            logger.info("Database indexes created successfully")
        except Exception as e:
            logger.warning(f"Index creation warning: {e}")

    yield

    # ---- shutdown ----
    logger.info("Shutting down ecommerce API")


app = FastAPI(
    title="Ecommerce Backend API",
    description="Advanced product management system with JWT authentication, CSV import/export and stock management",
    version="2.1.0",
    lifespan=lifespan
)

# NOTE(review): wildcard origins combined with allow_credentials=True is
# rejected by browsers and is unsafe — restrict origins before production.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Configure appropriately for production
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Security
security = HTTPBearer()


def get_db():
    """Yield a request-scoped SQLAlchemy session; roll back on error, always close."""
    db = SessionLocal()
    try:
        yield db
    except Exception as e:
        db.rollback()
        logger.error(f"Database error: {e}")
        raise
    finally:
        db.close()


def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security), db: Session = Depends(get_db)):
    """Resolve the Bearer token to a database User (raises 401 on failure)."""
    return auth_manager.get_current_user(db, credentials)


def get_current_admin_user(current_user: User = Depends(get_current_user)):
    """Require that the authenticated user has the admin role (403 otherwise)."""
    return auth_manager.require_admin(current_user)


def rate_limit(max_requests: int = 100, window_seconds: int = 3600):
    """Decorator applying the in-memory sliding-window rate limiter.

    NOTE(review): client_id is a fixed placeholder, so every caller currently
    shares one bucket — extract the real client IP or user id from the request
    before relying on this in production.
    """
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            client_id = "anonymous"  # In production, extract from request
            if not rate_limiter.allow_request(client_id, max_requests, window_seconds):
                raise HTTPException(
                    status_code=429,
                    detail="Rate limit exceeded"
                )
            return await func(*args, **kwargs)
        return wrapper
    return decorator


# Authentication Routes
@app.post("/register", response_model=UserResponse)
def register_user(user_data: UserRegister, db: Session = Depends(get_db)):
    """Register a new user (email and username must both be unique)."""
    existing_email = db.query(User).filter(User.email == user_data.email).first()
    if existing_email:
        raise HTTPException(status_code=400, detail="Email already registered")

    existing_username = db.query(User).filter(User.username == user_data.username).first()
    if existing_username:
        raise HTTPException(status_code=400, detail="Username already taken")

    hashed_password = auth_manager.hash_password(user_data.password)
    new_user = User(
        email=user_data.email,
        username=user_data.username,
        hashed_password=hashed_password,
        role="user",  # self-registration never grants elevated roles
        is_active=True
    )

    db.add(new_user)
    db.commit()
    db.refresh(new_user)

    logger.info(f"New user registered: {new_user.username}")
    return new_user


@app.post("/login", response_model=LoginResponse)
def login_user(user_credentials: UserLogin, db: Session = Depends(get_db)):
    """Login user and return a JWT bearer token."""
    user = auth_manager.authenticate_user(db, user_credentials.username, user_credentials.password)
    if not user:
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password"
        )

    token_data = auth_manager.create_access_token(user)
    logger.info(f"User logged in: {user.username}")

    return LoginResponse(
        access_token=token_data["access_token"],
        token_type=token_data["token_type"],
        expires_in=token_data["expires_in"],
        user=UserResponse.model_validate(user)
    )


@app.get("/me", response_model=UserResponse)
def get_current_user_info(current_user: User = Depends(get_current_user)):
    """Get current user information."""
    return UserResponse.model_validate(current_user)


# Public Routes (no authentication required)
@app.get("/")
def root():
    """Service banner; also documents that most routes need a Bearer token."""
    return {
        "message": "Ecommerce Backend API v2.1 with JWT Authentication",
        "status": "operational",
        "auth_required": "Most endpoints require Bearer token authentication"
    }


@app.get("/health")
def health_check(db: Session = Depends(get_db)):
    """Health check endpoint: verifies database connectivity."""
    try:
        db.execute(text("SELECT 1"))
        return {"status": "healthy", "timestamp": datetime.utcnow()}
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        raise HTTPException(status_code=503, detail="Service unhealthy")


# Protected Routes (authentication required)
@app.post("/import-csv", response_model=ImportJobResponse)
@rate_limit(max_requests=10, window_seconds=3600)  # Limit CSV imports
async def import_csv_from_url(
    request: CSVImportRequest,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Import products from a CSV URL with background processing (Protected)."""
    import_job = ImportJob(
        status="pending",
        source_url=request.url,
        user_id=current_user.id,
        created_at=datetime.utcnow()
    )
    db.add(import_job)
    db.commit()
    db.refresh(import_job)

    background_tasks.add_task(
        process_csv_import,
        import_job.id,
        request.url,
        request.batch_size or 1000
    )

    return ImportJobResponse(
        job_id=import_job.id,
        status="pending",
        message="CSV import started. Check status with /import-status/{job_id}"
    )


async def process_csv_import(job_id: int, url: str, batch_size: int = 1000):
    """Background task: import the CSV at *url* in batches, recording
    progress and any failure on the ImportJob row."""
    db = SessionLocal()
    # BUG FIX: `job` must exist before the try block — previously an exception
    # raised before (or by) the job lookup made the except block crash with a
    # NameError/AttributeError, masking the original error and leaving the job
    # stuck in its prior status.
    job = None

    try:
        job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
        if not job:
            logger.error(f"Import job {job_id} not found")
            return

        job.status = "processing"
        job.started_at = datetime.utcnow()
        db.commit()

        result = await csv_processor.process_csv_from_url(url, batch_size, db)

        job.status = "completed"
        job.completed_at = datetime.utcnow()
        job.imported_count = result["imported"]
        job.updated_count = result["updated"]
        job.error_count = result.get("errors", 0)
        job.total_processed = result["total_processed"]

        if result.get("errors", 0) > 0:
            job.status = "completed_with_errors"
            job.error_message = f"{result['errors']} rows had errors"

        db.commit()
        logger.info(f"Import job {job_id} completed successfully")

    except Exception as e:
        logger.error(f"Import job {job_id} failed: {e}")
        if job is not None:
            job.status = "failed"
            job.completed_at = datetime.utcnow()
            job.error_message = str(e)
            db.commit()

    finally:
        db.close()


@app.get("/import-status/{job_id}", response_model=ImportJobResponse)
def get_import_status(job_id: int, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get status of a CSV import job (Protected)."""
    job = db.query(ImportJob).filter(ImportJob.id == job_id).first()
    if not job:
        raise HTTPException(status_code=404, detail="Import job not found")

    # Users can only see their own jobs, admins can see all
    if current_user.role != "admin" and job.user_id != current_user.id:
        raise HTTPException(status_code=403, detail="Access denied to this import job")

    return ImportJobResponse(
        job_id=job.id,
        status=job.status,
        imported=job.imported_count or 0,
        updated=job.updated_count or 0,
        total_processed=job.total_processed or 0,
        error_count=job.error_count or 0,
        error_message=job.error_message,
        created_at=job.created_at,
        started_at=job.started_at,
        completed_at=job.completed_at
    )


@app.get("/products", response_model=ProductListResponse)
def get_products(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    brand: Optional[str] = Query(None),
    category: Optional[str] = Query(None),
    availability: Optional[str] = Query(None),
    search: Optional[str] = Query(None),
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Get products with filtering, search and pagination (Protected)."""
    query = db.query(Product)

    if brand:
        query = query.filter(Product.brand.ilike(f"%{brand}%"))
    if category:
        query = query.filter(Product.google_product_category.ilike(f"%{category}%"))
    if availability:
        query = query.filter(Product.availability == availability)
    if search:
        # Search in title and description
        search_term = f"%{search}%"
        query = query.filter(
            (Product.title.ilike(search_term)) |
            (Product.description.ilike(search_term))
        )

    # Total count BEFORE pagination so clients can page correctly.
    total = query.count()
    products = query.offset(skip).limit(limit).all()

    return ProductListResponse(
        products=products,
        total=total,
        skip=skip,
        limit=limit
    )


@app.post("/products", response_model=ProductResponse)
def create_product(
    product: ProductCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Create a new product with GTIN/price validation (Protected)."""
    existing = db.query(Product).filter(Product.product_id == product.product_id).first()
    if existing:
        raise HTTPException(status_code=400, detail="Product with this ID already exists")

    if product.gtin:
        normalized_gtin = gtin_processor.normalize(product.gtin)
        if not normalized_gtin:
            raise HTTPException(status_code=400, detail="Invalid GTIN format")
        product.gtin = normalized_gtin

    if product.price:
        parsed_price, currency = price_processor.parse_price_currency(product.price)
        if parsed_price:
            product.price = parsed_price
            product.currency = currency

    db_product = Product(**product.dict())
    db.add(db_product)
    db.commit()
    db.refresh(db_product)

    return db_product


@app.get("/products/{product_id}", response_model=ProductDetailResponse)
def get_product(product_id: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get a product together with its stock information (Protected)."""
    product = db.query(Product).filter(Product.product_id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    stock_info = None
    if product.gtin:
        stock_entries = db.query(Stock).filter(Stock.gtin == product.gtin).all()
        if stock_entries:
            total_quantity = sum(entry.quantity for entry in stock_entries)
            locations = [
                StockLocationResponse(location=entry.location, quantity=entry.quantity)
                for entry in stock_entries
            ]
            stock_info = StockSummaryResponse(
                gtin=product.gtin,
                total_quantity=total_quantity,
                locations=locations
            )

    return ProductDetailResponse(
        product=product,
        stock_info=stock_info
    )


@app.put("/products/{product_id}", response_model=ProductResponse)
def update_product(
    product_id: str,
    product_update: ProductUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Partially update a product with validation (Protected)."""
    product = db.query(Product).filter(Product.product_id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    # Only fields explicitly supplied by the client are applied.
    update_data = product_update.dict(exclude_unset=True)

    if "gtin" in update_data and update_data["gtin"]:
        normalized_gtin = gtin_processor.normalize(update_data["gtin"])
        if not normalized_gtin:
            raise HTTPException(status_code=400, detail="Invalid GTIN format")
        update_data["gtin"] = normalized_gtin

    if "price" in update_data and update_data["price"]:
        parsed_price, currency = price_processor.parse_price_currency(update_data["price"])
        if parsed_price:
            update_data["price"] = parsed_price
            update_data["currency"] = currency

    for key, value in update_data.items():
        setattr(product, key, value)

    product.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(product)

    return product


@app.delete("/products/{product_id}")
def delete_product(
    product_id: str,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Delete a product and its associated stock entries (Protected)."""
    product = db.query(Product).filter(Product.product_id == product_id).first()
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")

    if product.gtin:
        db.query(Stock).filter(Stock.gtin == product.gtin).delete()

    db.delete(product)
    db.commit()

    return {"message": "Product and associated stock deleted successfully"}


# Stock Management Routes (Protected)
@app.post("/stock", response_model=StockResponse)
def set_stock(
    stock: StockCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Set stock for a GTIN/location pair, upserting the row (Protected)."""
    normalized_gtin = gtin_processor.normalize(stock.gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    # Orphan stock (no matching product) is allowed but logged.
    product = db.query(Product).filter(Product.gtin == normalized_gtin).first()
    if not product:
        logger.warning(f"Setting stock for GTIN {normalized_gtin} without corresponding product")

    existing_stock = db.query(Stock).filter(
        Stock.gtin == normalized_gtin,
        Stock.location == stock.location.strip().upper()
    ).first()

    if existing_stock:
        existing_stock.quantity = stock.quantity
        existing_stock.updated_at = datetime.utcnow()
        db.commit()
        db.refresh(existing_stock)
        return existing_stock
    else:
        new_stock = Stock(
            gtin=normalized_gtin,
            location=stock.location.strip().upper(),
            quantity=stock.quantity
        )
        db.add(new_stock)
        db.commit()
        db.refresh(new_stock)
        return new_stock


@app.get("/stock/{gtin}", response_model=StockSummaryResponse)
def get_stock_by_gtin(gtin: str, db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get stock summary across all locations for a GTIN (Protected)."""
    normalized_gtin = gtin_processor.normalize(gtin)
    if not normalized_gtin:
        raise HTTPException(status_code=400, detail="Invalid GTIN format")

    stock_entries = db.query(Stock).filter(Stock.gtin == normalized_gtin).all()
    if not stock_entries:
        raise HTTPException(status_code=404, detail=f"No stock found for GTIN: {gtin}")

    total_quantity = sum(entry.quantity for entry in stock_entries)
    locations = [
        StockLocationResponse(location=entry.location, quantity=entry.quantity)
        for entry in stock_entries
    ]

    product = db.query(Product).filter(Product.gtin == normalized_gtin).first()

    return StockSummaryResponse(
        gtin=normalized_gtin,
        total_quantity=total_quantity,
        locations=locations,
        product_title=product.title if product else None
    )


@app.get("/stats", response_model=StatsResponse)
def get_stats(db: Session = Depends(get_db), current_user: User = Depends(get_current_user)):
    """Get comprehensive catalog and inventory statistics (Protected)."""
    from sqlalchemy import func  # local import keeps this fix self-contained

    total_products = db.query(Product).count()

    unique_brands = db.query(Product.brand).filter(
        Product.brand.isnot(None),
        Product.brand != ""
    ).distinct().count()

    unique_categories = db.query(Product.google_product_category).filter(
        Product.google_product_category.isnot(None),
        Product.google_product_category != ""
    ).distinct().count()

    total_stock_entries = db.query(Stock).count()
    # BUG FIX: db.query(Stock.quantity).scalar() returned only the FIRST row's
    # quantity, not the total inventory. Aggregate with SUM instead.
    total_inventory = db.query(func.sum(Stock.quantity)).scalar() or 0

    return StatsResponse(
        total_products=total_products,
        unique_brands=unique_brands,
        unique_categories=unique_categories,
        total_stock_entries=total_stock_entries,
        total_inventory_quantity=total_inventory
    )


# Export with streaming for large datasets (Protected)
@app.get("/export-csv")
async def export_csv(
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Export products as CSV with streaming (Protected)."""

    def generate_csv():
        # BUG FIX: the previous f-string row builder did not escape embedded
        # double quotes, commas or newlines in field values, producing a
        # corrupt file. csv.writer handles RFC 4180 quoting correctly.
        import csv

        def as_row(values):
            buf = StringIO()
            csv.writer(buf).writerow(values)
            return buf.getvalue()

        yield as_row([
            "product_id", "title", "description", "link", "image_link",
            "availability", "price", "currency", "brand", "gtin",
        ])

        # Stream in batches for memory efficiency.
        batch_size = 1000
        offset = 0
        while True:
            products = db.query(Product).offset(offset).limit(batch_size).all()
            if not products:
                break

            for product in products:
                yield as_row([
                    product.product_id,
                    product.title or "",
                    product.description or "",
                    product.link or "",
                    product.image_link or "",
                    product.availability or "",
                    product.price or "",
                    product.currency or "",
                    product.brand or "",
                    product.gtin or "",
                ])

            offset += batch_size

    return StreamingResponse(
        generate_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=products_export.csv"}
    )


# Admin-only routes
@app.get("/admin/users", response_model=List[UserResponse])
def get_all_users(
    skip: int = Query(0, ge=0),
    limit: int = Query(100, ge=1, le=1000),
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user)
):
    """Get all users (Admin only)."""
    users = db.query(User).offset(skip).limit(limit).all()
    return [UserResponse.model_validate(user) for user in users]


@app.put("/admin/users/{user_id}/status")
def toggle_user_status(
    user_id: int,
    db: Session = Depends(get_db),
    current_admin: User = Depends(get_current_admin_user)
):
    """Toggle user active status (Admin only); admins cannot lock themselves out."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")

    if user.id == current_admin.id:
        raise HTTPException(status_code=400, detail="Cannot deactivate your own account")

    user.is_active = not user.is_active
    user.updated_at = datetime.utcnow()
    db.commit()
    db.refresh(user)

    status = "activated" if user.is_active else "deactivated"
    return {"message": f"User {user.username} has been {status}"}


if __name__ == "__main__":
    import uvicorn

    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
        log_level="info"
    )


# middleware/auth.py
from fastapi import HTTPException, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from passlib.context import CryptContext
from jose import jwt, JWTError
from datetime import datetime, timedelta
from typing import Dict, Any, Optional
from sqlalchemy.orm import Session
from models.database_models import User
import os
import logging

logger = logging.getLogger(__name__)

# Password context for bcrypt hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Security scheme
security = HTTPBearer()


class AuthManager:
    """JWT-based authentication manager with bcrypt password hashing."""

    def __init__(self):
        # NOTE(review): the fallback secret must never reach production; fail
        # fast there by requiring JWT_SECRET_KEY to be set.
        self.secret_key = os.getenv("JWT_SECRET_KEY", "your-secret-key-change-in-production-please")
        self.algorithm = "HS256"
        self.token_expire_minutes = int(os.getenv("JWT_EXPIRE_MINUTES", "30"))

    def hash_password(self, password: str) -> str:
        """Hash password using bcrypt."""
        return pwd_context.hash(password)

    def verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """Verify password against hash."""
        return pwd_context.verify(plain_password, hashed_password)

    def authenticate_user(self, db: Session, username: str, password: str) -> Optional[User]:
        """Authenticate by username OR email; return the User if valid, else None."""
        user = db.query(User).filter(
            (User.username == username) | (User.email == username)
        ).first()

        if not user:
            return None
        if not user.is_active:
            return None
        if not self.verify_password(password, user.hashed_password):
            return None

        # Record the successful login.
        user.last_login = datetime.utcnow()
        db.commit()
        db.refresh(user)

        return user

    def create_access_token(self, user: User) -> Dict[str, Any]:
        """Create a signed JWT for *user*; expiry comes from JWT_EXPIRE_MINUTES."""
        expires_delta = timedelta(minutes=self.token_expire_minutes)
        expire = datetime.utcnow() + expires_delta

        payload = {
            "sub": str(user.id),
            "username": user.username,
            "email": user.email,
            "role": user.role,
            "exp": expire,
            "iat": datetime.utcnow()
        }

        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)

        return {
            "access_token": token,
            "token_type": "bearer",
            "expires_in": self.token_expire_minutes * 60  # Return in seconds
        }

    def verify_token(self, token: str) -> Dict[str, Any]:
        """Verify a JWT and return its user claims; raise HTTP 401 on any failure."""
        try:
            # jwt.decode() validates the signature AND the exp claim itself
            # (raising ExpiredSignatureError on expiry).
            # BUG FIX: the previous manual check compared datetime.utcnow()
            # against datetime.fromtimestamp(exp), which interprets the epoch
            # timestamp in *local* time — tokens were mis-judged on any server
            # whose clock is not set to UTC. The manual check is removed in
            # favor of the library's own UTC-correct validation.
            payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])

            if payload.get("exp") is None:
                raise HTTPException(status_code=401, detail="Token missing expiration")

            user_id = payload.get("sub")
            if user_id is None:
                raise HTTPException(status_code=401, detail="Token missing user identifier")

            return {
                "user_id": int(user_id),
                "username": payload.get("username"),
                "email": payload.get("email"),
                "role": payload.get("role", "user")
            }

        except jwt.ExpiredSignatureError:
            raise HTTPException(status_code=401, detail="Token has expired")
        except JWTError as e:
            logger.error(f"JWT decode error: {e}")
            raise HTTPException(status_code=401, detail="Could not validate credentials")
        except HTTPException:
            # BUG FIX: HTTPExceptions raised above were previously swallowed by
            # the generic handler below and re-labelled "Authentication failed",
            # hiding the specific 401 detail. Re-raise them untouched.
            raise
        except Exception as e:
            logger.error(f"Token verification error: {e}")
            raise HTTPException(status_code=401, detail="Authentication failed")

    def get_current_user(self, db: Session, credentials: HTTPAuthorizationCredentials = Depends(security)) -> User:
        """Get the current authenticated user from the database."""
        user_data = self.verify_token(credentials.credentials)

        user = db.query(User).filter(User.id == user_data["user_id"]).first()
        if not user:
            raise HTTPException(status_code=401, detail="User not found")

        if not user.is_active:
            raise HTTPException(status_code=401, detail="User account is inactive")

        return user

    def require_role(self, required_role: str):
        """Decorator factory: require a specific role on the current user."""

        def decorator(func):
            def wrapper(current_user: User, *args, **kwargs):
                if current_user.role != required_role:
                    raise HTTPException(
                        status_code=403,
                        detail=f"Required role '{required_role}' not found. Current role: '{current_user.role}'"
                    )
                return func(current_user, *args, **kwargs)

            return wrapper

        return decorator

    def require_admin(self, current_user: User):
        """Require admin role; returns the user unchanged when authorized."""
        if current_user.role != "admin":
            raise HTTPException(
                status_code=403,
                detail="Admin privileges required"
            )
        return current_user

    def create_default_admin_user(self, db: Session):
        """Create the default admin user if it doesn't exist.

        NOTE(review): the hardcoded 'admin123' credential is a bootstrap
        convenience only — force a password change (or read it from the
        environment) before any non-local deployment.
        """
        admin_user = db.query(User).filter(User.username == "admin").first()

        if not admin_user:
            hashed_password = self.hash_password("admin123")
            admin_user = User(
                email="admin@example.com",
                username="admin",
                hashed_password=hashed_password,
                role="admin",
                is_active=True
            )
            db.add(admin_user)
            db.commit()
            db.refresh(admin_user)
            logger.info("Default admin user created: username='admin', password='admin123'")

        return admin_user


# middleware/error_handler.py
from fastapi import Request, HTTPException
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from starlette.exceptions import HTTPException as StarletteHTTPException
import logging

logger = logging.getLogger(__name__)


async def custom_http_exception_handler(request: Request, exc: HTTPException):
    """Custom HTTP exception handler: log and wrap in a uniform error envelope."""
    logger.error(f"HTTP {exc.status_code}: {exc.detail} - {request.method} {request.url}")

    return JSONResponse(
        status_code=exc.status_code,
        content={
            "error": {
                "code": exc.status_code,
                "message": exc.detail,
                "type": "http_exception"
            }
        }
    )
"""Handle Pydantic validation errors""" + logger.error(f"Validation error: {exc.errors()} - {request.method} {request.url}") + + return JSONResponse( + status_code=422, + content={ + "error": { + "code": 422, + "message": "Validation error", + "type": "validation_error", + "details": exc.errors() + } + } + ) + +async def general_exception_handler(request: Request, exc: Exception): + """Handle unexpected exceptions""" + logger.error(f"Unexpected error: {str(exc)} - {request.method} {request.url}", exc_info=True) + + return JSONResponse( + status_code=500, + content={ + "error": { + "code": 500, + "message": "Internal server error", + "type": "server_error" + } + } + ) + diff --git a/middleware/logging_middleware.py b/middleware/logging_middleware.py new file mode 100644 index 00000000..f0f71829 --- /dev/null +++ b/middleware/logging_middleware.py @@ -0,0 +1,46 @@ +# middleware/logging_middleware.py +import logging +import time +from fastapi import Request, Response +from starlette.middleware.base import BaseHTTPMiddleware +from typing import Callable + +logger = logging.getLogger(__name__) + + +class LoggingMiddleware(BaseHTTPMiddleware): + """Middleware for request/response logging and performance monitoring""" + + async def dispatch(self, request: Request, call_next: Callable) -> Response: + # Start timing + start_time = time.time() + + # Log request + client_ip = request.client.host if request.client else "unknown" + logger.info(f"Request: {request.method} {request.url.path} from {client_ip}") + + # Process request + try: + response = await call_next(request) + + # Calculate duration + duration = time.time() - start_time + + # Log response + logger.info( + f"Response: {response.status_code} for {request.method} {request.url.path} " + f"({duration:.3f}s)" + ) + + # Add performance headers + response.headers["X-Process-Time"] = str(duration) + + return response + + except Exception as e: + duration = time.time() - start_time + logger.error( + f"Error: {str(e)} for 
{request.method} {request.url.path} " + f"({duration:.3f}s)" + ) + raise \ No newline at end of file diff --git a/middleware/rate_limiter.py b/middleware/rate_limiter.py new file mode 100644 index 00000000..62c64c9a --- /dev/null +++ b/middleware/rate_limiter.py @@ -0,0 +1,82 @@ +# middleware/rate_limiter.py +from typing import Dict, Tuple +from datetime import datetime, timedelta +import logging +from collections import defaultdict, deque + +logger = logging.getLogger(__name__) + + +class RateLimiter: + """In-memory rate limiter using sliding window""" + + def __init__(self): + # Dictionary to store request timestamps for each client + self.clients: Dict[str, deque] = defaultdict(lambda: deque()) + self.cleanup_interval = 3600 # Clean up old entries every hour + self.last_cleanup = datetime.utcnow() + + def allow_request(self, client_id: str, max_requests: int, window_seconds: int) -> bool: + """ + Check if client is allowed to make a request + Uses sliding window algorithm + """ + now = datetime.utcnow() + window_start = now - timedelta(seconds=window_seconds) + + # Clean up old entries periodically + if (now - self.last_cleanup).seconds > self.cleanup_interval: + self._cleanup_old_entries() + self.last_cleanup = now + + # Get client's request history + client_requests = self.clients[client_id] + + # Remove requests outside the window + while client_requests and client_requests[0] < window_start: + client_requests.popleft() + + # Check if under rate limit + if len(client_requests) < max_requests: + client_requests.append(now) + return True + + logger.warning(f"Rate limit exceeded for client {client_id}: {len(client_requests)}/{max_requests}") + return False + + def _cleanup_old_entries(self): + """Clean up old entries to prevent memory leaks""" + cutoff_time = datetime.utcnow() - timedelta(hours=24) + + clients_to_remove = [] + for client_id, requests in self.clients.items(): + # Remove old requests + while requests and requests[0] < cutoff_time: + 
requests.popleft() + + # Mark empty clients for removal + if not requests: + clients_to_remove.append(client_id) + + # Remove empty clients + for client_id in clients_to_remove: + del self.clients[client_id] + + logger.info(f"Rate limiter cleanup completed. Removed {len(clients_to_remove)} inactive clients") + + def get_client_stats(self, client_id: str) -> Dict[str, int]: + """Get statistics for a specific client""" + client_requests = self.clients.get(client_id, deque()) + + now = datetime.utcnow() + hour_ago = now - timedelta(hours=1) + day_ago = now - timedelta(days=1) + + requests_last_hour = sum(1 for req_time in client_requests if req_time > hour_ago) + requests_last_day = sum(1 for req_time in client_requests if req_time > day_ago) + + return { + "requests_last_hour": requests_last_hour, + "requests_last_day": requests_last_day, + "total_tracked_requests": len(client_requests) + } \ No newline at end of file diff --git a/models/api_models.py b/models/api_models.py new file mode 100644 index 00000000..18da67e4 --- /dev/null +++ b/models/api_models.py @@ -0,0 +1,209 @@ +# models/api_models.py +from pydantic import BaseModel, Field, field_validator, EmailStr +from typing import Optional, List +from datetime import datetime + + +# User Authentication Models +class UserRegister(BaseModel): + email: EmailStr = Field(..., description="Valid email address") + username: str = Field(..., min_length=3, max_length=50, description="Username (3-50 characters)") + password: str = Field(..., min_length=6, description="Password (minimum 6 characters)") + + @field_validator('username') + @classmethod + def validate_username(cls, v): + if not v.isalnum(): + raise ValueError('Username must contain only alphanumeric characters') + return v.lower().strip() + + @field_validator('password') + @classmethod + def validate_password(cls, v): + if len(v) < 6: + raise ValueError('Password must be at least 6 characters long') + return v + + +class UserLogin(BaseModel): + username: str = 
Field(..., description="Username") + password: str = Field(..., description="Password") + + @field_validator('username') + @classmethod + def validate_username(cls, v): + return v.strip() + + +class UserResponse(BaseModel): + id: int + email: str + username: str + role: str + is_active: bool + last_login: Optional[datetime] = None + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} + + +class LoginResponse(BaseModel): + access_token: str + token_type: str = "bearer" + expires_in: int + user: UserResponse + + +# Base Product Models +class ProductBase(BaseModel): + product_id: Optional[str] = None + title: Optional[str] = None + description: Optional[str] = None + link: Optional[str] = None + image_link: Optional[str] = None + availability: Optional[str] = None + price: Optional[str] = None + brand: Optional[str] = None + gtin: Optional[str] = None + mpn: Optional[str] = None + condition: Optional[str] = None + adult: Optional[str] = None + multipack: Optional[int] = None + is_bundle: Optional[str] = None + age_group: Optional[str] = None + color: Optional[str] = None + gender: Optional[str] = None + material: Optional[str] = None + pattern: Optional[str] = None + size: Optional[str] = None + size_type: Optional[str] = None + size_system: Optional[str] = None + item_group_id: Optional[str] = None + google_product_category: Optional[str] = None + product_type: Optional[str] = None + custom_label_0: Optional[str] = None + custom_label_1: Optional[str] = None + custom_label_2: Optional[str] = None + custom_label_3: Optional[str] = None + custom_label_4: Optional[str] = None + additional_image_link: Optional[str] = None + sale_price: Optional[str] = None + unit_pricing_measure: Optional[str] = None + unit_pricing_base_measure: Optional[str] = None + identifier_exists: Optional[str] = None + shipping: Optional[str] = None + currency: Optional[str] = None + + +class ProductCreate(ProductBase): + product_id: str = Field(..., 
min_length=1, description="Product ID is required") + title: str = Field(..., min_length=1, description="Title is required") + + @field_validator('product_id', 'title') + @classmethod + def validate_required_fields(cls, v): + if not v or not v.strip(): + raise ValueError('Field cannot be empty') + return v.strip() + + +class ProductUpdate(ProductBase): + pass + + +class ProductResponse(ProductBase): + id: int + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} + + +# Stock Models +class StockBase(BaseModel): + gtin: str = Field(..., min_length=1, description="GTIN is required") + location: str = Field(..., min_length=1, description="Location is required") + + +class StockCreate(StockBase): + quantity: int = Field(ge=0, description="Quantity must be non-negative") + + +class StockAdd(StockBase): + quantity: int = Field(gt=0, description="Quantity to add must be positive") + + +class StockUpdate(BaseModel): + quantity: int = Field(ge=0, description="Quantity must be non-negative") + + +class StockResponse(BaseModel): + id: int + gtin: str + location: str + quantity: int + created_at: datetime + updated_at: datetime + + model_config = {"from_attributes": True} + + +class StockLocationResponse(BaseModel): + location: str + quantity: int + + +class StockSummaryResponse(BaseModel): + gtin: str + total_quantity: int + locations: List[StockLocationResponse] + product_title: Optional[str] = None + + +# Import Models +class CSVImportRequest(BaseModel): + url: str = Field(..., description="URL to CSV file") + batch_size: Optional[int] = Field(1000, gt=0, le=10000, description="Batch size for processing") + + @field_validator('url') + @classmethod + def validate_url(cls, v): + if not v.startswith(('http://', 'https://')): + raise ValueError('URL must start with http:// or https://') + return v + + +class ImportJobResponse(BaseModel): + job_id: int + status: str + message: Optional[str] = None + imported: Optional[int] = 0 + updated: 
Optional[int] = 0 + total_processed: Optional[int] = 0 + error_count: Optional[int] = 0 + error_message: Optional[str] = None + created_at: Optional[datetime] = None + started_at: Optional[datetime] = None + completed_at: Optional[datetime] = None + + +# Response Models +class ProductListResponse(BaseModel): + products: List[ProductResponse] + total: int + skip: int + limit: int + + +class ProductDetailResponse(BaseModel): + product: ProductResponse + stock_info: Optional[StockSummaryResponse] = None + + +class StatsResponse(BaseModel): + total_products: int + unique_brands: int + unique_categories: int + total_stock_entries: int = 0 + total_inventory_quantity: int = 0 diff --git a/models/database_models.py b/models/database_models.py new file mode 100644 index 00000000..1801f52e --- /dev/null +++ b/models/database_models.py @@ -0,0 +1,120 @@ +# models/database_models.py +from sqlalchemy import Column, Integer, String, DateTime, ForeignKey, Index, UniqueConstraint, Boolean +from sqlalchemy.ext.declarative import declarative_base +from sqlalchemy.orm import relationship +from datetime import datetime + +Base = declarative_base() + + +class User(Base): + __tablename__ = "users" + + id = Column(Integer, primary_key=True, index=True) + email = Column(String, unique=True, index=True, nullable=False) + username = Column(String, unique=True, index=True, nullable=False) + hashed_password = Column(String, nullable=False) + role = Column(String, nullable=False, default="user") # 'admin' or 'user' + is_active = Column(Boolean, default=True, nullable=False) + last_login = Column(DateTime, nullable=True) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + def __repr__(self): + return f"" + + +class Product(Base): + __tablename__ = "products" + + id = Column(Integer, primary_key=True, index=True) + product_id = Column(String, unique=True, index=True, 
nullable=False) + title = Column(String, nullable=False) + description = Column(String) + link = Column(String) + image_link = Column(String) + availability = Column(String, index=True) # Index for filtering + price = Column(String) + brand = Column(String, index=True) # Index for filtering + gtin = Column(String, index=True) # Index for stock lookups + mpn = Column(String) + condition = Column(String) + adult = Column(String) + multipack = Column(Integer) + is_bundle = Column(String) + age_group = Column(String) + color = Column(String) + gender = Column(String) + material = Column(String) + pattern = Column(String) + size = Column(String) + size_type = Column(String) + size_system = Column(String) + item_group_id = Column(String) + google_product_category = Column(String, index=True) # Index for filtering + product_type = Column(String) + custom_label_0 = Column(String) + custom_label_1 = Column(String) + custom_label_2 = Column(String) + custom_label_3 = Column(String) + custom_label_4 = Column(String) + additional_image_link = Column(String) + sale_price = Column(String) + unit_pricing_measure = Column(String) + unit_pricing_base_measure = Column(String) + identifier_exists = Column(String) + shipping = Column(String) + currency = Column(String) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + # Relationship to stock (one-to-many via GTIN) + stock_entries = relationship("Stock", foreign_keys="Stock.gtin", primaryjoin="Product.gtin == Stock.gtin", + viewonly=True) + + def __repr__(self): + return f"" + + +class Stock(Base): + __tablename__ = "stock" + + id = Column(Integer, primary_key=True, index=True) + gtin = Column(String, index=True, nullable=False) # Foreign key relationship would be ideal + location = Column(String, nullable=False, index=True) + quantity = Column(Integer, nullable=False, default=0) + created_at = Column(DateTime, 
default=datetime.utcnow, nullable=False) + updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow, nullable=False) + + # Composite unique constraint to prevent duplicate GTIN-location combinations + __table_args__ = ( + UniqueConstraint('gtin', 'location', name='uq_stock_gtin_location'), + Index('idx_stock_gtin_location', 'gtin', 'location'), # Composite index for efficient queries + ) + + def __repr__(self): + return f"" + + +class ImportJob(Base): + __tablename__ = "import_jobs" + + id = Column(Integer, primary_key=True, index=True) + status = Column(String, nullable=False, + default="pending") # pending, processing, completed, failed, completed_with_errors + source_url = Column(String, nullable=False) + user_id = Column(Integer, ForeignKey('users.id')) # Foreign key to users table + imported_count = Column(Integer, default=0) + updated_count = Column(Integer, default=0) + error_count = Column(Integer, default=0) + total_processed = Column(Integer, default=0) + error_message = Column(String) + created_at = Column(DateTime, default=datetime.utcnow, nullable=False) + started_at = Column(DateTime) + completed_at = Column(DateTime) + + # Relationship to user + user = relationship("User", foreign_keys=[user_id]) + + def __repr__(self): + return f"" diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..93513fee --- /dev/null +++ b/requirements.txt @@ -0,0 +1,30 @@ +# requirements.txt +# Core FastAPI and web framework +fastapi==0.104.1 +uvicorn[standard]==0.24.0 +pydantic==2.5.0 +pydantic-settings==2.1.0 # Required for BaseSettings +pydantic[email]==2.5.0 + +# Database +sqlalchemy==2.0.23 +psycopg2-binary==2.9.7 # PostgreSQL adapter +alembic==1.12.1 # For database migrations + +# Authentication and Security +python-jose[cryptography]==3.3.0 # JWT handling +passlib[bcrypt]==1.7.4 # Password hashing with bcrypt +bcrypt==4.0.1 # Explicit bcrypt version for compatibility +python-multipart==0.0.6 # Form data parsing + 
+# Data processing +pandas==2.1.3 +requests==2.31.0 + +# Environment and configuration +python-dotenv==1.0.0 + +# Development and testing (optional) +pytest==7.4.3 +pytest-asyncio==0.21.1 +httpx==0.25.2 # For testing FastAPI endpoints \ No newline at end of file diff --git a/scripts/setup_dev.py b/scripts/setup_dev.py new file mode 100644 index 00000000..6b416df0 --- /dev/null +++ b/scripts/setup_dev.py @@ -0,0 +1,168 @@ +# scripts/setup_dev.py +# !/usr/bin/env python3 +"""Development environment setup script""" + +import os +import sys +import subprocess +from pathlib import Path + + +def run_command(command, description): + """Run a shell command and handle errors""" + print(f"Running: {description}") + try: + subprocess.run(command, shell=True, check=True) + print(f"✅ {description} completed successfully") + except subprocess.CalledProcessError as e: + print(f"❌ {description} failed: {e}") + return False + return True + + +def setup_alembic(): + """Set up Alembic for database migrations""" + alembic_dir = Path("alembic") + + # Check if alembic directory exists and has necessary files + if not alembic_dir.exists() or not (alembic_dir / "script.py.mako").exists(): + print("📝 Initializing Alembic...") + if alembic_dir.exists(): + # Remove incomplete alembic directory + import shutil + shutil.rmtree(alembic_dir) + + if not run_command("alembic init alembic", "Initializing Alembic"): + return False + + # Update alembic/env.py with proper configuration + env_py_content = '''from logging.config import fileConfig +from sqlalchemy import engine_from_config, pool +from alembic import context +import os +import sys + +# Add your project directory to the Python path +sys.path.append(os.path.dirname(os.path.dirname(__file__))) + +from models.database_models import Base +from config.settings import settings + +# Alembic Config object +config = context.config + +# Override sqlalchemy.url with our settings +config.set_main_option("sqlalchemy.url", settings.database_url) + +if 
config.config_file_name is not None: + fileConfig(config.config_file_name) + +target_metadata = Base.metadata + +def run_migrations_offline() -> None: + """Run migrations in 'offline' mode.""" + url = config.get_main_option("sqlalchemy.url") + context.configure( + url=url, + target_metadata=target_metadata, + literal_binds=True, + dialect_opts={"paramstyle": "named"}, + ) + + with context.begin_transaction(): + context.run_migrations() + +def run_migrations_online() -> None: + """Run migrations in 'online' mode.""" + connectable = engine_from_config( + config.get_section(config.config_ini_section, {}), + prefix="sqlalchemy.", + poolclass=pool.NullPool, + ) + + with connectable.connect() as connection: + context.configure( + connection=connection, target_metadata=target_metadata + ) + + with context.begin_transaction(): + context.run_migrations() + +if context.is_offline_mode(): + run_migrations_offline() +else: + run_migrations_online() +''' + + env_py_path = alembic_dir / "env.py" + env_py_path.write_text(env_py_content) + print("✅ Updated alembic/env.py with project configuration") + + return True + + +def setup_environment(): + """Set up the development environment""" + print("🚀 Setting up ecommerce API development environment...") + + # Check Python version + if sys.version_info < (3, 8): + print("❌ Python 3.8+ is required") + return False + + # Create .env file if it doesn't exist + env_file = Path(".env") + env_example = Path(".env.example") + + if not env_file.exists() and env_example.exists(): + print("📝 Creating .env file from .env.example...") + env_file.write_text(env_example.read_text()) + elif not env_file.exists(): + print("📝 Creating default .env file...") + default_env = """DATABASE_URL=sqlite:///./ecommerce.db +JWT_SECRET_KEY=development-secret-key-change-in-production +JWT_EXPIRE_HOURS=24 +API_HOST=0.0.0.0 +API_PORT=8000 +DEBUG=True +RATE_LIMIT_ENABLED=True +DEFAULT_RATE_LIMIT=100 +DEFAULT_WINDOW_SECONDS=3600 +LOG_LEVEL=INFO +""" + 
env_file.write_text(default_env) + + # Install dependencies + if not run_command("pip install -r requirements.txt", "Installing dependencies"): + return False + + # Set up Alembic + if not setup_alembic(): + print("⚠️ Alembic setup failed. You'll need to set up database migrations manually.") + return False + + # Create initial migration + if not run_command("alembic revision --autogenerate -m \"Initial migration\"", "Creating initial migration"): + print("⚠️ Initial migration creation failed. Check your database models.") + + # Apply migrations + if not run_command("alembic upgrade head", "Setting up database"): + print("⚠️ Database setup failed. Make sure your DATABASE_URL is correct in .env") + + # Run tests + if not run_command("pytest", "Running tests"): + print("⚠️ Some tests failed. Check the output above.") + + print("\n🎉 Development environment setup complete!") + print("To start the development server, run:") + print(" uvicorn main:app --reload") + print("\nDatabase commands:") + print(" alembic revision --autogenerate -m \"Description\" # Create migration") + print(" alembic upgrade head # Apply migrations") + print(" alembic current # Check status") + + return True + + +if __name__ == "__main__": + setup_environment() diff --git a/tests/test_utils.py b/tests/test_utils.py new file mode 100644 index 00000000..6cd87525 --- /dev/null +++ b/tests/test_utils.py @@ -0,0 +1,61 @@ +# tests/test_utils.py +import pytest +from utils.data_processing import GTINProcessor, PriceProcessor + + +class TestGTINProcessor: + def setup_method(self): + self.processor = GTINProcessor() + + def test_normalize_valid_gtin(self): + # Test EAN-13 + assert self.processor.normalize("1234567890123") == "1234567890123" + + # Test UPC-A + assert self.processor.normalize("123456789012") == "123456789012" + + # Test with decimal + assert self.processor.normalize("123456789012.0") == "123456789012" + + def test_normalize_invalid_gtin(self): + assert self.processor.normalize("") is None + 
assert self.processor.normalize(None) is None + assert self.processor.normalize("abc") is None + assert self.processor.normalize("123") == "000000000123" # Padded to 12 digits + + def test_validate_gtin(self): + assert self.processor.validate("1234567890123") is True + assert self.processor.validate("123456789012") is True + assert self.processor.validate("12345678") is True + assert self.processor.validate("123") is False + assert self.processor.validate("") is False + + +class TestPriceProcessor: + def setup_method(self): + self.processor = PriceProcessor() + + def test_parse_price_currency(self): + # Test EUR with symbol + price, currency = self.processor.parse_price_currency("8.26 EUR") + assert price == "8.26" + assert currency == "EUR" + + # Test USD with symbol + price, currency = self.processor.parse_price_currency("$12.50") + assert price == "12.50" + assert currency == "USD" + + # Test with comma decimal separator + price, currency = self.processor.parse_price_currency("8,26 €") + assert price == "8.26" + assert currency == "EUR" + + def test_parse_invalid_price(self): + price, currency = self.processor.parse_price_currency("") + assert price is None + assert currency is None + + price, currency = self.processor.parse_price_currency(None) + assert price is None + assert currency is None \ No newline at end of file diff --git a/updated_readme.md b/updated_readme.md new file mode 100644 index 00000000..f945aada --- /dev/null +++ b/updated_readme.md @@ -0,0 +1,569 @@ +# Ecommerce Backend API v2.1 + +A robust, production-ready FastAPI backend for ecommerce product catalog and inventory management with JWT authentication and advanced CSV import capabilities. 
+ +## Key Features + +### Security & Authentication +- **JWT Authentication**: Secure token-based authentication with configurable expiration (30 minutes default) +- **User Management**: Registration, login, role-based access control (Admin/User roles) +- **Password Security**: Bcrypt hashing for secure password storage +- **Protected Endpoints**: All product management operations require authentication +- **Default Admin Account**: Auto-created admin user for immediate system access + +### Architecture Improvements +- **Modular Design**: Separated concerns into utility modules, middleware, and models +- **Database Optimization**: Added proper indexing strategy and foreign key relationships +- **Connection Pooling**: PostgreSQL support with connection pooling for production scalability +- **Background Processing**: Asynchronous CSV import with job tracking + +### Performance Optimizations +- **Batch Processing**: CSV imports processed in configurable batches +- **Database Indexes**: Strategic indexing for common query patterns +- **Streaming Export**: Memory-efficient CSV export for large datasets +- **Rate Limiting**: Sliding window rate limiter to prevent API abuse + +### Data Processing +- **Robust GTIN Handling**: Centralized GTIN normalization and validation +- **Multi-currency Support**: Advanced price parsing with currency extraction +- **International Content**: Multi-encoding CSV support for global data + +## Project Structure + +``` +ecommerce_api/ +├── main.py # FastAPI application entry point with auth +├── models/ +│ ├── database_models.py # SQLAlchemy ORM models (User, Product, Stock, ImportJob) +│ └── api_models.py # Pydantic API models with auth models +├── utils/ +│ ├── data_processing.py # GTIN and price processing utilities +│ ├── csv_processor.py # CSV import/export handling +│ └── database.py # Database configuration +├── middleware/ +│ ├── auth.py # JWT authentication with bcrypt +│ ├── rate_limiter.py # Rate limiting implementation +│ ├── 
error_handler.py # Centralized error handling +│ └── logging_middleware.py # Request/response logging +├── tests/ +│ └── test_auth.py # Authentication tests +├── requirements.txt # Python dependencies with auth packages +└── README.md # This file +``` + +## Quick Start + +### 1. Installation + +```bash +# Clone the repository +git clone +cd ecommerce-api + +# Set up virtual environment +python -m venv venv +source venv/bin/activate # On Windows: venv\Scripts\activate + +# Install dependencies +pip install -r requirements.txt +``` + +### 2. Environment Configuration + +Create a `.env` file in the project root: + +```env +# Database +DATABASE_URL=postgresql://user:password@localhost:5432/ecommerce_db +# For SQLite (development): DATABASE_URL=sqlite:///./ecommerce.db + +# JWT Configuration +JWT_SECRET_KEY=your-super-secret-key-change-in-production-immediately +JWT_EXPIRE_MINUTES=30 + +# Server Configuration +API_HOST=0.0.0.0 +API_PORT=8000 +DEBUG=False +``` + +**Important Security Note**: Always change the `JWT_SECRET_KEY` in production! + +### 3. Database Setup + +**For SQLite (Development):** +```bash +# Run the application - it will create tables automatically +python main.py +``` + +**For PostgreSQL (Production):** +```bash +# Create PostgreSQL database +createdb ecommerce_db + +# Run the application - it will create tables and indexes automatically +python main.py +``` + +### 4. Start the Server + +```bash +# Development server +uvicorn main:app --reload --host 0.0.0.0 --port 8000 + +# Production server +uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4 +``` + +The API will be available at `http://localhost:8000` + +### 5. Default Admin Access + +The system automatically creates a default admin user: +- **Username**: `admin` +- **Password**: `admin123` +- **Email**: `admin@example.com` + +**Security Warning**: Change the admin password immediately in production! + +## Authentication Flow + +### 1. 
Register a New User + +```bash +curl -X POST "http://localhost:8000/register" \ + -H "Content-Type: application/json" \ + -d '{ + "email": "user@example.com", + "username": "newuser", + "password": "securepassword123" + }' +``` + +### 2. Login and Get JWT Token + +```bash +curl -X POST "http://localhost:8000/login" \ + -H "Content-Type: application/json" \ + -d '{ + "username": "admin", + "password": "admin123" + }' +``` + +Response: +```json +{ + "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...", + "token_type": "bearer", + "expires_in": 1800, + "user": { + "id": 1, + "username": "admin", + "email": "admin@example.com", + "role": "admin", + "is_active": true + } +} +``` + +### 3. Use Token for Protected Endpoints + +```bash +curl -X GET "http://localhost:8000/products" \ + -H "Authorization: Bearer YOUR_JWT_TOKEN_HERE" +``` + +## API Endpoints + +### Public Endpoints +- `GET /` - API information +- `GET /health` - Health check +- `POST /register` - Register new user +- `POST /login` - Login and get JWT token + +### Protected Endpoints (Require Authentication) + +#### User Management +- `GET /me` - Get current user information + +#### Products (All users) +- `GET /products` - List products with filtering and search +- `POST /products` - Create new product +- `GET /products/{product_id}` - Get product with stock info +- `PUT /products/{product_id}` - Update product +- `DELETE /products/{product_id}` - Delete product and associated stock + +#### Stock Management (All users) +- `POST /stock` - Set exact stock quantity +- `GET /stock/{gtin}` - Get stock summary by GTIN + +#### CSV Operations (All users) +- `POST /import-csv` - Start background CSV import +- `GET /import-status/{job_id}` - Check import job status (own jobs only) +- `GET /export-csv` - Export products as CSV (streaming) + +#### Statistics (All users) +- `GET /stats` - System statistics + +#### Admin-Only Endpoints +- `GET /admin/users` - List all users +- `PUT /admin/users/{user_id}/status` - 
Activate/deactivate users + +## User Roles and Permissions + +### Regular Users +- Can register and login +- Access all product and stock management features +- Can import/export CSV files +- Can only view their own import jobs +- Cannot manage other users + +### Admin Users +- All regular user permissions +- Can view and manage all users +- Can view all import jobs from any user +- Can activate/deactivate user accounts + +## Security Features + +### Password Security +- Passwords hashed using bcrypt with salt +- Minimum password length: 6 characters +- No password storage in plaintext anywhere + +### JWT Token Security +- Tokens include expiration timestamp +- Tokens include user role and permissions +- Configurable expiration time (default: 30 minutes) +- Secure token validation on each request + +### Rate Limiting +- CSV imports: 10 requests per hour +- General API: 100 requests per hour per client +- Customizable per endpoint + +### Input Validation +- All inputs validated using Pydantic models +- Email format validation for registration +- Username alphanumeric validation +- GTIN format validation and normalization + +## Advanced Features + +### Background CSV Import + +Import large CSV files asynchronously: + +```python +import requests + +# Start import +response = requests.post( + 'http://localhost:8000/import-csv', + headers={'Authorization': 'Bearer YOUR_TOKEN'}, + json={ + 'url': 'https://example.com/products.csv', + 'batch_size': 1000 + } +) + +job_id = response.json()['job_id'] + +# Check status +status_response = requests.get( + f'http://localhost:8000/import-status/{job_id}', + headers={'Authorization': 'Bearer YOUR_TOKEN'} +) +print(status_response.json()) +``` + +### Product Search and Filtering + +```bash +# Search in title and description +GET /products?search=laptop + +# Filter by brand and category +GET /products?brand=Apple&category=Electronics + +# Combine filters with pagination +GET 
/products?brand=Samsung&availability=in%20stock&search=phone&skip=0&limit=50 +``` + +### Stock Management + +```bash +# Set stock for a specific location +curl -X POST "http://localhost:8000/stock" \ + -H "Authorization: Bearer YOUR_TOKEN" \ + -H "Content-Type: application/json" \ + -d '{ + "gtin": "1234567890123", + "location": "WAREHOUSE_A", + "quantity": 100 + }' + +# Get stock summary +curl -X GET "http://localhost:8000/stock/1234567890123" \ + -H "Authorization: Bearer YOUR_TOKEN" +``` + +## Database Schema + +### Users Table +```sql +CREATE TABLE users ( + id SERIAL PRIMARY KEY, + email VARCHAR UNIQUE NOT NULL, + username VARCHAR UNIQUE NOT NULL, + hashed_password VARCHAR NOT NULL, + role VARCHAR DEFAULT 'user', + is_active BOOLEAN DEFAULT true, + last_login TIMESTAMP, + created_at TIMESTAMP DEFAULT NOW(), + updated_at TIMESTAMP DEFAULT NOW() +); +``` + +### Products Table +- Full product catalog with Google Shopping compatibility +- Indexed fields: `gtin`, `brand`, `google_product_category`, `availability` +- Supports all Google Shopping feed attributes + +### Stock Table +- Location-based inventory tracking +- GTIN-based product linking +- Unique constraint on GTIN+location combinations + +### Import Jobs Table +- Track background import operations +- User ownership and access control +- Status monitoring and error handling + +## Development + +### Running Tests + +```bash +# Install test dependencies +pip install pytest pytest-asyncio httpx + +# Run all tests +pytest + +# Run with coverage +pytest --cov=. 
tests/ +``` + +### Development Server with Auto-reload + +```bash +uvicorn main:app --reload --host 0.0.0.0 --port 8000 +``` + +## Production Deployment + +### Security Checklist + +- [ ] Change default admin password immediately +- [ ] Set strong JWT_SECRET_KEY (32+ random characters) +- [ ] Configure JWT_EXPIRE_MINUTES appropriately +- [ ] Set up HTTPS/TLS termination +- [ ] Configure CORS for your frontend domains only +- [ ] Set up database connection limits and pooling +- [ ] Enable request logging and monitoring +- [ ] Configure rate limiting per your needs +- [ ] Set up user account monitoring and alerting +- [ ] Regular security audits of user accounts + +### Environment Variables for Production + +```env +# Security +JWT_SECRET_KEY=your-very-long-random-secret-key-at-least-32-characters +JWT_EXPIRE_MINUTES=30 + +# Database (use PostgreSQL in production) +DATABASE_URL=postgresql://user:password@db-host:5432/ecommerce_prod + +# Server +DEBUG=False +API_HOST=0.0.0.0 +API_PORT=8000 + +# Optional: External services +REDIS_URL=redis://redis-host:6379/0 +``` + +### Docker Deployment + +```yaml +# docker-compose.yml +version: '3.8' +services: + db: + image: postgres:15 + environment: + POSTGRES_DB: ecommerce + POSTGRES_USER: ecommerce_user + POSTGRES_PASSWORD: secure_password + volumes: + - postgres_data:/var/lib/postgresql/data + ports: + - "5432:5432" + + api: + build: . 
+ environment: + DATABASE_URL: postgresql://ecommerce_user:secure_password@db:5432/ecommerce + JWT_SECRET_KEY: your-production-secret-key + JWT_EXPIRE_MINUTES: 30 + ports: + - "8000:8000" + depends_on: + - db + restart: unless-stopped + +volumes: + postgres_data: +``` + +### Nginx Configuration + +```nginx +server { + listen 80; + server_name your-api-domain.com; + + location / { + proxy_pass http://localhost:8000; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } +} +``` + +## Troubleshooting + +### Authentication Issues + +**Invalid Token Errors:** +- Check token expiration (default 30 minutes) +- Verify JWT_SECRET_KEY is consistent +- Ensure Bearer token format: `Authorization: Bearer ` + +**Login Failures:** +- Verify username/email and password +- Check if user account is active (`is_active: true`) +- Review user registration process + +**Permission Denied:** +- Confirm user role permissions +- Check endpoint access requirements +- Verify token contains correct role information + +### Database Issues + +**Connection Errors:** +- Verify DATABASE_URL format and credentials +- Check database server accessibility +- Monitor connection pool limits + +**Migration Issues:** +- Tables are created automatically on startup +- For schema changes, implement proper migrations +- Backup data before major updates + +### Common API Issues + +**CSV Import Failures:** +- Check file URL accessibility +- Verify CSV format and encoding +- Monitor import job status for detailed errors +- Ensure proper authentication token + +**Rate Limiting:** +- Default limits: 100 requests/hour, 10 CSV imports/hour +- Check rate limit headers in responses +- Implement proper retry logic with backoff + +### Logging and Monitoring + +Application logs include: +- Authentication events (login, failed attempts) +- API request/response times +- Import job progress and 
errors +- Rate limiting events +- Database query performance + +```bash +# View logs in development +python main.py # Logs to console + +# Docker logs +docker-compose logs -f api +``` + +## Migration from v2.0 + +If upgrading from v2.0 to v2.1: + +1. **Install new dependencies:** + ```bash + pip install -r requirements.txt + ``` + +2. **Update environment variables:** + ```bash + echo "JWT_SECRET_KEY=your-secret-key" >> .env + echo "JWT_EXPIRE_MINUTES=30" >> .env + ``` + +3. **Database migration:** + The application will automatically create the new `users` table and update the `import_jobs` table on startup. + +4. **Update client code:** + - Add authentication to all product management API calls + - Implement login flow in your frontend + - Handle JWT token refresh + +## Contributing + +1. Fork the repository +2. Create a feature branch: `git checkout -b feature-name` +3. Make changes with proper tests +4. Run security and quality checks +5. Update documentation if needed +6. Submit a pull request + +### Code Quality Standards + +- All endpoints must have proper authentication +- Password handling must use bcrypt +- JWT tokens must have expiration +- Input validation using Pydantic models +- Comprehensive error handling +- Unit tests for authentication logic + +## License + +This project is licensed under the MIT License - see the LICENSE file for details. + +## Security + +For security issues, please email the maintainers directly instead of creating a public issue. + +## Support + +For issues and questions: +1. Check the troubleshooting section above +2. Review existing GitHub issues +3. 
# utils/csv_processor.py
import logging
from datetime import datetime
from io import StringIO
from typing import Any, Dict, Optional, TYPE_CHECKING

import pandas as pd

if TYPE_CHECKING:
    # Type-checking-only import: keeps this module importable even when
    # SQLAlchemy is not installed (annotations below are quoted).
    from sqlalchemy.orm import Session

logger = logging.getLogger(__name__)


class CSVProcessor:
    """Handles CSV import with robust parsing and batching.

    Downloads a product feed, normalizes its columns to the internal schema,
    and upserts rows into the ``products`` table in configurable batches.
    """

    # Encodings tried in order when decoding a downloaded feed.
    ENCODINGS = ['utf-8', 'latin-1', 'iso-8859-1', 'cp1252', 'utf-8-sig']

    # Maps feed column names (including Google Shopping `g:` prefixes)
    # to the internal Product attribute names.
    COLUMN_MAPPING = {
        # Standard variations
        'id': 'product_id',
        'ID': 'product_id',
        'Product ID': 'product_id',
        'name': 'title',
        'Name': 'title',
        'product_name': 'title',
        'Product Name': 'title',

        # Google Shopping feed standard
        'g:id': 'product_id',
        'g:title': 'title',
        'g:description': 'description',
        'g:link': 'link',
        'g:image_link': 'image_link',
        'g:availability': 'availability',
        'g:price': 'price',
        'g:brand': 'brand',
        'g:gtin': 'gtin',
        'g:mpn': 'mpn',
        'g:condition': 'condition',
        'g:adult': 'adult',
        'g:multipack': 'multipack',
        'g:is_bundle': 'is_bundle',
        'g:age_group': 'age_group',
        'g:color': 'color',
        'g:gender': 'gender',
        'g:material': 'material',
        'g:pattern': 'pattern',
        'g:size': 'size',
        'g:size_type': 'size_type',
        'g:size_system': 'size_system',
        'g:item_group_id': 'item_group_id',
        'g:google_product_category': 'google_product_category',
        'g:product_type': 'product_type',
        'g:custom_label_0': 'custom_label_0',
        'g:custom_label_1': 'custom_label_1',
        'g:custom_label_2': 'custom_label_2',
        'g:custom_label_3': 'custom_label_3',
        'g:custom_label_4': 'custom_label_4',

        # Handle complex shipping column
        'shipping(country:price:max_handling_time:min_transit_time:max_transit_time)': 'shipping'
    }

    def __init__(self):
        # Local import (matches the original code) — presumably avoids a
        # circular import between the utils modules.
        from utils.data_processing import GTINProcessor, PriceProcessor
        self.gtin_processor = GTINProcessor()
        self.price_processor = PriceProcessor()

    def download_csv(self, url: str) -> str:
        """Download a CSV feed and decode it, trying several encodings.

        Args:
            url: HTTP(S) location of the CSV feed.

        Returns:
            The decoded CSV text.

        Raises:
            requests.RequestException: if the download fails or returns a
                non-2xx status.
        """
        # Local import so the module stays importable without `requests`.
        import requests

        try:
            response = requests.get(url, timeout=30)
            response.raise_for_status()

            content = response.content

            # Try each candidate encoding until one decodes cleanly.
            for encoding in self.ENCODINGS:
                try:
                    decoded_content = content.decode(encoding)
                    logger.info(f"Successfully decoded CSV with encoding: {encoding}")
                    return decoded_content
                except UnicodeDecodeError:
                    continue

            # Fallback with error ignoring — lossy, so log a warning.
            decoded_content = content.decode('utf-8', errors='ignore')
            logger.warning("Used UTF-8 with error ignoring for CSV decoding")
            return decoded_content

        except requests.RequestException as e:
            logger.error(f"Error downloading CSV: {e}")
            raise

    def parse_csv(self, csv_content: str) -> pd.DataFrame:
        """Parse CSV text into a DataFrame, trying several separators.

        Tries auto-detection first, then semicolon (common in European
        feeds), comma, and tab. Malformed lines are skipped.

        Raises:
            pd.errors.ParserError: if no configuration can parse the text.
        """
        parsing_configs = [
            # Try auto-detection first (python engine sniffs the delimiter)
            {'sep': None, 'engine': 'python'},
            # Try semicolon (common in European CSVs)
            {'sep': ';', 'engine': 'python'},
            # Try comma
            {'sep': ',', 'engine': 'python'},
            # Try tab
            {'sep': '\t', 'engine': 'python'},
        ]

        for config in parsing_configs:
            try:
                df = pd.read_csv(
                    StringIO(csv_content),
                    on_bad_lines='skip',
                    quotechar='"',
                    skip_blank_lines=True,
                    skipinitialspace=True,
                    **config
                )
                logger.info(f"Successfully parsed CSV with config: {config}")
                return df
            except pd.errors.ParserError:
                continue

        raise pd.errors.ParserError("Could not parse CSV with any configuration")

    def normalize_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Strip whitespace from column names and apply COLUMN_MAPPING."""
        # Clean column names
        df.columns = df.columns.str.strip()

        # Apply mapping
        df = df.rename(columns=self.COLUMN_MAPPING)

        logger.info(f"Normalized columns: {list(df.columns)}")
        return df

    def process_row(self, row_data: Dict[str, Any]) -> Dict[str, Any]:
        """Normalize a single feed row into Product-ready values.

        NaN values become None; GTIN, price/currency, sale_price, MPN and
        multipack are cleaned via the dedicated processors.
        """
        # Handle NaN values
        processed_data = {k: (v if pd.notna(v) else None) for k, v in row_data.items()}

        # Process GTIN
        if processed_data.get('gtin'):
            processed_data['gtin'] = self.gtin_processor.normalize(processed_data['gtin'])

        # Process price and currency
        if processed_data.get('price'):
            parsed_price, currency = self.price_processor.parse_price_currency(processed_data['price'])
            processed_data['price'] = parsed_price
            processed_data['currency'] = currency

        # Process sale_price (currency already taken from `price` above)
        if processed_data.get('sale_price'):
            parsed_sale_price, _ = self.price_processor.parse_price_currency(processed_data['sale_price'])
            processed_data['sale_price'] = parsed_sale_price

        # Clean MPN (strip ".0" endings left by float round-trips)
        if processed_data.get('mpn'):
            mpn_str = str(processed_data['mpn']).strip()
            if mpn_str.endswith('.0'):
                processed_data['mpn'] = mpn_str[:-2]

        # Handle multipack type conversion ("2.0" -> 2); invalid -> None
        if processed_data.get('multipack') is not None:
            try:
                processed_data['multipack'] = int(float(processed_data['multipack']))
            except (ValueError, TypeError):
                processed_data['multipack'] = None

        return processed_data

    async def process_csv_from_url(self, url: str, batch_size: int, db: "Session") -> Dict[str, int]:
        """Run a full CSV import with per-batch commits.

        Args:
            url: feed location passed to :meth:`download_csv`.
            batch_size: number of rows per transaction.
            db: SQLAlchemy session; this method owns commit/rollback.

        Returns:
            Counts dict: imported, updated, errors, total_processed.
        """
        # Download and parse CSV
        csv_content = self.download_csv(url)
        df = self.parse_csv(csv_content)
        df = self.normalize_columns(df)

        logger.info(f"Processing CSV with {len(df)} rows")

        imported = 0
        updated = 0
        errors = 0

        # Process in batches
        for i in range(0, len(df), batch_size):
            batch_df = df.iloc[i:i + batch_size]
            batch_imported, batch_updated, batch_errors = self._process_batch(batch_df, db)

            try:
                db.commit()
                # Count the batch only AFTER the commit succeeds; a rolled-back
                # batch must not inflate imported/updated (the previous version
                # counted first and then also added len(batch_df) errors,
                # double-counting failed batches).
                imported += batch_imported
                updated += batch_updated
                errors += batch_errors
                logger.info(
                    f"Processed batch {i // batch_size + 1}: +{batch_imported} imported, +{batch_updated} updated, +{batch_errors} errors")
            except Exception as e:
                db.rollback()
                logger.error(f"Batch commit failed: {e}")
                # Entire batch rolled back: every row of it is an error.
                errors += len(batch_df)

        return {
            "imported": imported,
            "updated": updated,
            "errors": errors,
            "total_processed": imported + updated + errors
        }

    def _process_batch(self, df_batch: pd.DataFrame, db: "Session") -> tuple:
        """Upsert one batch of rows; returns (imported, updated, errors).

        Does NOT commit — the caller owns the transaction boundary.
        Rows missing product_id or title are counted as errors and skipped.
        """
        # Local import keeps the module importable without the models package.
        from models.database_models import Product

        imported = 0
        updated = 0
        errors = 0

        for _, row in df_batch.iterrows():
            try:
                product_data = self.process_row(row.to_dict())

                # Validate required fields
                product_id = product_data.get('product_id')
                title = product_data.get('title')

                if not product_id or not title:
                    errors += 1
                    continue

                # Check for existing product
                existing_product = db.query(Product).filter(
                    Product.product_id == product_id
                ).first()

                if existing_product:
                    # Update existing; never touch primary key / created_at
                    for key, value in product_data.items():
                        if key not in ['id', 'created_at'] and hasattr(existing_product, key):
                            setattr(existing_product, key, value)
                    # NOTE(review): utcnow() is deprecated in 3.12+; kept for
                    # consistency with the existing naive-UTC columns.
                    existing_product.updated_at = datetime.utcnow()
                    updated += 1
                else:
                    # Create new, keeping only attributes Product declares
                    filtered_data = {k: v for k, v in product_data.items()
                                     if k not in ['id', 'created_at', 'updated_at'] and hasattr(Product, k)}
                    new_product = Product(**filtered_data)
                    db.add(new_product)
                    imported += 1

            except Exception as e:
                logger.error(f"Error processing row: {e}")
                errors += 1

        return imported, updated, errors


# utils/data_processing.py
import re
import pandas as pd
from typing import Tuple, Optional
import logging

logger = logging.getLogger(__name__)
import logging
import re
from typing import Any, Optional, Tuple

import pandas as pd

logger = logging.getLogger(__name__)


class GTINProcessor:
    """Handles GTIN normalization and validation"""

    # Accepted lengths: EAN-8, UPC-A (12), EAN-13, GTIN-14.
    VALID_LENGTHS = [8, 12, 13, 14]

    def normalize(self, gtin_value: Any) -> Optional[str]:
        """
        Normalize a raw GTIN value to a digits-only string.

        Returns None for empty/NaN input, for values with no digits, and for
        ambiguous lengths (9-11 digits). Over-long values keep their trailing
        13 digits; short values (1-7 digits) are left-padded to 12 (UPC-A).
        """
        if not gtin_value or pd.isna(gtin_value):
            return None

        gtin_str = str(gtin_value).strip()
        if not gtin_str:
            return None

        # Remove decimal point (e.g., "889698116923.0" -> "889698116923").
        # NOTE(review): assumes plain decimal rendering; a float shown in
        # scientific notation would be mangled here — confirm upstream format.
        if '.' in gtin_str:
            gtin_str = gtin_str.split('.')[0]

        # Keep only digits
        gtin_clean = ''.join(filter(str.isdigit, gtin_str))

        if not gtin_clean:
            return None

        length = len(gtin_clean)

        if length in self.VALID_LENGTHS:
            # Already a standard length — zero-padding a string to its own
            # length is a no-op, so just return it.
            return gtin_clean

        if length > 14:
            # Too long - keep the trailing 13 digits (EAN-13)
            logger.warning(f"GTIN too long, truncating: {gtin_clean}")
            return gtin_clean[-13:]

        if 0 < length < 8:
            # Too short - left-pad to UPC-A (12 digits)
            logger.warning(f"GTIN too short, padding: {gtin_clean}")
            return gtin_clean.zfill(12)

        # Lengths 9-11 are ambiguous: reject rather than guess.
        logger.warning(f"Invalid GTIN format: '{gtin_value}'")
        return None

    def validate(self, gtin: str) -> bool:
        """Return True if gtin is all digits and a standard GTIN length."""
        if not gtin:
            return False
        return len(gtin) in self.VALID_LENGTHS and gtin.isdigit()


class PriceProcessor:
    """Handles price parsing and currency extraction"""

    # Ordered regex -> extractor map. Insertion order matters: specific
    # currency patterns must be tried before the generic 3-letter fallbacks.
    CURRENCY_PATTERNS = {
        # Amount followed by currency
        r'([0-9.,]+)\s*(EUR|€)': lambda m: (m.group(1), 'EUR'),
        r'([0-9.,]+)\s*(USD|\$)': lambda m: (m.group(1), 'USD'),
        r'([0-9.,]+)\s*(GBP|£)': lambda m: (m.group(1), 'GBP'),
        r'([0-9.,]+)\s*(CHF)': lambda m: (m.group(1), 'CHF'),
        r'([0-9.,]+)\s*(CAD|AUD|JPY|¥)': lambda m: (m.group(1), m.group(2).upper()),

        # Currency followed by amount
        r'(EUR|€)\s*([0-9.,]+)': lambda m: (m.group(2), 'EUR'),
        r'(USD|\$)\s*([0-9.,]+)': lambda m: (m.group(2), 'USD'),
        r'(GBP|£)\s*([0-9.,]+)': lambda m: (m.group(2), 'GBP'),

        # Generic 3-letter currency codes
        r'([0-9.,]+)\s*([A-Z]{3})': lambda m: (m.group(1), m.group(2)),
        r'([A-Z]{3})\s*([0-9.,]+)': lambda m: (m.group(2), m.group(1)),
    }

    def parse_price_currency(self, price_str: Any) -> Tuple[Optional[str], Optional[str]]:
        """
        Parse a raw price value into a (price, currency) tuple.

        Returns (None, None) for empty/NaN input. If a number is found but no
        currency, returns (price, None). If nothing numeric can be extracted
        at all, the ORIGINAL string is returned as the price with currency
        None, so the raw value is preserved downstream.
        """
        if not price_str or pd.isna(price_str):
            return None, None

        price_str = str(price_str).strip()
        if not price_str:
            return None, None

        # Try each pattern in declaration order (specific before generic)
        for pattern, extract_func in self.CURRENCY_PATTERNS.items():
            match = re.search(pattern, price_str, re.IGNORECASE)
            if match:
                try:
                    price_val, currency_val = extract_func(match)
                    # Normalize price (remove spaces, comma as decimal point).
                    # NOTE(review): "1,234.56"-style thousands separators
                    # become "1.234.56" and fail float(), falling through to
                    # the next pattern / the fallback below.
                    price_val = price_val.replace(' ', '').replace(',', '.')
                    # Validate numeric
                    float(price_val)
                    return price_val, currency_val.upper()
                except (ValueError, AttributeError):
                    continue

        # Fallback: extract just the first number, with no currency
        number_match = re.search(r'([0-9.,]+)', price_str)
        if number_match:
            try:
                price_val = number_match.group(1).replace(',', '.')
                float(price_val)  # Validate
                return price_val, None
            except ValueError:
                pass

        logger.warning(f"Could not parse price: '{price_str}'")
        return price_str, None


# utils/database.py
import logging

logger = logging.getLogger(__name__)


def get_db_engine(database_url: str):
    """Create a SQLAlchemy engine for the given database URL.

    SQLite URLs get check_same_thread=False (allows use across threads);
    anything else gets a QueuePool with pre-ping enabled.
    """
    # Imported inside the function so the module can be imported without
    # SQLAlchemy installed (e.g. for tooling); the app always has it.
    from sqlalchemy import create_engine
    from sqlalchemy.pool import QueuePool

    if database_url.startswith('sqlite'):
        # SQLite configuration
        engine = create_engine(
            database_url,
            connect_args={"check_same_thread": False},
            echo=False
        )
    else:
        # PostgreSQL configuration with connection pooling
        engine = create_engine(
            database_url,
            poolclass=QueuePool,
            pool_size=10,
            max_overflow=20,
            pool_pre_ping=True,
            echo=False
        )

    # SECURITY: log only the part AFTER '@' — the authority section before it
    # may contain user:password and must never reach the logs.
    logger.info(f"Database engine created for: ...@{database_url.split('@')[-1]}")
    return engine


def get_session_local(engine):
    """Create a session factory bound to the given engine."""
    from sqlalchemy.orm import sessionmaker
    return sessionmaker(autocommit=False, autoflush=False, bind=engine)