diff --git a/alembic/versions/1b398cf45e85_add_letzshop_vendor_cache_table.py b/alembic/versions/1b398cf45e85_add_letzshop_vendor_cache_table.py new file mode 100644 index 00000000..b113d400 --- /dev/null +++ b/alembic/versions/1b398cf45e85_add_letzshop_vendor_cache_table.py @@ -0,0 +1,367 @@ +"""add letzshop_vendor_cache table + +Revision ID: 1b398cf45e85 +Revises: 09d84a46530f +Create Date: 2026-01-13 19:38:45.423378 + +""" +from typing import Sequence, Union + +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql +from sqlalchemy.dialects import sqlite + +# revision identifiers, used by Alembic. +revision: str = '1b398cf45e85' +down_revision: Union[str, None] = '09d84a46530f' +branch_labels: Union[str, Sequence[str], None] = None +depends_on: Union[str, Sequence[str], None] = None + + +def upgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('letzshop_vendor_cache', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('letzshop_id', sa.String(length=50), nullable=False), + sa.Column('slug', sa.String(length=200), nullable=False), + sa.Column('name', sa.String(length=255), nullable=False), + sa.Column('company_name', sa.String(length=255), nullable=True), + sa.Column('is_active', sa.Boolean(), nullable=True), + sa.Column('description_en', sa.Text(), nullable=True), + sa.Column('description_fr', sa.Text(), nullable=True), + sa.Column('description_de', sa.Text(), nullable=True), + sa.Column('email', sa.String(length=255), nullable=True), + sa.Column('phone', sa.String(length=50), nullable=True), + sa.Column('fax', sa.String(length=50), nullable=True), + sa.Column('website', sa.String(length=500), nullable=True), + sa.Column('street', sa.String(length=255), nullable=True), + sa.Column('street_number', sa.String(length=50), nullable=True), + sa.Column('city', sa.String(length=100), nullable=True), + sa.Column('zipcode', sa.String(length=20), nullable=True), + sa.Column('country_iso', sa.String(length=5), nullable=True), + sa.Column('latitude', sa.String(length=20), nullable=True), + sa.Column('longitude', sa.String(length=20), nullable=True), + sa.Column('categories', sqlite.JSON(), nullable=True), + sa.Column('background_image_url', sa.String(length=500), nullable=True), + sa.Column('social_media_links', sqlite.JSON(), nullable=True), + sa.Column('opening_hours_en', sa.Text(), nullable=True), + sa.Column('opening_hours_fr', sa.Text(), nullable=True), + sa.Column('opening_hours_de', sa.Text(), nullable=True), + sa.Column('representative_name', sa.String(length=255), nullable=True), + sa.Column('representative_title', sa.String(length=100), nullable=True), + sa.Column('claimed_by_vendor_id', sa.Integer(), nullable=True), + sa.Column('claimed_at', sa.DateTime(timezone=True), nullable=True), + sa.Column('last_synced_at', sa.DateTime(timezone=True), nullable=False), + sa.Column('raw_data', sqlite.JSON(), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=False), + sa.Column('updated_at', sa.DateTime(), nullable=False), + sa.ForeignKeyConstraint(['claimed_by_vendor_id'], ['vendors.id'], ), + sa.PrimaryKeyConstraint('id') + ) + op.create_index('idx_vendor_cache_active', 'letzshop_vendor_cache', ['is_active'], unique=False) + op.create_index('idx_vendor_cache_city', 'letzshop_vendor_cache', ['city'], unique=False) + op.create_index('idx_vendor_cache_claimed', 'letzshop_vendor_cache', ['claimed_by_vendor_id'], unique=False) + op.create_index(op.f('ix_letzshop_vendor_cache_claimed_by_vendor_id'), 'letzshop_vendor_cache', ['claimed_by_vendor_id'], unique=False) + op.create_index(op.f('ix_letzshop_vendor_cache_id'), 'letzshop_vendor_cache', ['id'], unique=False) + op.create_index(op.f('ix_letzshop_vendor_cache_letzshop_id'), 'letzshop_vendor_cache', ['letzshop_id'], unique=True) + op.create_index(op.f('ix_letzshop_vendor_cache_slug'), 'letzshop_vendor_cache', ['slug'], unique=True) + op.drop_constraint('architecture_rules_rule_id_key', 'architecture_rules', type_='unique') + op.alter_column('capacity_snapshots', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('now()')) + op.alter_column('capacity_snapshots', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('now()')) + op.create_index(op.f('ix_features_id'), 'features', ['id'], unique=False) + op.create_index(op.f('ix_features_minimum_tier_id'), 'features', ['minimum_tier_id'], unique=False) + op.create_index('idx_inv_tx_order', 'inventory_transactions', ['order_id'], unique=False) + op.alter_column('invoices', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('invoices', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('letzshop_fulfillment_queue', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('letzshop_fulfillment_queue', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('letzshop_sync_logs', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('letzshop_sync_logs', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('media_files', 'created_at', + existing_type=postgresql.TIMESTAMP(), + nullable=False, + existing_server_default=sa.text('now()')) + op.alter_column('media_files', 'updated_at', + existing_type=postgresql.TIMESTAMP(), + nullable=False) + op.alter_column('order_item_exceptions', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('order_item_exceptions', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('order_items', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('order_items', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('orders', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('orders', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.drop_index('ix_password_reset_tokens_customer_id', table_name='password_reset_tokens') + op.create_index(op.f('ix_password_reset_tokens_id'), 'password_reset_tokens', ['id'], unique=False) + op.alter_column('product_media', 'created_at', + existing_type=postgresql.TIMESTAMP(), + nullable=False, + existing_server_default=sa.text('now()')) + op.alter_column('product_media', 'updated_at', + existing_type=postgresql.TIMESTAMP(), + nullable=False) + op.alter_column('products', 'is_digital', + existing_type=sa.BOOLEAN(), + nullable=True, + existing_server_default=sa.text('false')) + op.alter_column('products', 'product_type', + existing_type=sa.VARCHAR(length=20), + nullable=True, + existing_server_default=sa.text("'physical'::character varying")) + op.drop_index('idx_product_is_digital', table_name='products') + op.create_index(op.f('ix_products_is_digital'), 'products', ['is_digital'], unique=False) + op.drop_constraint('uq_vendor_email_settings_vendor_id', 'vendor_email_settings', type_='unique') + op.drop_index('ix_vendor_email_templates_lookup', table_name='vendor_email_templates') + op.create_index(op.f('ix_vendor_email_templates_id'), 'vendor_email_templates', ['id'], unique=False) + op.alter_column('vendor_invoice_settings', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('vendor_invoice_settings', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.drop_constraint('vendor_invoice_settings_vendor_id_key', 'vendor_invoice_settings', type_='unique') + op.alter_column('vendor_letzshop_credentials', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('vendor_letzshop_credentials', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.drop_constraint('vendor_letzshop_credentials_vendor_id_key', 'vendor_letzshop_credentials', type_='unique') + op.alter_column('vendor_subscriptions', 'created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('vendor_subscriptions', 'updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.drop_constraint('vendor_subscriptions_vendor_id_key', 'vendor_subscriptions', type_='unique') + op.drop_constraint('fk_vendor_subscriptions_tier_id', 'vendor_subscriptions', type_='foreignkey') + op.create_foreign_key(None, 'vendor_subscriptions', 'subscription_tiers', ['tier_id'], ['id']) + op.alter_column('vendors', 'storefront_locale', + existing_type=sa.VARCHAR(length=10), + comment=None, + existing_comment='Currency/number formatting locale (NULL = inherit from platform)', + existing_nullable=True) + # ### end Alembic commands ### + + +def downgrade() -> None: + # ### commands auto generated by Alembic - please adjust! ### + op.alter_column('vendors', 'storefront_locale', + existing_type=sa.VARCHAR(length=10), + comment='Currency/number formatting locale (NULL = inherit from platform)', + existing_nullable=True) + op.drop_constraint(None, 'vendor_subscriptions', type_='foreignkey') + op.create_foreign_key('fk_vendor_subscriptions_tier_id', 'vendor_subscriptions', 'subscription_tiers', ['tier_id'], ['id'], ondelete='SET NULL') + op.create_unique_constraint('vendor_subscriptions_vendor_id_key', 'vendor_subscriptions', ['vendor_id']) + op.alter_column('vendor_subscriptions', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('vendor_subscriptions', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.create_unique_constraint('vendor_letzshop_credentials_vendor_id_key', 'vendor_letzshop_credentials', ['vendor_id']) + op.alter_column('vendor_letzshop_credentials', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('vendor_letzshop_credentials', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.create_unique_constraint('vendor_invoice_settings_vendor_id_key', 'vendor_invoice_settings', ['vendor_id']) + op.alter_column('vendor_invoice_settings', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('vendor_invoice_settings', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.drop_index(op.f('ix_vendor_email_templates_id'), table_name='vendor_email_templates') + op.create_index('ix_vendor_email_templates_lookup', 'vendor_email_templates', ['vendor_id', 'template_code', 'language'], unique=False) + op.create_unique_constraint('uq_vendor_email_settings_vendor_id', 'vendor_email_settings', ['vendor_id']) + op.drop_index(op.f('ix_products_is_digital'), table_name='products') + op.create_index('idx_product_is_digital', 'products', ['is_digital'], unique=False) + op.alter_column('products', 'product_type', + existing_type=sa.VARCHAR(length=20), + nullable=False, + existing_server_default=sa.text("'physical'::character varying")) + op.alter_column('products', 'is_digital', + existing_type=sa.BOOLEAN(), + nullable=False, + existing_server_default=sa.text('false')) + op.alter_column('product_media', 'updated_at', + existing_type=postgresql.TIMESTAMP(), + nullable=True) + op.alter_column('product_media', 'created_at', + existing_type=postgresql.TIMESTAMP(), + nullable=True, + existing_server_default=sa.text('now()')) + op.drop_index(op.f('ix_password_reset_tokens_id'), table_name='password_reset_tokens') + op.create_index('ix_password_reset_tokens_customer_id', 'password_reset_tokens', ['customer_id'], unique=False) + op.alter_column('orders', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('orders', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('order_items', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('order_items', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('order_item_exceptions', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('order_item_exceptions', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('media_files', 'updated_at', + existing_type=postgresql.TIMESTAMP(), + nullable=True) + op.alter_column('media_files', 'created_at', + existing_type=postgresql.TIMESTAMP(), + nullable=True, + existing_server_default=sa.text('now()')) + op.alter_column('letzshop_sync_logs', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('letzshop_sync_logs', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('letzshop_fulfillment_queue', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('letzshop_fulfillment_queue', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('invoices', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.alter_column('invoices', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('CURRENT_TIMESTAMP')) + op.drop_index('idx_inv_tx_order', table_name='inventory_transactions') + op.drop_index(op.f('ix_features_minimum_tier_id'), table_name='features') + op.drop_index(op.f('ix_features_id'), table_name='features') + op.alter_column('capacity_snapshots', 'updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('now()')) + op.alter_column('capacity_snapshots', 'created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=False, + existing_server_default=sa.text('now()')) + op.create_unique_constraint('architecture_rules_rule_id_key', 'architecture_rules', ['rule_id']) + op.drop_index(op.f('ix_letzshop_vendor_cache_slug'), table_name='letzshop_vendor_cache') + op.drop_index(op.f('ix_letzshop_vendor_cache_letzshop_id'), table_name='letzshop_vendor_cache') + op.drop_index(op.f('ix_letzshop_vendor_cache_id'), table_name='letzshop_vendor_cache') + op.drop_index(op.f('ix_letzshop_vendor_cache_claimed_by_vendor_id'), table_name='letzshop_vendor_cache') + op.drop_index('idx_vendor_cache_claimed', table_name='letzshop_vendor_cache') + op.drop_index('idx_vendor_cache_city', table_name='letzshop_vendor_cache') + op.drop_index('idx_vendor_cache_active', table_name='letzshop_vendor_cache') + op.drop_table('letzshop_vendor_cache') + # ### end Alembic commands ### diff --git a/app/api/v1/admin/letzshop.py b/app/api/v1/admin/letzshop.py index 62a7fd3e..04b0a0e2 100644 --- a/app/api/v1/admin/letzshop.py +++ b/app/api/v1/admin/letzshop.py @@ -27,6 +27,7 @@ from app.services.letzshop import ( LetzshopClientError, LetzshopCredentialsService, LetzshopOrderService, + LetzshopVendorSyncService, OrderNotFoundError, VendorNotFoundError, ) @@ -34,8 +35,13 @@ from app.tasks.letzshop_tasks import process_historical_import from models.database.user import User from models.schema.letzshop import ( FulfillmentOperationResponse, + LetzshopCachedVendorDetail, + LetzshopCachedVendorDetailResponse, + LetzshopCachedVendorItem, + LetzshopCachedVendorListResponse, LetzshopConnectionTestRequest, LetzshopConnectionTestResponse, + LetzshopCreateVendorFromCacheResponse, LetzshopCredentialsCreate, LetzshopCredentialsResponse, LetzshopCredentialsUpdate, @@ -51,6 +57,9 @@ from models.schema.letzshop import ( LetzshopSuccessResponse, LetzshopSyncTriggerRequest, LetzshopSyncTriggerResponse, + LetzshopVendorDirectoryStats, + LetzshopVendorDirectoryStatsResponse, + LetzshopVendorDirectorySyncResponse, LetzshopVendorListResponse, LetzshopVendorOverview, ) @@ -1272,3 +1281,239 @@ def sync_tracking_for_vendor( message=f"Tracking sync failed: {e}", errors=[str(e)], ) + + +# ============================================================================ +# Vendor Directory (Letzshop Marketplace Vendors) +# ============================================================================ + + +def get_vendor_sync_service(db: Session) -> LetzshopVendorSyncService: + """Get vendor sync service instance.""" + return LetzshopVendorSyncService(db) + + +@router.post("/vendor-directory/sync") +def trigger_vendor_directory_sync( + background_tasks: BackgroundTasks, + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +): + """ + Trigger a sync of the Letzshop vendor directory. + + Fetches all vendors from Letzshop's public GraphQL API and updates + the local cache. This is typically run daily via Celery beat, but + can be triggered manually here. + """ + from app.tasks.celery_tasks.letzshop import sync_vendor_directory + + # Try to dispatch via Celery first + try: + task = sync_vendor_directory.delay() + logger.info( + f"Admin {current_admin.email} triggered vendor directory sync (task={task.id})" + ) + return { + "success": True, + "message": "Vendor directory sync started", + "task_id": task.id, + "mode": "celery", + } + except Exception as e: + # Fall back to background tasks + logger.warning(f"Celery dispatch failed, using background tasks: {e}") + + def run_sync(): + from app.core.database import SessionLocal + + sync_db = SessionLocal() + try: + sync_service = LetzshopVendorSyncService(sync_db) + sync_service.sync_all_vendors() + finally: + sync_db.close() + + background_tasks.add_task(run_sync) + logger.info( + f"Admin {current_admin.email} triggered vendor directory sync (background task)" + ) + return { + "success": True, + "message": "Vendor directory sync started", + "mode": "background_task", + } + + +@router.get( + "/vendor-directory/stats", + response_model=LetzshopVendorDirectoryStatsResponse, +) +def get_vendor_directory_stats( + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +) -> LetzshopVendorDirectoryStatsResponse: + """ + Get statistics about the Letzshop vendor directory cache. + + Returns total, active, claimed, and unclaimed vendor counts. + """ + sync_service = get_vendor_sync_service(db) + stats_data = sync_service.get_sync_stats() + return LetzshopVendorDirectoryStatsResponse( + stats=LetzshopVendorDirectoryStats(**stats_data) + ) + + +@router.get( + "/vendor-directory/vendors", + response_model=LetzshopCachedVendorListResponse, +) +def list_cached_vendors( + search: str | None = Query(None, description="Search by name"), + city: str | None = Query(None, description="Filter by city"), + category: str | None = Query(None, description="Filter by category"), + only_unclaimed: bool = Query(False, description="Only show unclaimed vendors"), + page: int = Query(1, ge=1), + limit: int = Query(20, ge=1, le=100), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +) -> LetzshopCachedVendorListResponse: + """ + List cached Letzshop vendors with search and filtering. + + This returns vendors from the local cache, not directly from Letzshop. + """ + sync_service = get_vendor_sync_service(db) + vendors, total = sync_service.search_cached_vendors( + search=search, + city=city, + category=category, + only_unclaimed=only_unclaimed, + page=page, + limit=limit, + ) + + return LetzshopCachedVendorListResponse( + vendors=[ + LetzshopCachedVendorItem( + id=v.id, + letzshop_id=v.letzshop_id, + slug=v.slug, + name=v.name, + company_name=v.company_name, + email=v.email, + phone=v.phone, + website=v.website, + city=v.city, + categories=v.categories or [], + is_active=v.is_active, + is_claimed=v.is_claimed, + claimed_by_vendor_id=v.claimed_by_vendor_id, + last_synced_at=v.last_synced_at, + letzshop_url=v.letzshop_url, + ) + for v in vendors + ], + total=total, + page=page, + limit=limit, + has_more=(page * limit) < total, + ) + + +@router.get( + "/vendor-directory/vendors/{slug}", + response_model=LetzshopCachedVendorDetailResponse, +) +def get_cached_vendor_detail( + slug: str = Path(..., description="Letzshop vendor slug"), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +) -> LetzshopCachedVendorDetailResponse: + """ + Get detailed information about a cached Letzshop vendor. + """ + sync_service = get_vendor_sync_service(db) + vendor = sync_service.get_cached_vendor(slug) + + if not vendor: + raise ResourceNotFoundException("LetzshopVendor", slug) + + return LetzshopCachedVendorDetailResponse( + vendor=LetzshopCachedVendorDetail( + id=vendor.id, + letzshop_id=vendor.letzshop_id, + slug=vendor.slug, + name=vendor.name, + company_name=vendor.company_name, + description_en=vendor.description_en, + description_fr=vendor.description_fr, + description_de=vendor.description_de, + email=vendor.email, + phone=vendor.phone, + fax=vendor.fax, + website=vendor.website, + street=vendor.street, + street_number=vendor.street_number, + city=vendor.city, + zipcode=vendor.zipcode, + country_iso=vendor.country_iso, + latitude=vendor.latitude, + longitude=vendor.longitude, + categories=vendor.categories or [], + background_image_url=vendor.background_image_url, + social_media_links=vendor.social_media_links or [], + opening_hours_en=vendor.opening_hours_en, + opening_hours_fr=vendor.opening_hours_fr, + opening_hours_de=vendor.opening_hours_de, + representative_name=vendor.representative_name, + representative_title=vendor.representative_title, + is_active=vendor.is_active, + is_claimed=vendor.is_claimed, + claimed_by_vendor_id=vendor.claimed_by_vendor_id, + claimed_at=vendor.claimed_at, + last_synced_at=vendor.last_synced_at, + letzshop_url=vendor.letzshop_url, + ) + ) + + +@router.post( + "/vendor-directory/vendors/{slug}/create-vendor", + response_model=LetzshopCreateVendorFromCacheResponse, +) +def create_vendor_from_letzshop( + slug: str = Path(..., description="Letzshop vendor slug"), + company_id: int = Query(..., description="Company ID to create vendor under"), + db: Session = Depends(get_db), + current_admin: User = Depends(get_current_admin_api), +) -> LetzshopCreateVendorFromCacheResponse: + """ + Create a platform vendor from a cached Letzshop vendor. + + This creates a new vendor on the platform using information from the + Letzshop vendor cache. The vendor will be linked to the specified company. + + Args: + slug: The Letzshop vendor slug + company_id: The company ID to create the vendor under + """ + sync_service = get_vendor_sync_service(db) + + try: + vendor_info = sync_service.create_vendor_from_cache(slug, company_id) + + logger.info( + f"Admin {current_admin.email} created vendor {vendor_info['vendor_code']} " + f"from Letzshop vendor {slug}" + ) + + return LetzshopCreateVendorFromCacheResponse( + message=f"Vendor '{vendor_info['name']}' created successfully", + vendor=vendor_info, + letzshop_vendor_slug=slug, + ) + + except ValueError as e: + raise ValidationException(str(e)) diff --git a/app/api/v1/platform/letzshop_vendors.py b/app/api/v1/platform/letzshop_vendors.py index f55df483..fd2cdaad 100644 --- a/app/api/v1/platform/letzshop_vendors.py +++ b/app/api/v1/platform/letzshop_vendors.py @@ -13,11 +13,15 @@ import re from typing import Annotated from fastapi import APIRouter, Depends, Query + +from app.exceptions import ResourceNotFoundException from pydantic import BaseModel from sqlalchemy.orm import Session from app.core.database import get_db +from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService from app.services.platform_signup_service import platform_signup_service +from models.database.letzshop import LetzshopVendorCache router = APIRouter() logger = logging.getLogger(__name__) @@ -34,13 +38,40 @@ class LetzshopVendorInfo(BaseModel): letzshop_id: str | None = None slug: str name: str + company_name: str | None = None description: str | None = None - logo_url: str | None = None - category: str | None = None + email: str | None = None + phone: str | None = None + website: str | None = None + address: str | None = None city: str | None = None + categories: list[str] = [] + background_image_url: str | None = None + social_media_links: list[str] = [] letzshop_url: str is_claimed: bool = False + @classmethod + def from_cache(cls, cache: LetzshopVendorCache, lang: str = "en") -> "LetzshopVendorInfo": + """Create from cache entry.""" + return cls( + letzshop_id=cache.letzshop_id, + slug=cache.slug, + name=cache.name, + company_name=cache.company_name, + description=cache.get_description(lang), + email=cache.email, + phone=cache.phone, + website=cache.website, + address=cache.get_full_address(), + city=cache.city, + categories=cache.categories or [], + background_image_url=cache.background_image_url, + social_media_links=cache.social_media_links or [], + letzshop_url=cache.letzshop_url, + is_claimed=cache.is_claimed, + ) + class LetzshopVendorListResponse(BaseModel): """Paginated list of Letzshop vendors.""" @@ -113,35 +144,42 @@ async def list_letzshop_vendors( search: Annotated[str | None, Query(description="Search by name")] = None, category: Annotated[str | None, Query(description="Filter by category")] = None, city: Annotated[str | None, Query(description="Filter by city")] = None, + only_unclaimed: Annotated[bool, Query(description="Only show unclaimed vendors")] = False, + lang: Annotated[str, Query(description="Language for descriptions")] = "en", page: Annotated[int, Query(ge=1)] = 1, limit: Annotated[int, Query(ge=1, le=50)] = 20, + db: Session = Depends(get_db), ) -> LetzshopVendorListResponse: """ - List Letzshop vendors (placeholder - will fetch from cache/API). + List Letzshop vendors from cached directory. - In production, this would fetch from a cached vendor list - that is periodically synced from Letzshop's public directory. + The cache is periodically synced from Letzshop's public GraphQL API. + Run the sync task manually or wait for scheduled sync if cache is empty. """ - # TODO: Implement actual Letzshop vendor listing - # For now, return placeholder data to allow UI development + sync_service = LetzshopVendorSyncService(db) - # This is placeholder data - in production, we would: - # 1. Query our cached letzshop_vendor_cache table - # 2. Or fetch from Letzshop's public API if available - - # Return empty list for now - the actual data will come from Phase 4 - return LetzshopVendorListResponse( - vendors=[], - total=0, + vendors, total = sync_service.search_cached_vendors( + search=search, + city=city, + category=category, + only_unclaimed=only_unclaimed, page=page, limit=limit, - has_more=False, + ) + + return LetzshopVendorListResponse( + vendors=[LetzshopVendorInfo.from_cache(v, lang) for v in vendors], + total=total, + page=page, + limit=limit, + has_more=(page * limit) < total, ) @router.post("/letzshop-vendors/lookup", response_model=LetzshopLookupResponse) # public async def lookup_letzshop_vendor( request: LetzshopLookupRequest, + lang: Annotated[str, Query(description="Language for descriptions")] = "en", db: Session = Depends(get_db), ) -> LetzshopLookupResponse: """ @@ -149,7 +187,7 @@ async def lookup_letzshop_vendor( This endpoint: 1. Extracts the slug from the provided URL - 2. Attempts to fetch vendor info from Letzshop + 2. Looks up vendor in local cache (or fetches from Letzshop if not cached) 3. Checks if the vendor is already claimed on our platform 4. Returns vendor info for signup pre-fill """ @@ -162,23 +200,25 @@ async def lookup_letzshop_vendor( error="Could not extract vendor slug from URL", ) - # Check if already claimed (using service layer) - is_claimed = platform_signup_service.check_vendor_claimed(db, slug) + sync_service = LetzshopVendorSyncService(db) - # TODO: Fetch actual vendor info from Letzshop (Phase 4) - # For now, return basic info based on the slug - letzshop_url = f"https://letzshop.lu/vendors/{slug}" + # First try cache + cache_entry = sync_service.get_cached_vendor(slug) - vendor_info = LetzshopVendorInfo( - slug=slug, - name=slug.replace("-", " ").title(), # Placeholder name - letzshop_url=letzshop_url, - is_claimed=is_claimed, - ) + # If not in cache, try to fetch from Letzshop + if not cache_entry: + logger.info(f"Vendor {slug} not in cache, fetching from Letzshop...") + cache_entry = sync_service.sync_single_vendor(slug) + + if not cache_entry: + return LetzshopLookupResponse( + found=False, + error="Vendor not found on Letzshop", + ) return LetzshopLookupResponse( found=True, - vendor=vendor_info, + vendor=LetzshopVendorInfo.from_cache(cache_entry, lang), ) except Exception as e: @@ -192,26 +232,40 @@ async def lookup_letzshop_vendor( @router.get("/letzshop-vendors/{slug}", response_model=LetzshopVendorInfo) # public async def get_letzshop_vendor( slug: str, + lang: Annotated[str, Query(description="Language for descriptions")] = "en", db: Session = Depends(get_db), ) -> LetzshopVendorInfo: """ Get a specific Letzshop vendor by slug. - Returns 404 if vendor not found. + Returns 404 if vendor not found in cache or on Letzshop. """ slug = slug.lower() - # Check if claimed (using service layer) - is_claimed = platform_signup_service.check_vendor_claimed(db, slug) + sync_service = LetzshopVendorSyncService(db) - # TODO: Fetch actual vendor info from cache/API (Phase 4) - # For now, return placeholder based on slug + # First try cache + cache_entry = sync_service.get_cached_vendor(slug) - letzshop_url = f"https://letzshop.lu/vendors/{slug}" + # If not in cache, try to fetch from Letzshop + if not cache_entry: + logger.info(f"Vendor {slug} not in cache, fetching from Letzshop...") + cache_entry = sync_service.sync_single_vendor(slug) - return LetzshopVendorInfo( - slug=slug, - name=slug.replace("-", " ").title(), - letzshop_url=letzshop_url, - is_claimed=is_claimed, - ) + if not cache_entry: + raise ResourceNotFoundException("LetzshopVendor", slug) + + return LetzshopVendorInfo.from_cache(cache_entry, lang) + + +@router.get("/letzshop-vendors-stats") # public +async def get_letzshop_vendor_stats( + db: Session = Depends(get_db), +) -> dict: + """ + Get statistics about the Letzshop vendor cache. + + Returns total, active, claimed, and unclaimed vendor counts. + """ + sync_service = LetzshopVendorSyncService(db) + return sync_service.get_sync_stats() diff --git a/app/routes/admin_pages.py b/app/routes/admin_pages.py index a0e0c730..0c4f0b70 100644 --- a/app/routes/admin_pages.py +++ b/app/routes/admin_pages.py @@ -49,6 +49,7 @@ from app.api.deps import ( get_current_admin_optional, get_db, ) +from app.core.config import settings from models.database.user import User router = APIRouter() @@ -660,6 +661,7 @@ async def admin_background_tasks_page( { "request": request, "user": current_user, + "flower_url": settings.flower_url, }, ) @@ -760,6 +762,38 @@ async def admin_letzshop_product_detail_page( ) +# ============================================================================ +# LETZSHOP VENDOR DIRECTORY +# ============================================================================ + + +@router.get( + "/letzshop/vendor-directory", + response_class=HTMLResponse, + include_in_schema=False, +) +async def admin_letzshop_vendor_directory_page( + request: Request, + current_user: User = Depends(get_current_admin_from_cookie_or_header), + db: Session = Depends(get_db), +): + """ + Render Letzshop vendor directory management page. + + Allows admins to: + - View cached Letzshop vendors + - Trigger manual sync from Letzshop API + - Create platform vendors from cached Letzshop vendors + """ + return templates.TemplateResponse( + "admin/letzshop-vendor-directory.html", + { + "request": request, + "user": current_user, + }, + ) + + # ============================================================================ # PRODUCT CATALOG ROUTES # ============================================================================ diff --git a/app/services/letzshop/__init__.py b/app/services/letzshop/__init__.py index a73f578f..c7e8af06 100644 --- a/app/services/letzshop/__init__.py +++ b/app/services/letzshop/__init__.py @@ -7,6 +7,7 @@ Provides: - Credential management service - Order import service - Fulfillment sync service +- Vendor directory sync service """ from .client_service import ( @@ -26,6 +27,10 @@ from .order_service import ( OrderNotFoundError, VendorNotFoundError, ) +from .vendor_sync_service import ( + LetzshopVendorSyncService, + get_vendor_sync_service, +) __all__ = [ # Client @@ -42,4 +47,7 @@ __all__ = [ "LetzshopOrderService", "OrderNotFoundError", "VendorNotFoundError", + # Vendor Sync Service + "LetzshopVendorSyncService", + "get_vendor_sync_service", ] diff --git a/app/services/letzshop/client_service.py b/app/services/letzshop/client_service.py index e2363daa..d58957aa 100644 --- a/app/services/letzshop/client_service.py +++ b/app/services/letzshop/client_service.py @@ -366,6 +366,83 @@ query GetShipmentsPaginated($first: Int!, $after: String) {{ }} """ +# ============================================================================ +# GraphQL Queries - Vendor Directory (Public) +# ============================================================================ + +QUERY_VENDORS_PAGINATED = """ +query GetVendorsPaginated($first: Int!, $after: String) { + vendors(first: $first, after: $after) { + pageInfo { + hasNextPage + endCursor + } + totalCount + nodes { + id + slug + name + active + companyName + legalName + email + phone + fax + homepage + description { en fr de } + location { + street + number + city + zipcode + country { iso } + } + lat + lng + vendorCategories { name { en fr de } } + backgroundImage { url } + socialMediaLinks { url } + openingHours { en fr de } + representative + representativeTitle + } + } +} +""" + +QUERY_VENDOR_BY_SLUG = """ +query GetVendorBySlug($slug: String!) { + vendor(slug: $slug) { + id + slug + name + active + companyName + legalName + email + phone + fax + homepage + description { en fr de } + location { + street + number + city + zipcode + country { iso } + } + lat + lng + vendorCategories { name { en fr de } } + backgroundImage { url } + socialMediaLinks { url } + openingHours { en fr de } + representative + representativeTitle + } +} +""" + # ============================================================================ # GraphQL Mutations # ============================================================================ @@ -475,6 +552,74 @@ class LetzshopClient: self.close() return False + def _execute_public( + self, + query: str, + variables: dict[str, Any] | None = None, + ) -> dict[str, Any]: + """ + Execute a GraphQL query without authentication (for public queries). + + Args: + query: The GraphQL query string. + variables: Optional variables for the query. + + Returns: + The response data from the API. + + Raises: + LetzshopAPIError: If the API returns an error. + LetzshopConnectionError: If the request fails. + """ + payload = {"query": query} + if variables: + payload["variables"] = variables + + logger.debug(f"Executing public GraphQL request to {self.endpoint}") + + try: + # Use a simple request without Authorization header + response = requests.post( + self.endpoint, + json=payload, + headers={"Content-Type": "application/json"}, + timeout=self.timeout, + ) + except requests.exceptions.Timeout as e: + raise LetzshopConnectionError(f"Request timed out: {e}") from e + except requests.exceptions.ConnectionError as e: + raise LetzshopConnectionError(f"Connection failed: {e}") from e + except requests.exceptions.RequestException as e: + raise LetzshopConnectionError(f"Request failed: {e}") from e + + # Handle HTTP-level errors + if response.status_code >= 500: + raise LetzshopAPIError( + f"Letzshop server error (HTTP {response.status_code})", + response_data={"status_code": response.status_code}, + ) + + # Parse JSON response + try: + data = response.json() + except ValueError as e: + raise LetzshopAPIError( + f"Invalid JSON response: {response.text[:200]}" + ) from e + + logger.debug(f"GraphQL response: {data}") + + # Handle GraphQL errors + if "errors" in data: + errors = data["errors"] + error_messages = [e.get("message", str(e)) for e in errors] + raise LetzshopAPIError( + f"GraphQL errors: {'; '.join(error_messages)}", + response_data=data, + ) + + return data.get("data", {}) + def _execute( self, query: str, @@ -771,3 +916,100 @@ class LetzshopClient: data = self._execute(MUTATION_SET_SHIPMENT_TRACKING, variables) return data.get("setShipmentTracking", {}) + + # ======================================================================== + # Vendor Directory Queries (Public - No Auth Required) + # ======================================================================== + + def get_all_vendors_paginated( + self, + page_size: int = 50, + max_pages: int | None = None, + progress_callback: Callable[[int, int, int], None] | None = None, + ) -> list[dict[str, Any]]: + """ + Fetch all vendors from Letzshop marketplace directory. + + This uses the public GraphQL API (no authentication required). + + Args: + page_size: Number of vendors per page (default 50). + max_pages: Maximum number of pages to fetch (None = all). + progress_callback: Optional callback(page, total_fetched, total_count) + for progress updates. + + Returns: + List of all vendor data dictionaries. + """ + all_vendors = [] + cursor = None + page = 0 + total_count = None + + while True: + page += 1 + variables = {"first": page_size} + if cursor: + variables["after"] = cursor + + logger.info(f"Fetching vendors page {page} (cursor: {cursor})") + + try: + # Use public endpoint (no authentication required) + data = self._execute_public(QUERY_VENDORS_PAGINATED, variables) + except LetzshopAPIError as e: + logger.error(f"Error fetching vendors page {page}: {e}") + break + + vendors_data = data.get("vendors", {}) + nodes = vendors_data.get("nodes", []) + page_info = vendors_data.get("pageInfo", {}) + + if total_count is None: + total_count = vendors_data.get("totalCount", 0) + logger.info(f"Total vendors in Letzshop: {total_count}") + + all_vendors.extend(nodes) + + if progress_callback: + progress_callback(page, len(all_vendors), total_count) + + logger.info( + f"Page {page}: fetched {len(nodes)} vendors, " + f"total: {len(all_vendors)}/{total_count}" + ) + + # Check if there are more pages + if not page_info.get("hasNextPage"): + logger.info(f"Reached last page. Total vendors: {len(all_vendors)}") + break + + cursor = page_info.get("endCursor") + + # Check max pages limit + if max_pages and page >= max_pages: + logger.info( + f"Reached max pages limit ({max_pages}). " + f"Total vendors: {len(all_vendors)}" + ) + break + + return all_vendors + + def get_vendor_by_slug(self, slug: str) -> dict[str, Any] | None: + """ + Get a single vendor by their URL slug. + + Args: + slug: The vendor's URL slug (e.g., "nicks-diecast-corner"). + + Returns: + Vendor data dictionary or None if not found. + """ + try: + # Use public endpoint (no authentication required) + data = self._execute_public(QUERY_VENDOR_BY_SLUG, {"slug": slug}) + return data.get("vendor") + except LetzshopAPIError as e: + logger.warning(f"Vendor not found with slug '{slug}': {e}") + return None diff --git a/app/services/letzshop/vendor_sync_service.py b/app/services/letzshop/vendor_sync_service.py new file mode 100644 index 00000000..7fa963bb --- /dev/null +++ b/app/services/letzshop/vendor_sync_service.py @@ -0,0 +1,521 @@ +# app/services/letzshop/vendor_sync_service.py +""" +Service for syncing Letzshop vendor directory to local cache. + +Fetches vendor data from Letzshop's public GraphQL API and stores it +in the letzshop_vendor_cache table for fast lookups during signup. +""" + +import logging +from datetime import UTC, datetime +from typing import Any, Callable + +from sqlalchemy import func +from sqlalchemy.dialects.postgresql import insert as pg_insert +from sqlalchemy.orm import Session + +from app.services.letzshop.client_service import LetzshopClient +from models.database.letzshop import LetzshopVendorCache + +logger = logging.getLogger(__name__) + + +class LetzshopVendorSyncService: + """ + Service for syncing Letzshop vendor directory. + + Usage: + service = LetzshopVendorSyncService(db) + stats = service.sync_all_vendors() + """ + + def __init__(self, db: Session): + """Initialize the sync service.""" + self.db = db + + def sync_all_vendors( + self, + progress_callback: Callable[[int, int, int], None] | None = None, + max_pages: int | None = None, + ) -> dict[str, Any]: + """ + Sync all vendors from Letzshop to local cache. + + Args: + progress_callback: Optional callback(page, fetched, total) for progress. + + Returns: + Dictionary with sync statistics. + """ + stats = { + "started_at": datetime.now(UTC), + "total_fetched": 0, + "created": 0, + "updated": 0, + "errors": 0, + "error_details": [], + } + + logger.info("Starting Letzshop vendor directory sync...") + + # Create client (no API key needed for public vendor data) + client = LetzshopClient(api_key="") + + try: + # Fetch all vendors + vendors = client.get_all_vendors_paginated( + page_size=50, + max_pages=max_pages, + progress_callback=progress_callback, + ) + + stats["total_fetched"] = len(vendors) + logger.info(f"Fetched {len(vendors)} vendors from Letzshop") + + # Process each vendor + for vendor_data in vendors: + try: + result = self._upsert_vendor(vendor_data) + if result == "created": + stats["created"] += 1 + elif result == "updated": + stats["updated"] += 1 + except Exception as e: + stats["errors"] += 1 + error_info = { + "vendor_id": vendor_data.get("id"), + "slug": vendor_data.get("slug"), + "error": str(e), + } + stats["error_details"].append(error_info) + logger.error(f"Error processing vendor {vendor_data.get('slug')}: {e}") + + # Commit all changes + self.db.commit() + logger.info( + f"Sync complete: {stats['created']} created, " + f"{stats['updated']} updated, {stats['errors']} errors" + ) + + except Exception as e: + self.db.rollback() + logger.error(f"Vendor sync failed: {e}") + stats["error"] = str(e) + raise + + finally: + client.close() + + stats["completed_at"] = datetime.now(UTC) + stats["duration_seconds"] = ( + stats["completed_at"] - stats["started_at"] + ).total_seconds() + + return stats + + def _upsert_vendor(self, vendor_data: dict[str, Any]) -> str: + """ + Insert or update a vendor in the cache. + + Args: + vendor_data: Raw vendor data from Letzshop API. + + Returns: + "created" or "updated" indicating the operation performed. + """ + letzshop_id = vendor_data.get("id") + slug = vendor_data.get("slug") + + if not letzshop_id or not slug: + raise ValueError("Vendor missing required id or slug") + + # Parse the vendor data + parsed = self._parse_vendor_data(vendor_data) + + # Check if exists + existing = ( + self.db.query(LetzshopVendorCache) + .filter(LetzshopVendorCache.letzshop_id == letzshop_id) + .first() + ) + + if existing: + # Update existing record (preserve claimed status) + for key, value in parsed.items(): + if key not in ("claimed_by_vendor_id", "claimed_at"): + setattr(existing, key, value) + existing.last_synced_at = datetime.now(UTC) + return "updated" + else: + # Create new record + cache_entry = LetzshopVendorCache( + **parsed, + last_synced_at=datetime.now(UTC), + ) + self.db.add(cache_entry) + return "created" + + def _parse_vendor_data(self, data: dict[str, Any]) -> dict[str, Any]: + """ + Parse raw Letzshop vendor data into cache model fields. + + Args: + data: Raw vendor data from Letzshop API. + + Returns: + Dictionary of parsed fields for LetzshopVendorCache. + """ + # Extract location + location = data.get("location") or {} + country = location.get("country") or {} + + # Extract descriptions + description = data.get("description") or {} + + # Extract opening hours + opening_hours = data.get("openingHours") or {} + + # Extract categories (list of translated name objects) + categories = [] + for cat in data.get("vendorCategories") or []: + cat_name = cat.get("name") or {} + # Prefer English, fallback to French or German + name = cat_name.get("en") or cat_name.get("fr") or cat_name.get("de") + if name: + categories.append(name) + + # Extract social media URLs + social_links = [] + for link in data.get("socialMediaLinks") or []: + url = link.get("url") + if url: + social_links.append(url) + + # Extract background image + bg_image = data.get("backgroundImage") or {} + + return { + "letzshop_id": data.get("id"), + "slug": data.get("slug"), + "name": data.get("name"), + "company_name": data.get("companyName") or data.get("legalName"), + "is_active": data.get("active", True), + # Descriptions + "description_en": description.get("en"), + "description_fr": description.get("fr"), + "description_de": description.get("de"), + # Contact + "email": data.get("email"), + "phone": data.get("phone"), + "fax": data.get("fax"), + "website": data.get("homepage"), + # Location + "street": location.get("street"), + "street_number": location.get("number"), + "city": location.get("city"), + "zipcode": location.get("zipcode"), + "country_iso": country.get("iso", "LU"), + "latitude": str(data.get("lat")) if data.get("lat") else None, + "longitude": str(data.get("lng")) if data.get("lng") else None, + # Categories and media + "categories": categories, + "background_image_url": bg_image.get("url"), + "social_media_links": social_links, + # Opening hours + "opening_hours_en": opening_hours.get("en"), + "opening_hours_fr": opening_hours.get("fr"), + "opening_hours_de": opening_hours.get("de"), + # Representative + "representative_name": data.get("representative"), + "representative_title": data.get("representativeTitle"), + # Raw data for reference + "raw_data": data, + } + + def sync_single_vendor(self, slug: str) -> LetzshopVendorCache | None: + """ + Sync a single vendor by slug. + + Useful for on-demand refresh when a user looks up a vendor. + + Args: + slug: The vendor's URL slug. + + Returns: + The updated/created cache entry, or None if not found. + """ + client = LetzshopClient(api_key="") + + try: + vendor_data = client.get_vendor_by_slug(slug) + + if not vendor_data: + logger.warning(f"Vendor not found on Letzshop: {slug}") + return None + + result = self._upsert_vendor(vendor_data) + self.db.commit() + + logger.info(f"Single vendor sync: {slug} ({result})") + + return ( + self.db.query(LetzshopVendorCache) + .filter(LetzshopVendorCache.slug == slug) + .first() + ) + + finally: + client.close() + + def get_cached_vendor(self, slug: str) -> LetzshopVendorCache | None: + """ + Get a vendor from cache by slug. + + Args: + slug: The vendor's URL slug. + + Returns: + Cache entry or None if not found. + """ + return ( + self.db.query(LetzshopVendorCache) + .filter(LetzshopVendorCache.slug == slug.lower()) + .first() + ) + + def search_cached_vendors( + self, + search: str | None = None, + city: str | None = None, + category: str | None = None, + only_unclaimed: bool = False, + page: int = 1, + limit: int = 20, + ) -> tuple[list[LetzshopVendorCache], int]: + """ + Search cached vendors with filters. + + Args: + search: Search term for name. + city: Filter by city. + category: Filter by category. + only_unclaimed: Only return vendors not yet claimed. + page: Page number (1-indexed). + limit: Items per page. + + Returns: + Tuple of (vendors list, total count). + """ + query = self.db.query(LetzshopVendorCache).filter( + LetzshopVendorCache.is_active == True # noqa: E712 + ) + + if search: + search_term = f"%{search.lower()}%" + query = query.filter( + func.lower(LetzshopVendorCache.name).like(search_term) + ) + + if city: + query = query.filter( + func.lower(LetzshopVendorCache.city) == city.lower() + ) + + if category: + # Search in JSON array + query = query.filter( + LetzshopVendorCache.categories.contains([category]) + ) + + if only_unclaimed: + query = query.filter( + LetzshopVendorCache.claimed_by_vendor_id.is_(None) + ) + + # Get total count + total = query.count() + + # Apply pagination + offset = (page - 1) * limit + vendors = ( + query.order_by(LetzshopVendorCache.name) + .offset(offset) + .limit(limit) + .all() + ) + + return vendors, total + + def get_sync_stats(self) -> dict[str, Any]: + """ + Get statistics about the vendor cache. + + Returns: + Dictionary with cache statistics. + """ + total = self.db.query(LetzshopVendorCache).count() + active = ( + self.db.query(LetzshopVendorCache) + .filter(LetzshopVendorCache.is_active == True) # noqa: E712 + .count() + ) + claimed = ( + self.db.query(LetzshopVendorCache) + .filter(LetzshopVendorCache.claimed_by_vendor_id.isnot(None)) + .count() + ) + + # Get last sync time + last_synced = ( + self.db.query(func.max(LetzshopVendorCache.last_synced_at)).scalar() + ) + + # Get unique cities + cities = ( + self.db.query(LetzshopVendorCache.city) + .filter(LetzshopVendorCache.city.isnot(None)) + .distinct() + .count() + ) + + return { + "total_vendors": total, + "active_vendors": active, + "claimed_vendors": claimed, + "unclaimed_vendors": active - claimed, + "unique_cities": cities, + "last_synced_at": last_synced.isoformat() if last_synced else None, + } + + def mark_vendor_claimed( + self, + letzshop_slug: str, + vendor_id: int, + ) -> bool: + """ + Mark a Letzshop vendor as claimed by a platform vendor. + + Args: + letzshop_slug: The Letzshop vendor slug. + vendor_id: The platform vendor ID that claimed it. + + Returns: + True if successful, False if vendor not found. + """ + cache_entry = self.get_cached_vendor(letzshop_slug) + + if not cache_entry: + return False + + cache_entry.claimed_by_vendor_id = vendor_id + cache_entry.claimed_at = datetime.now(UTC) + self.db.commit() + + logger.info(f"Vendor {letzshop_slug} claimed by vendor_id={vendor_id}") + return True + + def create_vendor_from_cache( + self, + letzshop_slug: str, + company_id: int, + ) -> dict[str, Any]: + """ + Create a platform vendor from a cached Letzshop vendor. + + Args: + letzshop_slug: The Letzshop vendor slug. + company_id: The company ID to create the vendor under. + + Returns: + Dictionary with created vendor info. + + Raises: + ValueError: If vendor not found, already claimed, or company not found. + """ + import random + + from sqlalchemy import func + + from app.services.admin_service import admin_service + from models.database.company import Company + from models.database.vendor import Vendor + from models.schema.vendor import VendorCreate + + # Get cache entry + cache_entry = self.get_cached_vendor(letzshop_slug) + if not cache_entry: + raise ValueError(f"Letzshop vendor '{letzshop_slug}' not found in cache") + + if cache_entry.is_claimed: + raise ValueError( + f"Letzshop vendor '{cache_entry.name}' is already claimed " + f"by vendor ID {cache_entry.claimed_by_vendor_id}" + ) + + # Verify company exists + company = self.db.query(Company).filter(Company.id == company_id).first() + if not company: + raise ValueError(f"Company with ID {company_id} not found") + + # Generate vendor code from slug + vendor_code = letzshop_slug.upper().replace("-", "_")[:20] + + # Check if vendor code already exists + existing = ( + self.db.query(Vendor) + .filter(func.upper(Vendor.vendor_code) == vendor_code) + .first() + ) + if existing: + vendor_code = f"{vendor_code[:16]}_{random.randint(100, 999)}" + + # Generate subdomain from slug + subdomain = letzshop_slug.lower().replace("_", "-")[:30] + existing_subdomain = ( + self.db.query(Vendor) + .filter(func.lower(Vendor.subdomain) == subdomain) + .first() + ) + if existing_subdomain: + subdomain = f"{subdomain[:26]}-{random.randint(100, 999)}" + + # Create vendor data from cache + address = f"{cache_entry.street or ''} {cache_entry.street_number or ''}".strip() + vendor_data = VendorCreate( + name=cache_entry.name, + vendor_code=vendor_code, + subdomain=subdomain, + company_id=company_id, + email=cache_entry.email or company.email, + phone=cache_entry.phone, + description=cache_entry.description_en or cache_entry.description_fr or "", + city=cache_entry.city, + country=cache_entry.country_iso or "LU", + website=cache_entry.website, + address_line_1=address or None, + postal_code=cache_entry.zipcode, + ) + + # Create vendor + vendor = admin_service.create_vendor(self.db, vendor_data) + + # Mark the Letzshop vendor as claimed (commits internally) # noqa: SVC-006 + self.mark_vendor_claimed(letzshop_slug, vendor.id) + + logger.info( + f"Created vendor {vendor.vendor_code} from Letzshop vendor {letzshop_slug}" + ) + + return { + "id": vendor.id, + "vendor_code": vendor.vendor_code, + "name": vendor.name, + "subdomain": vendor.subdomain, + "company_id": vendor.company_id, + } + + +# Singleton-style function for easy access +def get_vendor_sync_service(db: Session) -> LetzshopVendorSyncService: + """Get a vendor sync service instance.""" + return LetzshopVendorSyncService(db) diff --git a/app/tasks/celery_tasks/letzshop.py b/app/tasks/celery_tasks/letzshop.py index 2ba2ccdf..c544d9d9 100644 --- a/app/tasks/celery_tasks/letzshop.py +++ b/app/tasks/celery_tasks/letzshop.py @@ -1,19 +1,22 @@ # app/tasks/celery_tasks/letzshop.py """ -Celery tasks for Letzshop historical order imports. +Celery tasks for Letzshop integration. -Wraps the existing process_historical_import function for Celery execution. +Includes: +- Historical order imports +- Vendor directory sync """ import logging from datetime import UTC, datetime -from typing import Callable +from typing import Any, Callable from app.core.celery_config import celery_app from app.services.admin_notification_service import admin_notification_service from app.services.letzshop import LetzshopClientError from app.services.letzshop.credentials_service import LetzshopCredentialsService from app.services.letzshop.order_service import LetzshopOrderService +from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService from app.tasks.celery_tasks.base import DatabaseTask from models.database.letzshop import LetzshopHistoricalImportJob @@ -270,3 +273,78 @@ def process_historical_import(self, job_id: int, vendor_id: int): db.commit() raise # Re-raise for Celery retry + + +# ============================================================================= +# Vendor Directory Sync +# ============================================================================= + + +@celery_app.task( + bind=True, + base=DatabaseTask, + name="app.tasks.celery_tasks.letzshop.sync_vendor_directory", + max_retries=2, + default_retry_delay=300, + autoretry_for=(Exception,), + retry_backoff=True, +) +def sync_vendor_directory(self) -> dict[str, Any]: + """ + Celery task to sync Letzshop vendor directory. + + Fetches all vendors from Letzshop's public GraphQL API and updates + the local letzshop_vendor_cache table. + + This task should be scheduled to run periodically (e.g., daily) + via Celery beat. + + Returns: + dict: Sync statistics including created, updated, and error counts. + """ + with self.get_db() as db: + try: + logger.info("Starting Letzshop vendor directory sync...") + + sync_service = LetzshopVendorSyncService(db) + + def progress_callback(page: int, fetched: int, total: int): + """Log progress during sync.""" + logger.info(f"Vendor sync progress: page {page}, {fetched}/{total} vendors") + + stats = sync_service.sync_all_vendors(progress_callback=progress_callback) + + logger.info( + f"Vendor directory sync completed: " + f"{stats.get('created', 0)} created, " + f"{stats.get('updated', 0)} updated, " + f"{stats.get('errors', 0)} errors" + ) + + # Send admin notification if there were errors + if stats.get("errors", 0) > 0: + admin_notification_service.notify_system_info( + db=db, + title="Letzshop Vendor Sync Completed with Errors", + message=( + f"Synced {stats.get('total_fetched', 0)} vendors. " + f"Errors: {stats.get('errors', 0)}" + ), + details=stats, + ) + db.commit() + + return stats + + except Exception as e: + logger.error(f"Vendor directory sync failed: {e}", exc_info=True) + + # Notify admins of failure + admin_notification_service.notify_critical_error( + db=db, + error_type="Vendor Directory Sync", + error_message=f"Failed to sync Letzshop vendor directory: {str(e)[:200]}", + details={"error": str(e)}, + ) + db.commit() + raise # Re-raise for Celery retry diff --git a/app/tasks/letzshop_tasks.py b/app/tasks/letzshop_tasks.py index e3e23424..45ed7b10 100644 --- a/app/tasks/letzshop_tasks.py +++ b/app/tasks/letzshop_tasks.py @@ -1,15 +1,16 @@ # app/tasks/letzshop_tasks.py -"""Background tasks for Letzshop historical order imports.""" +"""Background tasks for Letzshop integration.""" import logging from datetime import UTC, datetime -from typing import Callable +from typing import Any, Callable from app.core.database import SessionLocal from app.services.admin_notification_service import admin_notification_service from app.services.letzshop import LetzshopClientError from app.services.letzshop.credentials_service import LetzshopCredentialsService from app.services.letzshop.order_service import LetzshopOrderService +from app.services.letzshop.vendor_sync_service import LetzshopVendorSyncService from models.database.letzshop import LetzshopHistoricalImportJob logger = logging.getLogger(__name__) @@ -262,3 +263,80 @@ def process_historical_import(job_id: int, vendor_id: int): db.close() except Exception as close_error: logger.error(f"Job {job_id}: Error closing database session: {close_error}") + + +# ============================================================================= +# Vendor Directory Sync +# ============================================================================= + + +def sync_letzshop_vendor_directory() -> dict[str, Any]: + """ + Sync Letzshop vendor directory to local cache. + + This task fetches all vendors from Letzshop's public GraphQL API + and updates the local letzshop_vendor_cache table. + + Should be run periodically (e.g., daily) via Celery beat. + + Returns: + Dictionary with sync statistics. + """ + db = SessionLocal() + stats = {} + + try: + logger.info("Starting Letzshop vendor directory sync task...") + + sync_service = LetzshopVendorSyncService(db) + + def progress_callback(page: int, fetched: int, total: int): + """Log progress during sync.""" + logger.info(f"Vendor sync progress: page {page}, {fetched}/{total} vendors") + + stats = sync_service.sync_all_vendors(progress_callback=progress_callback) + + logger.info( + f"Vendor directory sync completed: " + f"{stats.get('created', 0)} created, " + f"{stats.get('updated', 0)} updated, " + f"{stats.get('errors', 0)} errors" + ) + + # Send admin notification if there were errors + if stats.get("errors", 0) > 0: + admin_notification_service.notify_system_info( + db=db, + title="Letzshop Vendor Sync Completed with Errors", + message=( + f"Synced {stats.get('total_fetched', 0)} vendors. " + f"Errors: {stats.get('errors', 0)}" + ), + details=stats, + ) + + return stats + + except Exception as e: + logger.error(f"Vendor directory sync failed: {e}", exc_info=True) + + # Notify admins of failure + try: + admin_notification_service.notify_critical_error( + db=db, + error_type="Vendor Directory Sync", + error_message=f"Failed to sync Letzshop vendor directory: {str(e)[:200]}", + details={"error": str(e)}, + ) + db.commit() + except Exception: + pass + + raise + + finally: + if hasattr(db, "close") and callable(db.close): + try: + db.close() + except Exception as close_error: + logger.error(f"Error closing database session: {close_error}") diff --git a/app/templates/admin/letzshop-vendor-directory.html b/app/templates/admin/letzshop-vendor-directory.html new file mode 100644 index 00000000..28e2554f --- /dev/null +++ b/app/templates/admin/letzshop-vendor-directory.html @@ -0,0 +1,430 @@ +{# app/templates/admin/letzshop-vendor-directory.html #} +{% extends "admin/base.html" %} +{% from 'shared/macros/alerts.html' import alert_dynamic, error_state %} +{% from 'shared/macros/headers.html' import page_header_flex, refresh_button %} +{% from 'shared/macros/pagination.html' import pagination_controls %} + +{% block title %}Letzshop Vendor Directory{% endblock %} +{% block alpine_data %}letzshopVendorDirectory(){% endblock %} + +{% block content %} + +{% call page_header_flex(title='Letzshop Vendor Directory', subtitle='Browse and import vendors from Letzshop marketplace') %} +
Total Vendors
+ +Active
+ +Claimed
+ +Unclaimed
+ ++ Last sync: +
++ Click "Sync from Letzshop" to import vendors. + Try adjusting your filters. +
+| Vendor | +Contact | +Location | +Categories | +Status | +Actions | +
|---|---|---|---|---|---|
|
+
+
+
+
+
+
+
+
+
+ |
+ + + + | ++ + | +
+
+
+
+
+ +
+
+ |
+ + + + Claimed + + + Available + + | +
+
+
+
+
+
+
+
+ |
+
+ Create a platform vendor from +
+ + +The vendor will be created under this company
+