+
+
+ IN
+
+ WRONG (raw output - Python syntax not valid JS):
languages: {{ vendor.storefront_languages }}
-
+
- WRONG (tojson without safe):
- languages: {{ vendor.storefront_languages|tojson }}
-
-
- RIGHT:
- languages: {{ vendor.storefront_languages|tojson|safe }}
-
+ WRONG (|safe in HTML attributes - breaks attribute parsing):
+
+
pattern:
file_pattern: "app/templates/**/*.html"
required_with_jinja_array:
- - "|tojson|safe"
+ - "|tojson"
- id: "LANG-004"
name: "Language selector function must be exported to window"
diff --git a/app/api/deps.py b/app/api/deps.py
index 1634d9b7..60009e20 100644
--- a/app/api/deps.py
+++ b/app/api/deps.py
@@ -45,7 +45,6 @@ from app.exceptions import (
InsufficientVendorPermissionsException,
InvalidTokenException,
UnauthorizedVendorAccessException,
- VendorAccessDeniedException,
VendorNotFoundException,
VendorOwnerOnlyException,
)
@@ -306,15 +305,15 @@ def get_current_vendor_api(
# Require vendor context in token
if not hasattr(user, "token_vendor_id"):
- raise InvalidTokenException("Token missing vendor information. Please login again.")
+ raise InvalidTokenException(
+ "Token missing vendor information. Please login again."
+ )
vendor_id = user.token_vendor_id
# Verify user still has access to this vendor
if not user.is_member_of(vendor_id):
- logger.warning(
- f"User {user.username} lost access to vendor_id={vendor_id}"
- )
+ logger.warning(f"User {user.username} lost access to vendor_id={vendor_id}")
raise InsufficientPermissionsException(
"Access to vendor has been revoked. Please login again."
)
@@ -605,7 +604,9 @@ def require_vendor_permission(permission: str):
) -> User:
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
- raise InvalidTokenException("Token missing vendor information. Please login again.")
+ raise InvalidTokenException(
+ "Token missing vendor information. Please login again."
+ )
vendor_id = current_user.token_vendor_id
@@ -649,7 +650,9 @@ def require_vendor_owner(
"""
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
- raise InvalidTokenException("Token missing vendor information. Please login again.")
+ raise InvalidTokenException(
+ "Token missing vendor information. Please login again."
+ )
vendor_id = current_user.token_vendor_id
@@ -695,7 +698,9 @@ def require_any_vendor_permission(*permissions: str):
) -> User:
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
- raise InvalidTokenException("Token missing vendor information. Please login again.")
+ raise InvalidTokenException(
+ "Token missing vendor information. Please login again."
+ )
vendor_id = current_user.token_vendor_id
@@ -748,7 +753,9 @@ def require_all_vendor_permissions(*permissions: str):
) -> User:
# Get vendor ID from JWT token
if not hasattr(current_user, "token_vendor_id"):
- raise InvalidTokenException("Token missing vendor information. Please login again.")
+ raise InvalidTokenException(
+ "Token missing vendor information. Please login again."
+ )
vendor_id = current_user.token_vendor_id
diff --git a/app/api/v1/admin/__init__.py b/app/api/v1/admin/__init__.py
index a319ddf7..4a38b7bd 100644
--- a/app/api/v1/admin/__init__.py
+++ b/app/api/v1/admin/__init__.py
@@ -151,9 +151,7 @@ router.include_router(
)
# Include test runner endpoints
-router.include_router(
- tests.router, prefix="/tests", tags=["admin-tests"]
-)
+router.include_router(tests.router, prefix="/tests", tags=["admin-tests"])
# Export the router
__all__ = ["router"]
diff --git a/app/api/v1/admin/background_tasks.py b/app/api/v1/admin/background_tasks.py
index 19efa712..c059906c 100644
--- a/app/api/v1/admin/background_tasks.py
+++ b/app/api/v1/admin/background_tasks.py
@@ -63,7 +63,9 @@ def _convert_import_to_response(job) -> BackgroundTaskResponse:
started_at=job.started_at.isoformat() if job.started_at else None,
completed_at=job.completed_at.isoformat() if job.completed_at else None,
duration_seconds=duration,
- description=f"Import from {job.marketplace}: {job.source_url[:50]}..." if len(job.source_url) > 50 else f"Import from {job.marketplace}: {job.source_url}",
+ description=f"Import from {job.marketplace}: {job.source_url[:50]}..."
+ if len(job.source_url) > 50
+ else f"Import from {job.marketplace}: {job.source_url}",
triggered_by=job.user.username if job.user else None,
error_message=job.error_message,
details={
@@ -108,7 +110,9 @@ def _convert_test_run_to_response(run) -> BackgroundTaskResponse:
@router.get("/tasks", response_model=list[BackgroundTaskResponse])
async def list_background_tasks(
status: str | None = Query(None, description="Filter by status"),
- task_type: str | None = Query(None, description="Filter by type (import, test_run)"),
+ task_type: str | None = Query(
+ None, description="Filter by type (import, test_run)"
+ ),
limit: int = Query(50, ge=1, le=200),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
@@ -122,12 +126,16 @@ async def list_background_tasks(
# Get import jobs
if task_type is None or task_type == "import":
- import_jobs = background_tasks_service.get_import_jobs(db, status=status, limit=limit)
+ import_jobs = background_tasks_service.get_import_jobs(
+ db, status=status, limit=limit
+ )
tasks.extend([_convert_import_to_response(job) for job in import_jobs])
# Get test runs
if task_type is None or task_type == "test_run":
- test_runs = background_tasks_service.get_test_runs(db, status=status, limit=limit)
+ test_runs = background_tasks_service.get_test_runs(
+ db, status=status, limit=limit
+ )
tasks.extend([_convert_test_run_to_response(run) for run in test_runs])
# Sort by start time (most recent first)
diff --git a/app/api/v1/admin/code_quality.py b/app/api/v1/admin/code_quality.py
index 7ba1bb03..1d407f86 100644
--- a/app/api/v1/admin/code_quality.py
+++ b/app/api/v1/admin/code_quality.py
@@ -329,9 +329,7 @@ async def assign_violation(
"user_id": assignment.user_id,
"assigned_at": assignment.assigned_at.isoformat(),
"assigned_by": assignment.assigned_by,
- "due_date": (
- assignment.due_date.isoformat() if assignment.due_date else None
- ),
+ "due_date": (assignment.due_date.isoformat() if assignment.due_date else None),
"priority": assignment.priority,
}
diff --git a/app/api/v1/admin/companies.py b/app/api/v1/admin/companies.py
index d9547aa3..25f234af 100644
--- a/app/api/v1/admin/companies.py
+++ b/app/api/v1/admin/companies.py
@@ -75,7 +75,7 @@ def create_company_with_owner(
owner_username=owner_user.username,
owner_email=owner_user.email,
temporary_password=temp_password or "N/A (Existing user)",
- login_url=f"http://localhost:8000/admin/login",
+ login_url="http://localhost:8000/admin/login",
)
diff --git a/app/api/v1/admin/dashboard.py b/app/api/v1/admin/dashboard.py
index 35fd9f03..ffcdf9d5 100644
--- a/app/api/v1/admin/dashboard.py
+++ b/app/api/v1/admin/dashboard.py
@@ -46,9 +46,13 @@ def get_admin_dashboard(
users=UserStatsResponse(**user_stats),
vendors=VendorStatsResponse(
total=vendor_stats.get("total", vendor_stats.get("total_vendors", 0)),
- verified=vendor_stats.get("verified", vendor_stats.get("verified_vendors", 0)),
+ verified=vendor_stats.get(
+ "verified", vendor_stats.get("verified_vendors", 0)
+ ),
pending=vendor_stats.get("pending", vendor_stats.get("pending_vendors", 0)),
- inactive=vendor_stats.get("inactive", vendor_stats.get("inactive_vendors", 0)),
+ inactive=vendor_stats.get(
+ "inactive", vendor_stats.get("inactive_vendors", 0)
+ ),
),
recent_vendors=admin_service.get_recent_vendors(db, limit=5),
recent_imports=admin_service.get_recent_import_jobs(db, limit=10),
@@ -109,9 +113,13 @@ def get_platform_statistics(
users=UserStatsResponse(**user_stats),
vendors=VendorStatsResponse(
total=vendor_stats.get("total", vendor_stats.get("total_vendors", 0)),
- verified=vendor_stats.get("verified", vendor_stats.get("verified_vendors", 0)),
+ verified=vendor_stats.get(
+ "verified", vendor_stats.get("verified_vendors", 0)
+ ),
pending=vendor_stats.get("pending", vendor_stats.get("pending_vendors", 0)),
- inactive=vendor_stats.get("inactive", vendor_stats.get("inactive_vendors", 0)),
+ inactive=vendor_stats.get(
+ "inactive", vendor_stats.get("inactive_vendors", 0)
+ ),
),
products=ProductStatsResponse(**product_stats),
orders=OrderStatsBasicResponse(**order_stats),
diff --git a/app/api/v1/admin/letzshop.py b/app/api/v1/admin/letzshop.py
index 431e6e2b..30439953 100644
--- a/app/api/v1/admin/letzshop.py
+++ b/app/api/v1/admin/letzshop.py
@@ -68,7 +68,9 @@ def get_credentials_service(db: Session) -> LetzshopCredentialsService:
def list_vendors_letzshop_status(
skip: int = Query(0, ge=0),
limit: int = Query(100, ge=1, le=1000),
- configured_only: bool = Query(False, description="Only show vendors with Letzshop configured"),
+ configured_only: bool = Query(
+ False, description="Only show vendors with Letzshop configured"
+ ),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
):
@@ -119,8 +121,9 @@ def get_vendor_credentials(
credentials = creds_service.get_credentials_or_raise(vendor_id)
except CredentialsNotFoundError:
raise ResourceNotFoundException(
- "LetzshopCredentials", str(vendor_id),
- message=f"Letzshop credentials not configured for vendor {vendor.name}"
+ "LetzshopCredentials",
+ str(vendor_id),
+ message=f"Letzshop credentials not configured for vendor {vendor.name}",
)
return LetzshopCredentialsResponse(
@@ -215,8 +218,9 @@ def update_vendor_credentials(
db.commit()
except CredentialsNotFoundError:
raise ResourceNotFoundException(
- "LetzshopCredentials", str(vendor_id),
- message=f"Letzshop credentials not configured for vendor {vendor.name}"
+ "LetzshopCredentials",
+ str(vendor_id),
+ message=f"Letzshop credentials not configured for vendor {vendor.name}",
)
return LetzshopCredentialsResponse(
@@ -255,8 +259,9 @@ def delete_vendor_credentials(
deleted = creds_service.delete_credentials(vendor_id)
if not deleted:
raise ResourceNotFoundException(
- "LetzshopCredentials", str(vendor_id),
- message=f"Letzshop credentials not configured for vendor {vendor.name}"
+ "LetzshopCredentials",
+ str(vendor_id),
+ message=f"Letzshop credentials not configured for vendor {vendor.name}",
)
db.commit()
@@ -445,7 +450,9 @@ def trigger_vendor_sync(
orders_imported += 1
except Exception as e:
- errors.append(f"Error processing shipment {shipment.get('id')}: {e}")
+ errors.append(
+ f"Error processing shipment {shipment.get('id')}: {e}"
+ )
db.commit()
diff --git a/app/api/v1/admin/logs.py b/app/api/v1/admin/logs.py
index 7e1aaadc..fd8ae8bb 100644
--- a/app/api/v1/admin/logs.py
+++ b/app/api/v1/admin/logs.py
@@ -12,7 +12,7 @@ Provides endpoints for:
import logging
-from fastapi import APIRouter, Depends, Query, Response
+from fastapi import APIRouter, Depends, Query
from sqlalchemy.orm import Session
from app.api.deps import get_current_admin_api
@@ -190,9 +190,10 @@ def download_log_file(
"""
from pathlib import Path
- from app.core.config import settings
from fastapi.responses import FileResponse
+ from app.core.config import settings
+
# Determine log file path
log_file_path = settings.log_file
if log_file_path:
diff --git a/app/api/v1/admin/products.py b/app/api/v1/admin/products.py
index 80800956..0d64d4e1 100644
--- a/app/api/v1/admin/products.py
+++ b/app/api/v1/admin/products.py
@@ -149,7 +149,9 @@ class AdminProductDetail(BaseModel):
def get_products(
skip: int = Query(0, ge=0),
limit: int = Query(50, ge=1, le=500),
- search: str | None = Query(None, description="Search by title, GTIN, SKU, or brand"),
+ search: str | None = Query(
+ None, description="Search by title, GTIN, SKU, or brand"
+ ),
marketplace: str | None = Query(None, description="Filter by marketplace"),
vendor_name: str | None = Query(None, description="Filter by vendor name"),
availability: str | None = Query(None, description="Filter by availability"),
diff --git a/app/api/v1/admin/settings.py b/app/api/v1/admin/settings.py
index c703abe2..0ec2ae52 100644
--- a/app/api/v1/admin/settings.py
+++ b/app/api/v1/admin/settings.py
@@ -79,9 +79,7 @@ def get_setting(
setting = admin_settings_service.get_setting_by_key(db, key)
if not setting:
- raise ResourceNotFoundException(
- resource_type="Setting", identifier=key
- )
+ raise ResourceNotFoundException(resource_type="Setting", identifier=key)
return AdminSettingResponse.model_validate(setting)
diff --git a/app/api/v1/admin/tests.py b/app/api/v1/admin/tests.py
index 914e081c..e7af97c8 100644
--- a/app/api/v1/admin/tests.py
+++ b/app/api/v1/admin/tests.py
@@ -65,7 +65,9 @@ class RunTestsRequest(BaseModel):
"""Request model for running tests"""
test_path: str = Field("tests", description="Path to tests to run")
- extra_args: list[str] | None = Field(None, description="Additional pytest arguments")
+ extra_args: list[str] | None = Field(
+ None, description="Additional pytest arguments"
+ )
class TestDashboardStatsResponse(BaseModel):
@@ -205,6 +207,7 @@ async def get_run(
if not run:
from app.exceptions.base import ResourceNotFoundException
+
raise ResourceNotFoundException("TestRun", str(run_id))
return TestRunResponse(
@@ -231,7 +234,9 @@ async def get_run(
@router.get("/runs/{run_id}/results", response_model=list[TestResultResponse])
async def get_run_results(
run_id: int,
- outcome: str | None = Query(None, description="Filter by outcome (passed, failed, error, skipped)"),
+ outcome: str | None = Query(
+ None, description="Filter by outcome (passed, failed, error, skipped)"
+ ),
db: Session = Depends(get_db),
current_user: User = Depends(get_current_admin_api),
):
diff --git a/app/api/v1/admin/users.py b/app/api/v1/admin/users.py
index 730249b8..72df3d72 100644
--- a/app/api/v1/admin/users.py
+++ b/app/api/v1/admin/users.py
@@ -99,7 +99,9 @@ def create_user(
full_name=user.full_name,
is_email_verified=user.is_email_verified,
owned_companies_count=len(user.owned_companies) if user.owned_companies else 0,
- vendor_memberships_count=len(user.vendor_memberships) if user.vendor_memberships else 0,
+ vendor_memberships_count=len(user.vendor_memberships)
+ if user.vendor_memberships
+ else 0,
)
@@ -151,7 +153,9 @@ def get_user_details(
full_name=user.full_name,
is_email_verified=user.is_email_verified,
owned_companies_count=len(user.owned_companies) if user.owned_companies else 0,
- vendor_memberships_count=len(user.vendor_memberships) if user.vendor_memberships else 0,
+ vendor_memberships_count=len(user.vendor_memberships)
+ if user.vendor_memberships
+ else 0,
)
@@ -192,7 +196,9 @@ def update_user(
full_name=user.full_name,
is_email_verified=user.is_email_verified,
owned_companies_count=len(user.owned_companies) if user.owned_companies else 0,
- vendor_memberships_count=len(user.vendor_memberships) if user.vendor_memberships else 0,
+ vendor_memberships_count=len(user.vendor_memberships)
+ if user.vendor_memberships
+ else 0,
)
diff --git a/app/api/v1/admin/vendor_themes.py b/app/api/v1/admin/vendor_themes.py
index f4767cdb..38fcdf46 100644
--- a/app/api/v1/admin/vendor_themes.py
+++ b/app/api/v1/admin/vendor_themes.py
@@ -229,4 +229,6 @@ async def delete_vendor_theme(
# Global exception handler converts them to proper HTTP responses
result = vendor_theme_service.delete_theme(db, vendor_code)
db.commit()
- return ThemeDeleteResponse(message=result.get("message", "Theme deleted successfully"))
+ return ThemeDeleteResponse(
+ message=result.get("message", "Theme deleted successfully")
+ )
diff --git a/app/api/v1/admin/vendors.py b/app/api/v1/admin/vendors.py
index 01a450e8..62dc92a0 100644
--- a/app/api/v1/admin/vendors.py
+++ b/app/api/v1/admin/vendors.py
@@ -318,7 +318,9 @@ def delete_vendor(
@router.get("/{vendor_identifier}/export/letzshop")
def export_vendor_products_letzshop(
vendor_identifier: str = Path(..., description="Vendor ID or vendor_code"),
- language: str = Query("en", description="Language for title/description (en, fr, de)"),
+ language: str = Query(
+ "en", description="Language for title/description (en, fr, de)"
+ ),
include_inactive: bool = Query(False, description="Include inactive products"),
db: Session = Depends(get_db),
current_admin: User = Depends(get_current_admin_api),
diff --git a/app/api/v1/shared/language.py b/app/api/v1/shared/language.py
index 57e565b2..4fc23d39 100644
--- a/app/api/v1/shared/language.py
+++ b/app/api/v1/shared/language.py
@@ -7,6 +7,7 @@ These endpoints handle:
- Getting current language info
- Listing available languages
"""
+
import logging
from fastapi import APIRouter, Request, Response
diff --git a/app/api/v1/vendor/analytics.py b/app/api/v1/vendor/analytics.py
index c0fa0f8f..99e42602 100644
--- a/app/api/v1/vendor/analytics.py
+++ b/app/api/v1/vendor/analytics.py
@@ -39,7 +39,9 @@ def get_vendor_analytics(
period=data["period"],
start_date=data["start_date"],
imports=VendorAnalyticsImports(count=data["imports"]["count"]),
- catalog=VendorAnalyticsCatalog(products_added=data["catalog"]["products_added"]),
+ catalog=VendorAnalyticsCatalog(
+ products_added=data["catalog"]["products_added"]
+ ),
inventory=VendorAnalyticsInventory(
total_locations=data["inventory"]["total_locations"]
),
diff --git a/app/api/v1/vendor/auth.py b/app/api/v1/vendor/auth.py
index 959a5f41..30527772 100644
--- a/app/api/v1/vendor/auth.py
+++ b/app/api/v1/vendor/auth.py
@@ -25,7 +25,6 @@ from app.exceptions import InvalidCredentialsException
from app.services.auth_service import auth_service
from middleware.vendor_context import get_current_vendor
from models.database.user import User
-from models.database.vendor import Vendor
from models.schema.auth import LogoutResponse, UserLogin, VendorUserResponse
router = APIRouter(prefix="/auth")
@@ -95,9 +94,7 @@ def vendor_login(
f"User {user.username} attempted login to vendor {vendor.vendor_code} "
f"but is not authorized"
)
- raise InvalidCredentialsException(
- "You do not have access to this vendor"
- )
+ raise InvalidCredentialsException("You do not have access to this vendor")
else:
# No vendor context - find which vendor this user belongs to
vendor, vendor_role = auth_service.find_user_vendor(user)
diff --git a/app/api/v1/vendor/customers.py b/app/api/v1/vendor/customers.py
index 4de51dfd..6a88ee66 100644
--- a/app/api/v1/vendor/customers.py
+++ b/app/api/v1/vendor/customers.py
@@ -90,7 +90,9 @@ def get_customer_orders(
- Return order details
"""
vendor = vendor_service.get_vendor_by_id(db, current_user.token_vendor_id) # noqa: F841
- return CustomerOrdersResponse(orders=[], message="Customer orders coming in Slice 5")
+ return CustomerOrdersResponse(
+ orders=[], message="Customer orders coming in Slice 5"
+ )
@router.put("/{customer_id}", response_model=CustomerMessageResponse)
diff --git a/app/api/v1/vendor/inventory.py b/app/api/v1/vendor/inventory.py
index ba443fff..42db57fc 100644
--- a/app/api/v1/vendor/inventory.py
+++ b/app/api/v1/vendor/inventory.py
@@ -5,6 +5,7 @@ Vendor inventory management endpoints.
Vendor Context: Uses token_vendor_id from JWT token (authenticated vendor API pattern).
The get_current_vendor_api dependency guarantees token_vendor_id is present.
"""
+
import logging
from fastapi import APIRouter, Depends, Query
@@ -36,7 +37,9 @@ def set_inventory(
db: Session = Depends(get_db),
):
"""Set exact inventory quantity (replaces existing)."""
- result = inventory_service.set_inventory(db, current_user.token_vendor_id, inventory)
+ result = inventory_service.set_inventory(
+ db, current_user.token_vendor_id, inventory
+ )
db.commit()
return result
@@ -48,7 +51,9 @@ def adjust_inventory(
db: Session = Depends(get_db),
):
"""Adjust inventory (positive to add, negative to remove)."""
- result = inventory_service.adjust_inventory(db, current_user.token_vendor_id, adjustment)
+ result = inventory_service.adjust_inventory(
+ db, current_user.token_vendor_id, adjustment
+ )
db.commit()
return result
@@ -60,7 +65,9 @@ def reserve_inventory(
db: Session = Depends(get_db),
):
"""Reserve inventory for an order."""
- result = inventory_service.reserve_inventory(db, current_user.token_vendor_id, reservation)
+ result = inventory_service.reserve_inventory(
+ db, current_user.token_vendor_id, reservation
+ )
db.commit()
return result
@@ -72,7 +79,9 @@ def release_reservation(
db: Session = Depends(get_db),
):
"""Release reserved inventory (cancel order)."""
- result = inventory_service.release_reservation(db, current_user.token_vendor_id, reservation)
+ result = inventory_service.release_reservation(
+ db, current_user.token_vendor_id, reservation
+ )
db.commit()
return result
@@ -84,7 +93,9 @@ def fulfill_reservation(
db: Session = Depends(get_db),
):
"""Fulfill reservation (complete order, remove from stock)."""
- result = inventory_service.fulfill_reservation(db, current_user.token_vendor_id, reservation)
+ result = inventory_service.fulfill_reservation(
+ db, current_user.token_vendor_id, reservation
+ )
db.commit()
return result
@@ -96,7 +107,9 @@ def get_product_inventory(
db: Session = Depends(get_db),
):
"""Get inventory summary for a product."""
- return inventory_service.get_product_inventory(db, current_user.token_vendor_id, product_id)
+ return inventory_service.get_product_inventory(
+ db, current_user.token_vendor_id, product_id
+ )
@router.get("/inventory", response_model=InventoryListResponse)
diff --git a/app/api/v1/vendor/letzshop.py b/app/api/v1/vendor/letzshop.py
index 373a245b..c4d10691 100644
--- a/app/api/v1/vendor/letzshop.py
+++ b/app/api/v1/vendor/letzshop.py
@@ -323,9 +323,7 @@ def get_order(
order_service = get_order_service(db)
try:
- order = order_service.get_order_or_raise(
- current_user.token_vendor_id, order_id
- )
+ order = order_service.get_order_or_raise(current_user.token_vendor_id, order_id)
except OrderNotFoundError:
raise ResourceNotFoundException("LetzshopOrder", str(order_id))
@@ -396,7 +394,9 @@ def import_orders(
orders_imported += 1
except Exception as e:
- errors.append(f"Error processing shipment {shipment.get('id')}: {e}")
+ errors.append(
+ f"Error processing shipment {shipment.get('id')}: {e}"
+ )
db.commit()
creds_service.update_sync_status(
@@ -475,9 +475,7 @@ def confirm_order(
return FulfillmentOperationResponse(
success=True,
message=f"Confirmed {len(inventory_unit_ids)} inventory units",
- confirmed_units=[
- u.get("id") for u in result.get("inventoryUnits", [])
- ],
+ confirmed_units=[u.get("id") for u in result.get("inventoryUnits", [])],
)
except LetzshopClientError as e:
@@ -699,7 +697,9 @@ def list_fulfillment_queue(
@router.get("/export")
def export_products_letzshop(
- language: str = Query("en", description="Language for title/description (en, fr, de)"),
+ language: str = Query(
+ "en", description="Language for title/description (en, fr, de)"
+ ),
include_inactive: bool = Query(False, description="Include inactive products"),
current_user: User = Depends(get_current_vendor_api),
db: Session = Depends(get_db),
diff --git a/app/api/v1/vendor/products.py b/app/api/v1/vendor/products.py
index 5a540dd0..af161380 100644
--- a/app/api/v1/vendor/products.py
+++ b/app/api/v1/vendor/products.py
@@ -181,18 +181,20 @@ def toggle_product_active(
db: Session = Depends(get_db),
):
"""Toggle product active status."""
- product = product_service.get_product(
- db, current_user.token_vendor_id, product_id
- )
+ product = product_service.get_product(db, current_user.token_vendor_id, product_id)
product.is_active = not product.is_active
db.commit()
db.refresh(product)
status = "activated" if product.is_active else "deactivated"
- logger.info(f"Product {product_id} {status} for vendor {current_user.token_vendor_code}")
+ logger.info(
+ f"Product {product_id} {status} for vendor {current_user.token_vendor_code}"
+ )
- return ProductToggleResponse(message=f"Product {status}", is_active=product.is_active)
+ return ProductToggleResponse(
+ message=f"Product {status}", is_active=product.is_active
+ )
@router.put("/{product_id}/toggle-featured", response_model=ProductToggleResponse)
@@ -202,15 +204,17 @@ def toggle_product_featured(
db: Session = Depends(get_db),
):
"""Toggle product featured status."""
- product = product_service.get_product(
- db, current_user.token_vendor_id, product_id
- )
+ product = product_service.get_product(db, current_user.token_vendor_id, product_id)
product.is_featured = not product.is_featured
db.commit()
db.refresh(product)
status = "featured" if product.is_featured else "unfeatured"
- logger.info(f"Product {product_id} {status} for vendor {current_user.token_vendor_code}")
+ logger.info(
+ f"Product {product_id} {status} for vendor {current_user.token_vendor_code}"
+ )
- return ProductToggleResponse(message=f"Product {status}", is_featured=product.is_featured)
+ return ProductToggleResponse(
+ message=f"Product {status}", is_featured=product.is_featured
+ )
diff --git a/app/core/logging.py b/app/core/logging.py
index 18757755..3066f84e 100644
--- a/app/core/logging.py
+++ b/app/core/logging.py
@@ -54,9 +54,15 @@ class DatabaseLogHandler(logging.Handler):
stack_trace = None
if record.exc_info:
- exception_type = record.exc_info[0].__name__ if record.exc_info[0] else None
- exception_message = str(record.exc_info[1]) if record.exc_info[1] else None
- stack_trace = "".join(traceback.format_exception(*record.exc_info))
+ exception_type = (
+ record.exc_info[0].__name__ if record.exc_info[0] else None
+ )
+ exception_message = (
+ str(record.exc_info[1]) if record.exc_info[1] else None
+ )
+ stack_trace = "".join(
+ traceback.format_exception(*record.exc_info)
+ )
# Extract context from record (if middleware added it)
user_id = getattr(record, "user_id", None)
@@ -95,7 +101,6 @@ class DatabaseLogHandler(logging.Handler):
continue
# For other errors or final attempt, silently skip
# Don't print to stderr to avoid log spam during imports
- pass
finally:
db.close()
@@ -206,9 +211,7 @@ def setup_logging():
detailed_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - [%(module)s:%(funcName)s:%(lineno)d] - %(message)s"
)
- simple_formatter = logging.Formatter(
- "%(asctime)s - %(levelname)s - %(message)s"
- )
+ simple_formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
# Console handler (simple format)
console_handler = logging.StreamHandler(sys.stdout)
@@ -217,10 +220,7 @@ def setup_logging():
# Rotating file handler (detailed format)
file_handler = RotatingFileHandler(
- log_file,
- maxBytes=max_bytes,
- backupCount=backup_count,
- encoding="utf-8"
+ log_file, maxBytes=max_bytes, backupCount=backup_count, encoding="utf-8"
)
file_handler.setFormatter(detailed_formatter)
logger.addHandler(file_handler)
@@ -232,7 +232,10 @@ def setup_logging():
logger.addHandler(db_handler)
except Exception as e:
# If database handler fails, just use file logging
- print(f"Warning: Database logging handler could not be initialized: {e}", file=sys.stderr)
+ print(
+ f"Warning: Database logging handler could not be initialized: {e}",
+ file=sys.stderr,
+ )
# Configure specific loggers to reduce noise
logging.getLogger("uvicorn.access").setLevel(logging.WARNING)
@@ -245,7 +248,7 @@ def setup_logging():
logger.info(f"Log File: {log_file}")
logger.info(f"Max File Size: {max_bytes / (1024 * 1024):.1f} MB")
logger.info(f"Backup Count: {backup_count}")
- logger.info(f"Database Logging: Enabled (WARNING and above)")
+ logger.info("Database Logging: Enabled (WARNING and above)")
logger.info("=" * 80)
return logging.getLogger(__name__)
diff --git a/app/exceptions/__init__.py b/app/exceptions/__init__.py
index 7280116b..11cebb21 100644
--- a/app/exceptions/__init__.py
+++ b/app/exceptions/__init__.py
@@ -46,6 +46,16 @@ from .base import (
WizamartException,
)
+# Cart exceptions
+from .cart import (
+ CartItemNotFoundException,
+ CartValidationException,
+ EmptyCartException,
+ InsufficientInventoryForCartException,
+ InvalidCartQuantityException,
+ ProductNotAvailableForCartException,
+)
+
# Code quality exceptions
from .code_quality import (
InvalidViolationStatusException,
@@ -57,16 +67,6 @@ from .code_quality import (
ViolationOperationException,
)
-# Cart exceptions
-from .cart import (
- CartItemNotFoundException,
- CartValidationException,
- EmptyCartException,
- InsufficientInventoryForCartException,
- InvalidCartQuantityException,
- ProductNotAvailableForCartException,
-)
-
# Company exceptions
from .company import (
CompanyAlreadyExistsException,
diff --git a/app/exceptions/handler.py b/app/exceptions/handler.py
index e0203b02..4b659717 100644
--- a/app/exceptions/handler.py
+++ b/app/exceptions/handler.py
@@ -366,11 +366,11 @@ def _redirect_to_login(request: Request) -> RedirectResponse:
if context_type == RequestContext.VENDOR_DASHBOARD:
# Extract vendor code from the request path
# Path format: /vendor/{vendor_code}/...
- path_parts = request.url.path.split('/')
+ path_parts = request.url.path.split("/")
vendor_code = None
# Find vendor code in path
- if len(path_parts) >= 3 and path_parts[1] == 'vendor':
+ if len(path_parts) >= 3 and path_parts[1] == "vendor":
vendor_code = path_parts[2]
# Fallback: try to get from request state
diff --git a/app/routes/admin_pages.py b/app/routes/admin_pages.py
index 04a12992..5fb49d4d 100644
--- a/app/routes/admin_pages.py
+++ b/app/routes/admin_pages.py
@@ -408,9 +408,7 @@ async def admin_user_create_page(
)
-@router.get(
- "/users/{user_id}", response_class=HTMLResponse, include_in_schema=False
-)
+@router.get("/users/{user_id}", response_class=HTMLResponse, include_in_schema=False)
async def admin_user_detail_page(
request: Request,
user_id: int = Path(..., description="User ID"),
@@ -562,7 +560,9 @@ async def admin_letzshop_page(
# ============================================================================
-@router.get("/marketplace-products", response_class=HTMLResponse, include_in_schema=False)
+@router.get(
+ "/marketplace-products", response_class=HTMLResponse, include_in_schema=False
+)
async def admin_marketplace_products_page(
request: Request,
current_user: User = Depends(get_current_admin_from_cookie_or_header),
diff --git a/app/services/admin_service.py b/app/services/admin_service.py
index 68c31379..8a44cf9b 100644
--- a/app/services/admin_service.py
+++ b/app/services/admin_service.py
@@ -35,7 +35,7 @@ from middleware.auth import AuthManager
from models.database.company import Company
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.user import User
-from models.database.vendor import Role, Vendor, VendorUser
+from models.database.vendor import Role, Vendor
from models.schema.marketplace_import_job import MarketplaceImportJobResponse
from models.schema.vendor import VendorCreate
@@ -143,7 +143,9 @@ class AdminService:
# Apply pagination
skip = (page - 1) * per_page
- users = query.order_by(User.created_at.desc()).offset(skip).limit(per_page).all()
+ users = (
+ query.order_by(User.created_at.desc()).offset(skip).limit(per_page).all()
+ )
return users, total, pages
@@ -199,7 +201,9 @@ class AdminService:
"""
user = (
db.query(User)
- .options(joinedload(User.owned_companies), joinedload(User.vendor_memberships))
+ .options(
+ joinedload(User.owned_companies), joinedload(User.vendor_memberships)
+ )
.filter(User.id == user_id)
.first()
)
@@ -243,12 +247,16 @@ class AdminService:
# Check email uniqueness if changing
if email and email != user.email:
if db.query(User).filter(User.email == email).first():
- raise UserAlreadyExistsException("Email already registered", field="email")
+ raise UserAlreadyExistsException(
+ "Email already registered", field="email"
+ )
# Check username uniqueness if changing
if username and username != user.username:
if db.query(User).filter(User.username == username).first():
- raise UserAlreadyExistsException("Username already taken", field="username")
+ raise UserAlreadyExistsException(
+ "Username already taken", field="username"
+ )
# Update fields
if email is not None:
@@ -322,7 +330,9 @@ class AdminService:
search_term = f"%{query.lower()}%"
users = (
db.query(User)
- .filter(or_(User.username.ilike(search_term), User.email.ilike(search_term)))
+ .filter(
+ or_(User.username.ilike(search_term), User.email.ilike(search_term))
+ )
.limit(limit)
.all()
)
@@ -360,14 +370,20 @@ class AdminService:
"""
try:
# Validate company exists
- company = db.query(Company).filter(Company.id == vendor_data.company_id).first()
+ company = (
+ db.query(Company).filter(Company.id == vendor_data.company_id).first()
+ )
if not company:
- raise ValidationException(f"Company with ID {vendor_data.company_id} not found")
+ raise ValidationException(
+ f"Company with ID {vendor_data.company_id} not found"
+ )
# Check if vendor code already exists
existing_vendor = (
db.query(Vendor)
- .filter(func.upper(Vendor.vendor_code) == vendor_data.vendor_code.upper())
+ .filter(
+ func.upper(Vendor.vendor_code) == vendor_data.vendor_code.upper()
+ )
.first()
)
if existing_vendor:
@@ -613,7 +629,13 @@ class AdminService:
update_data["tax_number"] = None
# Convert empty strings to None for contact fields (empty = inherit)
- contact_fields = ["contact_email", "contact_phone", "website", "business_address", "tax_number"]
+ contact_fields = [
+ "contact_email",
+ "contact_phone",
+ "website",
+ "business_address",
+ "tax_number",
+ ]
for field in contact_fields:
if field in update_data and update_data[field] == "":
update_data[field] = None
diff --git a/app/services/background_tasks_service.py b/app/services/background_tasks_service.py
index c4f77f88..68c5b409 100644
--- a/app/services/background_tasks_service.py
+++ b/app/services/background_tasks_service.py
@@ -48,7 +48,9 @@ class BackgroundTasksService:
def get_import_stats(self, db: Session) -> dict:
"""Get import job statistics"""
- today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
+ today_start = datetime.now(UTC).replace(
+ hour=0, minute=0, second=0, microsecond=0
+ )
stats = db.query(
func.count(MarketplaceImportJob.id).label("total"),
@@ -57,7 +59,12 @@ class BackgroundTasksService:
).label("running"),
func.sum(
func.case(
- (MarketplaceImportJob.status.in_(["completed", "completed_with_errors"]), 1),
+ (
+ MarketplaceImportJob.status.in_(
+ ["completed", "completed_with_errors"]
+ ),
+ 1,
+ ),
else_=0,
)
).label("completed"),
@@ -83,12 +90,18 @@ class BackgroundTasksService:
def get_test_run_stats(self, db: Session) -> dict:
"""Get test run statistics"""
- today_start = datetime.now(UTC).replace(hour=0, minute=0, second=0, microsecond=0)
+ today_start = datetime.now(UTC).replace(
+ hour=0, minute=0, second=0, microsecond=0
+ )
stats = db.query(
func.count(TestRun.id).label("total"),
- func.sum(func.case((TestRun.status == "running", 1), else_=0)).label("running"),
- func.sum(func.case((TestRun.status == "passed", 1), else_=0)).label("completed"),
+ func.sum(func.case((TestRun.status == "running", 1), else_=0)).label(
+ "running"
+ ),
+ func.sum(func.case((TestRun.status == "passed", 1), else_=0)).label(
+ "completed"
+ ),
func.sum(
func.case((TestRun.status.in_(["failed", "error"]), 1), else_=0)
).label("failed"),
diff --git a/app/services/company_service.py b/app/services/company_service.py
index 4693ae6e..054653b9 100644
--- a/app/services/company_service.py
+++ b/app/services/company_service.py
@@ -8,7 +8,6 @@ This service handles CRUD operations for companies and company-vendor relationsh
import logging
import secrets
import string
-from typing import List, Optional
from sqlalchemy import func, select
from sqlalchemy.orm import Session, joinedload
@@ -26,7 +25,6 @@ class CompanyService:
def __init__(self):
"""Initialize company service."""
- pass
def create_company_with_owner(
self, db: Session, company_data: CompanyCreate
@@ -106,11 +104,15 @@ class CompanyService:
Raises:
CompanyNotFoundException: If company not found
"""
- company = db.execute(
- select(Company)
- .where(Company.id == company_id)
- .options(joinedload(Company.vendors))
- ).unique().scalar_one_or_none()
+ company = (
+ db.execute(
+ select(Company)
+ .where(Company.id == company_id)
+ .options(joinedload(Company.vendors))
+ )
+ .unique()
+ .scalar_one_or_none()
+ )
if not company:
raise CompanyNotFoundException(company_id)
@@ -125,7 +127,7 @@ class CompanyService:
search: str | None = None,
is_active: bool | None = None,
is_verified: bool | None = None,
- ) -> tuple[List[Company], int]:
+ ) -> tuple[list[Company], int]:
"""
Get paginated list of companies with optional filters.
@@ -209,7 +211,9 @@ class CompanyService:
db.flush()
logger.info(f"Deleted company ID {company_id} and associated vendors")
- def toggle_verification(self, db: Session, company_id: int, is_verified: bool) -> Company:
+ def toggle_verification(
+ self, db: Session, company_id: int, is_verified: bool
+ ) -> Company:
"""
Toggle company verification status.
@@ -227,9 +231,7 @@ class CompanyService:
company = self.get_company_by_id(db, company_id)
company.is_verified = is_verified
db.flush()
- logger.info(
- f"Company ID {company_id} verification set to {is_verified}"
- )
+ logger.info(f"Company ID {company_id} verification set to {is_verified}")
return company
@@ -251,9 +253,7 @@ class CompanyService:
company = self.get_company_by_id(db, company_id)
company.is_active = is_active
db.flush()
- logger.info(
- f"Company ID {company_id} active status set to {is_active}"
- )
+ logger.info(f"Company ID {company_id} active status set to {is_active}")
return company
diff --git a/app/services/content_page_service.py b/app/services/content_page_service.py
index 0e1041d5..cbacc6b5 100644
--- a/app/services/content_page_service.py
+++ b/app/services/content_page_service.py
@@ -526,7 +526,9 @@ class ContentPageService:
return (
db.query(ContentPage)
.filter(and_(*filters) if filters else True)
- .order_by(ContentPage.vendor_id, ContentPage.display_order, ContentPage.title)
+ .order_by(
+ ContentPage.vendor_id, ContentPage.display_order, ContentPage.title
+ )
.all()
)
diff --git a/app/services/letzshop/client_service.py b/app/services/letzshop/client_service.py
index 93963304..d713bc86 100644
--- a/app/services/letzshop/client_service.py
+++ b/app/services/letzshop/client_service.py
@@ -30,20 +30,14 @@ class LetzshopClientError(Exception):
class LetzshopAuthError(LetzshopClientError):
"""Raised when authentication fails."""
- pass
-
class LetzshopAPIError(LetzshopClientError):
"""Raised when the API returns an error response."""
- pass
-
class LetzshopConnectionError(LetzshopClientError):
"""Raised when connection to the API fails."""
- pass
-
# ============================================================================
# GraphQL Queries
diff --git a/app/services/letzshop/credentials_service.py b/app/services/letzshop/credentials_service.py
index d42a895a..1528cb07 100644
--- a/app/services/letzshop/credentials_service.py
+++ b/app/services/letzshop/credentials_service.py
@@ -6,7 +6,7 @@ Handles secure storage and retrieval of per-vendor Letzshop API credentials.
"""
import logging
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from sqlalchemy.orm import Session
@@ -24,14 +24,10 @@ DEFAULT_ENDPOINT = "https://letzshop.lu/graphql"
class CredentialsError(Exception):
"""Base exception for credentials errors."""
- pass
-
class CredentialsNotFoundError(CredentialsError):
"""Raised when credentials are not found for a vendor."""
- pass
-
class LetzshopCredentialsService:
"""
@@ -54,9 +50,7 @@ class LetzshopCredentialsService:
# CRUD Operations
# ========================================================================
- def get_credentials(
- self, vendor_id: int
- ) -> VendorLetzshopCredentials | None:
+ def get_credentials(self, vendor_id: int) -> VendorLetzshopCredentials | None:
"""
Get Letzshop credentials for a vendor.
@@ -72,9 +66,7 @@ class LetzshopCredentialsService:
.first()
)
- def get_credentials_or_raise(
- self, vendor_id: int
- ) -> VendorLetzshopCredentials:
+ def get_credentials_or_raise(self, vendor_id: int) -> VendorLetzshopCredentials:
"""
Get Letzshop credentials for a vendor or raise an exception.
@@ -293,9 +285,7 @@ class LetzshopCredentialsService:
# Connection Testing
# ========================================================================
- def test_connection(
- self, vendor_id: int
- ) -> tuple[bool, float | None, str | None]:
+ def test_connection(self, vendor_id: int) -> tuple[bool, float | None, str | None]:
"""
Test the connection for a vendor's credentials.
@@ -364,7 +354,7 @@ class LetzshopCredentialsService:
if credentials is None:
return None
- credentials.last_sync_at = datetime.now(timezone.utc)
+ credentials.last_sync_at = datetime.now(UTC)
credentials.last_sync_status = status
credentials.last_sync_error = error
diff --git a/app/services/letzshop/order_service.py b/app/services/letzshop/order_service.py
index 467396b2..09018dfc 100644
--- a/app/services/letzshop/order_service.py
+++ b/app/services/letzshop/order_service.py
@@ -7,7 +7,7 @@ architecture rules (API-002: endpoints should not contain business logic).
"""
import logging
-from datetime import datetime, timezone
+from datetime import UTC, datetime
from typing import Any
from sqlalchemy import func
@@ -21,21 +21,16 @@ from models.database.letzshop import (
)
from models.database.vendor import Vendor
-
logger = logging.getLogger(__name__)
class VendorNotFoundError(Exception):
"""Raised when a vendor is not found."""
- pass
-
class OrderNotFoundError(Exception):
"""Raised when a Letzshop order is not found."""
- pass
-
class LetzshopOrderService:
"""Service for Letzshop order database operations."""
@@ -114,17 +109,23 @@ class LetzshopOrderService:
or 0
)
- vendor_overviews.append({
- "vendor_id": vendor.id,
- "vendor_name": vendor.name,
- "vendor_code": vendor.vendor_code,
- "is_configured": credentials is not None,
- "auto_sync_enabled": credentials.auto_sync_enabled if credentials else False,
- "last_sync_at": credentials.last_sync_at if credentials else None,
- "last_sync_status": credentials.last_sync_status if credentials else None,
- "pending_orders": pending_orders,
- "total_orders": total_orders,
- })
+ vendor_overviews.append(
+ {
+ "vendor_id": vendor.id,
+ "vendor_name": vendor.name,
+ "vendor_code": vendor.vendor_code,
+ "is_configured": credentials is not None,
+ "auto_sync_enabled": credentials.auto_sync_enabled
+ if credentials
+ else False,
+ "last_sync_at": credentials.last_sync_at if credentials else None,
+ "last_sync_status": credentials.last_sync_status
+ if credentials
+ else None,
+ "pending_orders": pending_orders,
+ "total_orders": total_orders,
+ }
+ )
return vendor_overviews, total
@@ -210,9 +211,7 @@ class LetzshopOrderService:
letzshop_order_number=order_data.get("number"),
letzshop_state=shipment_data.get("state"),
customer_email=order_data.get("email"),
- total_amount=str(
- order_data.get("totalPrice", {}).get("amount", "")
- ),
+ total_amount=str(order_data.get("totalPrice", {}).get("amount", "")),
currency=order_data.get("totalPrice", {}).get("currency", "EUR"),
raw_order_data=shipment_data,
inventory_units=[
@@ -236,13 +235,13 @@ class LetzshopOrderService:
def mark_order_confirmed(self, order: LetzshopOrder) -> LetzshopOrder:
"""Mark an order as confirmed."""
- order.confirmed_at = datetime.now(timezone.utc)
+ order.confirmed_at = datetime.now(UTC)
order.sync_status = "confirmed"
return order
def mark_order_rejected(self, order: LetzshopOrder) -> LetzshopOrder:
"""Mark an order as rejected."""
- order.rejected_at = datetime.now(timezone.utc)
+ order.rejected_at = datetime.now(UTC)
order.sync_status = "rejected"
return order
@@ -255,7 +254,7 @@ class LetzshopOrderService:
"""Set tracking information for an order."""
order.tracking_number = tracking_number
order.tracking_carrier = tracking_carrier
- order.tracking_set_at = datetime.now(timezone.utc)
+ order.tracking_set_at = datetime.now(UTC)
order.sync_status = "shipped"
return order
diff --git a/app/services/letzshop_export_service.py b/app/services/letzshop_export_service.py
index 9723aaa4..513f2dd9 100644
--- a/app/services/letzshop_export_service.py
+++ b/app/services/letzshop_export_service.py
@@ -4,10 +4,10 @@ Service for exporting products to Letzshop CSV format.
Generates Google Shopping compatible CSV files for Letzshop marketplace.
"""
+
import csv
import io
import logging
-from typing import BinaryIO
from sqlalchemy.orm import Session, joinedload
@@ -140,7 +140,9 @@ class LetzshopExportService:
)
if marketplace:
- query = query.filter(MarketplaceProduct.marketplace.ilike(f"%{marketplace}%"))
+ query = query.filter(
+ MarketplaceProduct.marketplace.ilike(f"%{marketplace}%")
+ )
if limit:
query = query.limit(limit)
@@ -193,7 +195,9 @@ class LetzshopExportService:
def _product_to_row(self, product: Product, language: str) -> dict:
"""Convert a Product (with MarketplaceProduct) to a CSV row."""
mp = product.marketplace_product
- return self._marketplace_product_to_row(mp, language, vendor_sku=product.vendor_sku)
+ return self._marketplace_product_to_row(
+ mp, language, vendor_sku=product.vendor_sku
+ )
def _marketplace_product_to_row(
self,
diff --git a/app/services/log_service.py b/app/services/log_service.py
index c6a95f50..ec93704e 100644
--- a/app/services/log_service.py
+++ b/app/services/log_service.py
@@ -11,7 +11,6 @@ This module provides functions for:
"""
import logging
-import os
from datetime import UTC, datetime, timedelta
from pathlib import Path
@@ -58,7 +57,9 @@ class LogService:
conditions.append(ApplicationLog.level == filters.level.upper())
if filters.logger_name:
- conditions.append(ApplicationLog.logger_name.like(f"%{filters.logger_name}%"))
+ conditions.append(
+ ApplicationLog.logger_name.like(f"%{filters.logger_name}%")
+ )
if filters.module:
conditions.append(ApplicationLog.module.like(f"%{filters.module}%"))
@@ -215,7 +216,8 @@ class LogService:
except Exception as e:
logger.error(f"Failed to get log statistics: {e}")
raise AdminOperationException(
- operation="get_log_statistics", reason=f"Database query failed: {str(e)}"
+ operation="get_log_statistics",
+ reason=f"Database query failed: {str(e)}",
)
def get_file_logs(
@@ -252,7 +254,7 @@ class LogService:
stat = log_file.stat()
# Read last N lines efficiently
- with open(log_file, "r", encoding="utf-8", errors="replace") as f:
+ with open(log_file, encoding="utf-8", errors="replace") as f:
# For large files, seek to end and read backwards
all_lines = f.readlines()
log_lines = all_lines[-lines:] if len(all_lines) > lines else all_lines
@@ -349,16 +351,21 @@ class LogService:
db.rollback()
logger.error(f"Failed to cleanup old logs: {e}")
raise AdminOperationException(
- operation="cleanup_old_logs", reason=f"Delete operation failed: {str(e)}"
+ operation="cleanup_old_logs",
+ reason=f"Delete operation failed: {str(e)}",
)
def delete_log(self, db: Session, log_id: int) -> str:
"""Delete a specific log entry."""
try:
- log_entry = db.query(ApplicationLog).filter(ApplicationLog.id == log_id).first()
+ log_entry = (
+ db.query(ApplicationLog).filter(ApplicationLog.id == log_id).first()
+ )
if not log_entry:
- raise ResourceNotFoundException(resource_type="log", identifier=str(log_id))
+ raise ResourceNotFoundException(
+ resource_type="log", identifier=str(log_id)
+ )
db.delete(log_entry)
db.commit()
diff --git a/app/services/marketplace_import_job_service.py b/app/services/marketplace_import_job_service.py
index 491cef2f..a36973c2 100644
--- a/app/services/marketplace_import_job_service.py
+++ b/app/services/marketplace_import_job_service.py
@@ -8,7 +8,10 @@ from app.exceptions import (
ImportJobNotOwnedException,
ValidationException,
)
-from models.database.marketplace_import_job import MarketplaceImportError, MarketplaceImportJob
+from models.database.marketplace_import_job import (
+ MarketplaceImportError,
+ MarketplaceImportJob,
+)
from models.database.user import User
from models.database.vendor import Vendor
from models.schema.marketplace_import_job import (
@@ -136,7 +139,9 @@ class MarketplaceImportJobService:
except (ImportJobNotFoundException, UnauthorizedVendorAccessException):
raise
except Exception as e:
- logger.error(f"Error getting import job {job_id} for vendor {vendor_id}: {str(e)}")
+ logger.error(
+ f"Error getting import job {job_id} for vendor {vendor_id}: {str(e)}"
+ )
raise ValidationException("Failed to retrieve import job")
def get_import_jobs(
diff --git a/app/services/marketplace_product_service.py b/app/services/marketplace_product_service.py
index 5fe6724e..dad389e4 100644
--- a/app/services/marketplace_product_service.py
+++ b/app/services/marketplace_product_service.py
@@ -32,7 +32,9 @@ from app.exceptions import (
from app.utils.data_processing import GTINProcessor, PriceProcessor
from models.database.inventory import Inventory
from models.database.marketplace_product import MarketplaceProduct
-from models.database.marketplace_product_translation import MarketplaceProductTranslation
+from models.database.marketplace_product_translation import (
+ MarketplaceProductTranslation,
+)
from models.schema.inventory import InventoryLocationResponse, InventorySummaryResponse
from models.schema.marketplace_product import (
MarketplaceProductCreate,
@@ -602,7 +604,6 @@ class MarketplaceProductService:
return normalized
-
# =========================================================================
# Admin-specific methods for marketplace product management
# =========================================================================
@@ -632,22 +633,28 @@ class MarketplaceProductService:
if search:
search_term = f"%{search}%"
- query = query.outerjoin(MarketplaceProductTranslation).filter(
- or_(
- MarketplaceProductTranslation.title.ilike(search_term),
- MarketplaceProduct.gtin.ilike(search_term),
- MarketplaceProduct.sku.ilike(search_term),
- MarketplaceProduct.brand.ilike(search_term),
- MarketplaceProduct.mpn.ilike(search_term),
- MarketplaceProduct.marketplace_product_id.ilike(search_term),
+ query = (
+ query.outerjoin(MarketplaceProductTranslation)
+ .filter(
+ or_(
+ MarketplaceProductTranslation.title.ilike(search_term),
+ MarketplaceProduct.gtin.ilike(search_term),
+ MarketplaceProduct.sku.ilike(search_term),
+ MarketplaceProduct.brand.ilike(search_term),
+ MarketplaceProduct.mpn.ilike(search_term),
+ MarketplaceProduct.marketplace_product_id.ilike(search_term),
+ )
)
- ).distinct()
+ .distinct()
+ )
if marketplace:
query = query.filter(MarketplaceProduct.marketplace == marketplace)
if vendor_name:
- query = query.filter(MarketplaceProduct.vendor_name.ilike(f"%{vendor_name}%"))
+ query = query.filter(
+ MarketplaceProduct.vendor_name.ilike(f"%{vendor_name}%")
+ )
if availability:
query = query.filter(MarketplaceProduct.availability == availability)
@@ -787,8 +794,12 @@ class MarketplaceProductService:
"weight": product.weight,
"weight_unit": product.weight_unit,
"translations": translations,
- "created_at": product.created_at.isoformat() if product.created_at else None,
- "updated_at": product.updated_at.isoformat() if product.updated_at else None,
+ "created_at": product.created_at.isoformat()
+ if product.created_at
+ else None,
+ "updated_at": product.updated_at.isoformat()
+ if product.updated_at
+ else None,
}
def copy_to_vendor_catalog(
@@ -810,6 +821,7 @@ class MarketplaceProductService:
vendor = db.query(Vendor).filter(Vendor.id == vendor_id).first()
if not vendor:
from app.exceptions import VendorNotFoundException
+
raise VendorNotFoundException(str(vendor_id), identifier_type="id")
marketplace_products = (
@@ -839,11 +851,13 @@ class MarketplaceProductService:
if existing:
skipped += 1
- details.append({
- "id": mp.id,
- "status": "skipped",
- "reason": "Already exists in catalog",
- })
+ details.append(
+ {
+ "id": mp.id,
+ "status": "skipped",
+ "reason": "Already exists in catalog",
+ }
+ )
continue
product = Product(
@@ -876,7 +890,9 @@ class MarketplaceProductService:
"details": details if len(details) <= 100 else None,
}
- def _build_admin_product_item(self, product: MarketplaceProduct, title: str | None) -> dict:
+ def _build_admin_product_item(
+ self, product: MarketplaceProduct, title: str | None
+ ) -> dict:
"""Build a product list item dict for admin view."""
return {
"id": product.id,
@@ -894,8 +910,12 @@ class MarketplaceProductService:
"is_active": product.is_active,
"is_digital": product.is_digital,
"product_type_enum": product.product_type_enum,
- "created_at": product.created_at.isoformat() if product.created_at else None,
- "updated_at": product.updated_at.isoformat() if product.updated_at else None,
+ "created_at": product.created_at.isoformat()
+ if product.created_at
+ else None,
+ "updated_at": product.updated_at.isoformat()
+ if product.updated_at
+ else None,
}
diff --git a/app/services/stats_service.py b/app/services/stats_service.py
index e929e8fe..a5a66716 100644
--- a/app/services/stats_service.py
+++ b/app/services/stats_service.py
@@ -160,7 +160,9 @@ class StatsService:
# Inventory stats
"total_inventory_quantity": int(total_inventory),
"reserved_inventory_quantity": int(reserved_inventory),
- "available_inventory_quantity": int(total_inventory - reserved_inventory),
+ "available_inventory_quantity": int(
+ total_inventory - reserved_inventory
+ ),
"inventory_locations_count": inventory_locations,
}
diff --git a/app/services/test_runner_service.py b/app/services/test_runner_service.py
index 275db6a3..573299d2 100644
--- a/app/services/test_runner_service.py
+++ b/app/services/test_runner_service.py
@@ -77,11 +77,15 @@ class TestRunnerService:
"""Execute pytest and update the test run record"""
try:
# Build pytest command with JSON output
- with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
+ with tempfile.NamedTemporaryFile(
+ mode="w", suffix=".json", delete=False
+ ) as f:
json_report_path = f.name
pytest_args = [
- "python", "-m", "pytest",
+ "python",
+ "-m",
+ "pytest",
test_path,
"--json-report",
f"--json-report-file={json_report_path}",
@@ -109,7 +113,7 @@ class TestRunnerService:
# Parse JSON report
try:
- with open(json_report_path, 'r') as f:
+ with open(json_report_path) as f:
report = json.load(f)
self._process_json_report(db, test_run, report)
@@ -167,7 +171,7 @@ class TestRunnerService:
traceback = call_info["longrepr"]
# Extract error message from traceback
if isinstance(traceback, str):
- lines = traceback.strip().split('\n')
+ lines = traceback.strip().split("\n")
if lines:
error_message = lines[-1][:500] # Last line, limited length
@@ -232,8 +236,12 @@ class TestRunnerService:
test_run.xpassed = count
test_run.total_tests = (
- test_run.passed + test_run.failed + test_run.errors +
- test_run.skipped + test_run.xfailed + test_run.xpassed
+ test_run.passed
+ + test_run.failed
+ + test_run.errors
+ + test_run.skipped
+ + test_run.xfailed
+ + test_run.xpassed
)
def _get_git_commit(self) -> str | None:
@@ -266,12 +274,7 @@ class TestRunnerService:
def get_run_history(self, db: Session, limit: int = 20) -> list[TestRun]:
"""Get recent test run history"""
- return (
- db.query(TestRun)
- .order_by(desc(TestRun.timestamp))
- .limit(limit)
- .all()
- )
+ return db.query(TestRun).order_by(desc(TestRun.timestamp)).limit(limit).all()
def get_run_by_id(self, db: Session, run_id: int) -> TestRun | None:
"""Get a specific test run with results"""
@@ -282,8 +285,7 @@ class TestRunnerService:
return (
db.query(TestResult)
.filter(
- TestResult.run_id == run_id,
- TestResult.outcome.in_(["failed", "error"])
+ TestResult.run_id == run_id, TestResult.outcome.in_(["failed", "error"])
)
.all()
)
@@ -310,7 +312,9 @@ class TestRunnerService:
)
# Get test collection info (or calculate from latest run)
- collection = db.query(TestCollection).order_by(desc(TestCollection.collected_at)).first()
+ collection = (
+ db.query(TestCollection).order_by(desc(TestCollection.collected_at)).first()
+ )
# Get trend data (last 10 runs)
trend_runs = (
@@ -324,7 +328,9 @@ class TestRunnerService:
# Calculate stats by category from latest run
by_category = {}
if latest_run:
- results = db.query(TestResult).filter(TestResult.run_id == latest_run.id).all()
+ results = (
+ db.query(TestResult).filter(TestResult.run_id == latest_run.id).all()
+ )
for result in results:
# Categorize by test path
if "unit" in result.test_file:
@@ -351,7 +357,7 @@ class TestRunnerService:
db.query(
TestResult.test_name,
TestResult.test_file,
- func.count(TestResult.id).label("failure_count")
+ func.count(TestResult.id).label("failure_count"),
)
.filter(TestResult.outcome.in_(["failed", "error"]))
.group_by(TestResult.test_name, TestResult.test_file)
@@ -368,11 +374,12 @@ class TestRunnerService:
"errors": latest_run.errors if latest_run else 0,
"skipped": latest_run.skipped if latest_run else 0,
"pass_rate": round(latest_run.pass_rate, 1) if latest_run else 0,
- "duration_seconds": round(latest_run.duration_seconds, 2) if latest_run else 0,
+ "duration_seconds": round(latest_run.duration_seconds, 2)
+ if latest_run
+ else 0,
"coverage_percent": latest_run.coverage_percent if latest_run else None,
"last_run": latest_run.timestamp.isoformat() if latest_run else None,
"last_run_status": latest_run.status if latest_run else None,
-
# Collection stats
"total_test_files": collection.total_files if collection else 0,
"collected_tests": collection.total_tests if collection else 0,
@@ -380,8 +387,9 @@ class TestRunnerService:
"integration_tests": collection.integration_tests if collection else 0,
"performance_tests": collection.performance_tests if collection else 0,
"system_tests": collection.system_tests if collection else 0,
- "last_collected": collection.collected_at.isoformat() if collection else None,
-
+ "last_collected": collection.collected_at.isoformat()
+ if collection
+ else None,
# Trend data
"trend": [
{
@@ -394,10 +402,8 @@ class TestRunnerService:
}
for run in reversed(trend_runs)
],
-
# By category
"by_category": by_category,
-
# Top failing tests
"top_failing": [
{
@@ -417,16 +423,20 @@ class TestRunnerService:
try:
# Run pytest --collect-only with JSON report
- with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
+ with tempfile.NamedTemporaryFile(
+ mode="w", suffix=".json", delete=False
+ ) as f:
json_report_path = f.name
result = subprocess.run(
[
- "python", "-m", "pytest",
+ "python",
+ "-m",
+ "pytest",
"--collect-only",
"--json-report",
f"--json-report-file={json_report_path}",
- "tests"
+ "tests",
],
cwd=str(self.project_root),
capture_output=True,
@@ -461,11 +471,17 @@ class TestRunnerService:
if "/unit/" in file_path or file_path.startswith("tests/unit"):
collection.unit_tests += count
- elif "/integration/" in file_path or file_path.startswith("tests/integration"):
+ elif "/integration/" in file_path or file_path.startswith(
+ "tests/integration"
+ ):
collection.integration_tests += count
- elif "/performance/" in file_path or file_path.startswith("tests/performance"):
+ elif "/performance/" in file_path or file_path.startswith(
+ "tests/performance"
+ ):
collection.performance_tests += count
- elif "/system/" in file_path or file_path.startswith("tests/system"):
+ elif "/system/" in file_path or file_path.startswith(
+ "tests/system"
+ ):
collection.system_tests += count
collection.test_files = [
@@ -476,7 +492,9 @@ class TestRunnerService:
# Cleanup
json_path.unlink(missing_ok=True)
- logger.info(f"Collected {collection.total_tests} tests from {collection.total_files} files")
+ logger.info(
+ f"Collected {collection.total_tests} tests from {collection.total_files} files"
+ )
except Exception as e:
logger.error(f"Error collecting tests: {e}", exc_info=True)
diff --git a/app/services/vendor_product_service.py b/app/services/vendor_product_service.py
index 87b33761..6c07e22f 100644
--- a/app/services/vendor_product_service.py
+++ b/app/services/vendor_product_service.py
@@ -66,10 +66,7 @@ class VendorProductService:
total = query.count()
products = (
- query.order_by(Product.updated_at.desc())
- .offset(skip)
- .limit(limit)
- .all()
+ query.order_by(Product.updated_at.desc()).offset(skip).limit(limit).all()
)
result = []
@@ -138,8 +135,7 @@ class VendorProductService:
.all()
)
return [
- {"id": v.id, "name": v.name, "vendor_code": v.vendor_code}
- for v in vendors
+ {"id": v.id, "name": v.name, "vendor_code": v.vendor_code} for v in vendors
]
def get_product_detail(self, db: Session, product_id: int) -> dict:
@@ -212,8 +208,12 @@ class VendorProductService:
"marketplace_translations": mp_translations,
"vendor_translations": vendor_translations,
# Timestamps
- "created_at": product.created_at.isoformat() if product.created_at else None,
- "updated_at": product.updated_at.isoformat() if product.updated_at else None,
+ "created_at": product.created_at.isoformat()
+ if product.created_at
+ else None,
+ "updated_at": product.updated_at.isoformat()
+ if product.updated_at
+ else None,
}
def remove_product(self, db: Session, product_id: int) -> dict:
@@ -257,8 +257,12 @@ class VendorProductService:
"image_url": product.effective_primary_image_url,
"source_marketplace": mp.marketplace if mp else None,
"source_vendor": mp.vendor_name if mp else None,
- "created_at": product.created_at.isoformat() if product.created_at else None,
- "updated_at": product.updated_at.isoformat() if product.updated_at else None,
+ "created_at": product.created_at.isoformat()
+ if product.created_at
+ else None,
+ "updated_at": product.updated_at.isoformat()
+ if product.updated_at
+ else None,
}
diff --git a/app/services/vendor_service.py b/app/services/vendor_service.py
index 42bdf1a6..754647f1 100644
--- a/app/services/vendor_service.py
+++ b/app/services/vendor_service.py
@@ -67,20 +67,26 @@ class VendorService:
try:
# Validate company_id is provided
- if not hasattr(vendor_data, 'company_id') or not vendor_data.company_id:
+ if not hasattr(vendor_data, "company_id") or not vendor_data.company_id:
raise InvalidVendorDataException(
"company_id is required to create a vendor", field="company_id"
)
# Get company and verify ownership
- company = db.query(Company).filter(Company.id == vendor_data.company_id).first()
+ company = (
+ db.query(Company).filter(Company.id == vendor_data.company_id).first()
+ )
if not company:
raise InvalidVendorDataException(
- f"Company with ID {vendor_data.company_id} not found", field="company_id"
+ f"Company with ID {vendor_data.company_id} not found",
+ field="company_id",
)
# Check if user is company owner or admin
- if current_user.role != "admin" and company.owner_user_id != current_user.id:
+ if (
+ current_user.role != "admin"
+ and company.owner_user_id != current_user.id
+ ):
raise UnauthorizedVendorAccessException(
f"company-{vendor_data.company_id}", current_user.id
)
@@ -163,9 +169,7 @@ class VendorService:
)
query = query.filter(
(Vendor.is_active == True)
- & (
- (Vendor.is_verified == True) | (Vendor.id.in_(owned_vendor_ids))
- )
+ & ((Vendor.is_verified == True) | (Vendor.id.in_(owned_vendor_ids)))
)
else:
# Admin can apply filters
@@ -238,6 +242,7 @@ class VendorService:
VendorNotFoundException: If vendor not found
"""
from sqlalchemy.orm import joinedload
+
from models.database.company import Company
vendor = (
@@ -272,6 +277,7 @@ class VendorService:
VendorNotFoundException: If vendor not found or inactive
"""
from sqlalchemy.orm import joinedload
+
from models.database.company import Company
vendor = (
@@ -305,6 +311,7 @@ class VendorService:
VendorNotFoundException: If vendor not found
"""
from sqlalchemy.orm import joinedload
+
from models.database.company import Company
# Try as integer ID first
diff --git a/app/tasks/test_runner_tasks.py b/app/tasks/test_runner_tasks.py
index cd626b7e..63afd900 100644
--- a/app/tasks/test_runner_tasks.py
+++ b/app/tasks/test_runner_tasks.py
@@ -2,7 +2,6 @@
"""Background tasks for test runner."""
import logging
-from datetime import UTC, datetime
from app.core.database import SessionLocal
from app.services.test_runner_service import test_runner_service
diff --git a/app/templates/shared/macros/language_selector.html b/app/templates/shared/macros/language_selector.html
index 8449b637..26b0f1c1 100644
--- a/app/templates/shared/macros/language_selector.html
+++ b/app/templates/shared/macros/language_selector.html
@@ -58,7 +58,7 @@
{% set positions = {'left': 'left-0', 'right': 'right-0'} %}
{# Uses languageSelector() function per LANG-002 architecture rule #}
diff --git a/app/templates/shop/base.html b/app/templates/shop/base.html
index 72be6bff..445b1f53 100644
--- a/app/templates/shop/base.html
+++ b/app/templates/shop/base.html
@@ -136,7 +136,7 @@
{# Language Selector #}
{% set enabled_langs = vendor.storefront_languages if vendor and vendor.storefront_languages else ['fr', 'de', 'en'] %}
{% if enabled_langs|length > 1 %}
-
+
dict[str, Any]:
+ def _extract_translation_data(self, product_data: dict[str, Any]) -> dict[str, Any]:
"""Extract translation fields from product data.
Returns a dict with title, description, etc. that belong
@@ -363,11 +363,22 @@ class CSVProcessor:
if row_data:
# Keep only key fields for review
limited_data = {
- k: v for k, v in row_data.items()
- if k in [
- "marketplace_product_id", "title", "gtin", "mpn", "sku",
- "brand", "price", "availability", "link"
- ] and v is not None and str(v).strip()
+ k: v
+ for k, v in row_data.items()
+ if k
+ in [
+ "marketplace_product_id",
+ "title",
+ "gtin",
+ "mpn",
+ "sku",
+ "brand",
+ "price",
+ "availability",
+ "link",
+ ]
+ and v is not None
+ and str(v).strip()
}
row_data = limited_data if limited_data else None
@@ -419,7 +430,11 @@ class CSVProcessor:
product_data["vendor_name"] = vendor_name
# Get identifier for error tracking
- identifier = product_data.get("marketplace_product_id") or product_data.get("gtin") or product_data.get("mpn")
+ identifier = (
+ product_data.get("marketplace_product_id")
+ or product_data.get("gtin")
+ or product_data.get("mpn")
+ )
# Validate required fields
if not product_data.get("marketplace_product_id"):
@@ -524,7 +539,8 @@ class CSVProcessor:
row_number=row_number,
error_type="processing_error",
error_message=str(e),
- identifier=row_dict.get("marketplace_product_id") or row_dict.get("id"),
+ identifier=row_dict.get("marketplace_product_id")
+ or row_dict.get("id"),
row_data=row_dict,
)
errors += 1
diff --git a/app/utils/data_processing.py b/app/utils/data_processing.py
index 278e5a83..71c4986c 100644
--- a/app/utils/data_processing.py
+++ b/app/utils/data_processing.py
@@ -54,16 +54,18 @@ class GTINProcessor:
# Standard lengths - return as-is (already valid)
return gtin_clean
- elif length > 14:
+ if length > 14:
# Too long - truncate to EAN-13
logger.debug(f"GTIN too long ({length} digits), truncating: {gtin_clean}")
return gtin_clean[-13:]
- elif 0 < length < 14:
+ if 0 < length < 14:
# Non-standard length - pad to EAN-13 (European standard)
# EAN-13 is the international standard used in Europe and most of the world
# UPC-A (12 digits) is primarily US/Canada
- logger.debug(f"GTIN non-standard ({length} digits), padding to EAN-13: {gtin_clean}")
+ logger.debug(
+ f"GTIN non-standard ({length} digits), padding to EAN-13: {gtin_clean}"
+ )
return gtin_clean.zfill(13)
logger.warning(f"Invalid GTIN format: '{gtin_value}'")
diff --git a/app/utils/encryption.py b/app/utils/encryption.py
index d27635ba..b1539b4e 100644
--- a/app/utils/encryption.py
+++ b/app/utils/encryption.py
@@ -25,8 +25,6 @@ _ENCRYPTION_SALT = b"wizamart_encryption_salt_v1"
class EncryptionError(Exception):
"""Raised when encryption or decryption fails."""
- pass
-
class EncryptionService:
"""
diff --git a/app/utils/i18n.py b/app/utils/i18n.py
index 48f0adb6..555f1316 100644
--- a/app/utils/i18n.py
+++ b/app/utils/i18n.py
@@ -14,6 +14,7 @@ Supported languages:
- de: German
- lb: Luxembourgish
"""
+
import json
import logging
from functools import lru_cache
@@ -97,7 +98,7 @@ def load_translations(language: str) -> dict:
return {}
try:
- with open(locale_file, "r", encoding="utf-8") as f:
+ with open(locale_file, encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in translation file {locale_file}: {e}")
@@ -139,7 +140,11 @@ def get_nested_value(data: dict, key_path: str, default: str = None) -> str:
else:
return default if default is not None else key_path
- return value if isinstance(value, str) else (default if default is not None else key_path)
+ return (
+ value
+ if isinstance(value, str)
+ else (default if default is not None else key_path)
+ )
def translate(
@@ -180,7 +185,9 @@ def translate(
try:
text = text.format(**kwargs)
except KeyError as e:
- logger.warning(f"Missing interpolation variable in translation '{key}': {e}")
+ logger.warning(
+ f"Missing interpolation variable in translation '{key}': {e}"
+ )
return text
diff --git a/middleware/language.py b/middleware/language.py
index 594f7766..9b5421a0 100644
--- a/middleware/language.py
+++ b/middleware/language.py
@@ -11,6 +11,7 @@ This middleware detects the appropriate language for each request based on:
The resolved language is stored in request.state.language for use in templates.
"""
+
import logging
from starlette.middleware.base import BaseHTTPMiddleware
diff --git a/models/database/__init__.py b/models/database/__init__.py
index 4704f101..6f7467fb 100644
--- a/models/database/__init__.py
+++ b/models/database/__init__.py
@@ -14,12 +14,17 @@ from .architecture_scan import (
ViolationAssignment,
ViolationComment,
)
-from .test_run import TestCollection, TestResult, TestRun
from .base import Base
from .company import Company
from .content_page import ContentPage
from .customer import Customer, CustomerAddress
from .inventory import Inventory
+from .letzshop import (
+ LetzshopFulfillmentQueue,
+ LetzshopOrder,
+ LetzshopSyncLog,
+ VendorLetzshopCredentials,
+)
from .marketplace_import_job import MarketplaceImportError, MarketplaceImportJob
from .marketplace_product import (
DigitalDeliveryMethod,
@@ -28,14 +33,9 @@ from .marketplace_product import (
)
from .marketplace_product_translation import MarketplaceProductTranslation
from .order import Order, OrderItem
-from .letzshop import (
- LetzshopFulfillmentQueue,
- LetzshopOrder,
- LetzshopSyncLog,
- VendorLetzshopCredentials,
-)
from .product import Product
from .product_translation import ProductTranslation
+from .test_run import TestCollection, TestResult, TestRun
from .user import User
from .vendor import Role, Vendor, VendorUser
from .vendor_domain import VendorDomain
diff --git a/models/database/letzshop.py b/models/database/letzshop.py
index b8862864..af9f4019 100644
--- a/models/database/letzshop.py
+++ b/models/database/letzshop.py
@@ -89,7 +89,9 @@ class LetzshopOrder(Base, TimestampMixin):
customer_name = Column(String(255), nullable=True)
# Order totals from Letzshop
- total_amount = Column(String(50), nullable=True) # Store as string to preserve format
+ total_amount = Column(
+ String(50), nullable=True
+ ) # Store as string to preserve format
currency = Column(String(10), default="EUR")
# Raw data storage (for debugging/auditing)
diff --git a/models/database/marketplace_import_job.py b/models/database/marketplace_import_job.py
index 3f6d3c2f..09faa0d3 100644
--- a/models/database/marketplace_import_job.py
+++ b/models/database/marketplace_import_job.py
@@ -33,7 +33,9 @@ class MarketplaceImportError(Base, TimestampMixin):
identifier = Column(String) # marketplace_product_id, gtin, mpn, etc.
# Error details
- error_type = Column(String(50), nullable=False) # missing_title, missing_id, parse_error, etc.
+ error_type = Column(
+ String(50), nullable=False
+ ) # missing_title, missing_id, parse_error, etc.
error_message = Column(Text, nullable=False)
# Raw row data for review (JSON)
@@ -64,7 +66,9 @@ class MarketplaceImportJob(Base, TimestampMixin):
# Import configuration
marketplace = Column(String, nullable=False, index=True, default="Letzshop")
source_url = Column(String, nullable=False)
- language = Column(String(5), nullable=False, default="en") # Language for translations
+ language = Column(
+ String(5), nullable=False, default="en"
+ ) # Language for translations
# Status tracking
status = Column(
diff --git a/models/database/marketplace_product.py b/models/database/marketplace_product.py
index b55c4076..5e558188 100644
--- a/models/database/marketplace_product.py
+++ b/models/database/marketplace_product.py
@@ -17,7 +17,6 @@ from sqlalchemy import (
Index,
Integer,
String,
- Text,
)
from sqlalchemy.dialects.sqlite import JSON
from sqlalchemy.orm import relationship
diff --git a/models/database/product_translation.py b/models/database/product_translation.py
index d361e6ae..3897b8eb 100644
--- a/models/database/product_translation.py
+++ b/models/database/product_translation.py
@@ -154,7 +154,9 @@ class ProductTranslation(Base, TimestampMixin):
# Description
"description": self.get_effective_description(),
"description_overridden": self.description is not None,
- "description_source": mp_translation.description if mp_translation else None,
+ "description_source": mp_translation.description
+ if mp_translation
+ else None,
# Short Description
"short_description": self.get_effective_short_description(),
"short_description_overridden": self.short_description is not None,
diff --git a/models/database/test_run.py b/models/database/test_run.py
index 5b15ae79..27ce888a 100644
--- a/models/database/test_run.py
+++ b/models/database/test_run.py
@@ -5,7 +5,6 @@ Database models for tracking pytest test runs and results
from sqlalchemy import (
JSON,
- Boolean,
Column,
DateTime,
Float,
@@ -53,7 +52,9 @@ class TestRun(Base):
pytest_args = Column(String(500)) # Command line arguments used
# Status
- status = Column(String(20), default="running", index=True) # 'running', 'passed', 'failed', 'error'
+ status = Column(
+ String(20), default="running", index=True
+ ) # 'running', 'passed', 'failed', 'error'
# Relationship to test results
results = relationship(
@@ -77,18 +78,20 @@ class TestResult(Base):
__tablename__ = "test_results"
id = Column(Integer, primary_key=True, index=True)
- run_id = Column(
- Integer, ForeignKey("test_runs.id"), nullable=False, index=True
- )
+ run_id = Column(Integer, ForeignKey("test_runs.id"), nullable=False, index=True)
# Test identification
- node_id = Column(String(500), nullable=False, index=True) # e.g., 'tests/unit/test_foo.py::test_bar'
+ node_id = Column(
+ String(500), nullable=False, index=True
+ ) # e.g., 'tests/unit/test_foo.py::test_bar'
test_name = Column(String(200), nullable=False) # e.g., 'test_bar'
test_file = Column(String(300), nullable=False) # e.g., 'tests/unit/test_foo.py'
test_class = Column(String(200)) # e.g., 'TestFooClass' (optional)
# Result
- outcome = Column(String(20), nullable=False, index=True) # 'passed', 'failed', 'error', 'skipped', 'xfailed', 'xpassed'
+ outcome = Column(
+ String(20), nullable=False, index=True
+ ) # 'passed', 'failed', 'error', 'skipped', 'xfailed', 'xpassed'
duration_seconds = Column(Float, default=0.0)
# Failure details (if applicable)
diff --git a/models/database/vendor.py b/models/database/vendor.py
index fe775bf8..3f6f83db 100644
--- a/models/database/vendor.py
+++ b/models/database/vendor.py
@@ -47,7 +47,9 @@ class Vendor(Base, TimestampMixin):
subdomain = Column(
String(100), unique=True, nullable=False, index=True
) # Unique, non-nullable subdomain column with indexing
- name = Column(String, nullable=False) # Non-nullable name column for the vendor (brand name)
+ name = Column(
+ String, nullable=False
+ ) # Non-nullable name column for the vendor (brand name)
description = Column(Text) # Optional text description column for the vendor
# Letzshop URLs - multi-language support (brand-specific marketplace feeds)
@@ -287,13 +289,16 @@ class Vendor(Base, TimestampMixin):
company = self.company
return {
"contact_email": self.effective_contact_email,
- "contact_email_inherited": self.contact_email is None and company is not None,
+ "contact_email_inherited": self.contact_email is None
+ and company is not None,
"contact_phone": self.effective_contact_phone,
- "contact_phone_inherited": self.contact_phone is None and company is not None,
+ "contact_phone_inherited": self.contact_phone is None
+ and company is not None,
"website": self.effective_website,
"website_inherited": self.website is None and company is not None,
"business_address": self.effective_business_address,
- "business_address_inherited": self.business_address is None and company is not None,
+ "business_address_inherited": self.business_address is None
+ and company is not None,
"tax_number": self.effective_tax_number,
"tax_number_inherited": self.tax_number is None and company is not None,
}
diff --git a/models/schema/admin.py b/models/schema/admin.py
index 1ff69428..4f9b3586 100644
--- a/models/schema/admin.py
+++ b/models/schema/admin.py
@@ -489,10 +489,18 @@ class LogSettingsResponse(BaseModel):
class LogSettingsUpdate(BaseModel):
"""Update log settings."""
- log_level: str | None = Field(None, description="Log level: DEBUG, INFO, WARNING, ERROR, CRITICAL")
- log_file_max_size_mb: int | None = Field(None, ge=1, le=1000, description="Max log file size in MB")
- log_file_backup_count: int | None = Field(None, ge=0, le=50, description="Number of backup files to keep")
- db_log_retention_days: int | None = Field(None, ge=1, le=365, description="Days to retain logs in database")
+ log_level: str | None = Field(
+ None, description="Log level: DEBUG, INFO, WARNING, ERROR, CRITICAL"
+ )
+ log_file_max_size_mb: int | None = Field(
+ None, ge=1, le=1000, description="Max log file size in MB"
+ )
+ log_file_backup_count: int | None = Field(
+ None, ge=0, le=50, description="Number of backup files to keep"
+ )
+ db_log_retention_days: int | None = Field(
+ None, ge=1, le=365, description="Days to retain logs in database"
+ )
@field_validator("log_level")
@classmethod
diff --git a/models/schema/letzshop.py b/models/schema/letzshop.py
index f795ba03..78c952d2 100644
--- a/models/schema/letzshop.py
+++ b/models/schema/letzshop.py
@@ -12,8 +12,7 @@ Covers:
from datetime import datetime
from typing import Any
-from pydantic import BaseModel, ConfigDict, Field, field_validator
-
+from pydantic import BaseModel, ConfigDict, Field
# ============================================================================
# Credentials Schemas
@@ -28,9 +27,7 @@ class LetzshopCredentialsCreate(BaseModel):
None,
description="Custom API endpoint (defaults to https://letzshop.lu/graphql)",
)
- auto_sync_enabled: bool = Field(
- False, description="Enable automatic order sync"
- )
+ auto_sync_enabled: bool = Field(False, description="Enable automatic order sync")
sync_interval_minutes: int = Field(
15, ge=5, le=1440, description="Sync interval in minutes (5-1440)"
)
diff --git a/models/schema/marketplace_product.py b/models/schema/marketplace_product.py
index 8d195a0e..576e465e 100644
--- a/models/schema/marketplace_product.py
+++ b/models/schema/marketplace_product.py
@@ -73,7 +73,9 @@ class MarketplaceProductBase(BaseModel):
# Categories
google_product_category: str | None = None
- product_type_raw: str | None = None # Original feed value (renamed from product_type)
+ product_type_raw: str | None = (
+ None # Original feed value (renamed from product_type)
+ )
category_path: str | None = None
# Custom labels
@@ -95,7 +97,9 @@ class MarketplaceProductBase(BaseModel):
source_url: str | None = None
# Product type classification
- product_type_enum: str | None = None # 'physical', 'digital', 'service', 'subscription'
+ product_type_enum: str | None = (
+ None # 'physical', 'digital', 'service', 'subscription'
+ )
is_digital: bool | None = None
# Digital product fields
@@ -124,8 +128,6 @@ class MarketplaceProductUpdate(MarketplaceProductBase):
All fields are optional - only provided fields will be updated.
"""
- pass
-
class MarketplaceProductResponse(BaseModel):
"""Schema for marketplace product API response."""
diff --git a/models/schema/media.py b/models/schema/media.py
index 16ad1f7d..c15c899f 100644
--- a/models/schema/media.py
+++ b/models/schema/media.py
@@ -14,7 +14,6 @@ from typing import Any
from pydantic import BaseModel, Field
-
# ============================================================================
# SHARED RESPONSE SCHEMAS
# ============================================================================
diff --git a/models/schema/notification.py b/models/schema/notification.py
index 63ed778b..8a920006 100644
--- a/models/schema/notification.py
+++ b/models/schema/notification.py
@@ -14,7 +14,6 @@ from typing import Any
from pydantic import BaseModel, Field
-
# ============================================================================
# SHARED RESPONSE SCHEMAS
# ============================================================================
@@ -134,7 +133,9 @@ class TestNotificationRequest(BaseModel):
template_id: int | None = Field(None, description="Template to use")
email: str | None = Field(None, description="Override recipient email")
- notification_type: str = Field(default="test", description="Type of notification to send")
+ notification_type: str = Field(
+ default="test", description="Type of notification to send"
+ )
# ============================================================================
diff --git a/models/schema/payment.py b/models/schema/payment.py
index cf7084f6..b9ac30b7 100644
--- a/models/schema/payment.py
+++ b/models/schema/payment.py
@@ -15,7 +15,6 @@ from typing import Any
from pydantic import BaseModel, Field
-
# ============================================================================
# PAYMENT CONFIGURATION SCHEMAS
# ============================================================================
@@ -152,7 +151,9 @@ class PaymentBalanceResponse(BaseModel):
class RefundRequest(BaseModel):
"""Request model for processing a refund."""
- amount: float | None = Field(None, gt=0, description="Partial refund amount, or None for full refund")
+ amount: float | None = Field(
+ None, gt=0, description="Partial refund amount, or None for full refund"
+ )
reason: str | None = Field(None, max_length=500)
diff --git a/models/schema/vendor.py b/models/schema/vendor.py
index 02c1d1b4..8f9e4bbf 100644
--- a/models/schema/vendor.py
+++ b/models/schema/vendor.py
@@ -43,7 +43,10 @@ class VendorCreate(BaseModel):
..., description="Unique subdomain for the vendor", min_length=2, max_length=100
)
name: str = Field(
- ..., description="Display name of the vendor/brand", min_length=2, max_length=255
+ ...,
+ description="Display name of the vendor/brand",
+ min_length=2,
+ max_length=255,
)
description: str | None = Field(None, description="Vendor/brand description")
@@ -53,10 +56,16 @@ class VendorCreate(BaseModel):
letzshop_csv_url_de: str | None = Field(None, description="German CSV URL")
# Contact Info (optional - if not provided, inherited from company)
- contact_email: str | None = Field(None, description="Override company contact email")
- contact_phone: str | None = Field(None, description="Override company contact phone")
+ contact_email: str | None = Field(
+ None, description="Override company contact email"
+ )
+ contact_phone: str | None = Field(
+ None, description="Override company contact phone"
+ )
website: str | None = Field(None, description="Override company website")
- business_address: str | None = Field(None, description="Override company business address")
+ business_address: str | None = Field(
+ None, description="Override company business address"
+ )
tax_number: str | None = Field(None, description="Override company tax number")
# Language Settings
@@ -113,10 +122,16 @@ class VendorUpdate(BaseModel):
is_verified: bool | None = None
# Contact Info (set value to override, set to empty string to reset to inherit)
- contact_email: str | None = Field(None, description="Override company contact email")
- contact_phone: str | None = Field(None, description="Override company contact phone")
+ contact_email: str | None = Field(
+ None, description="Override company contact email"
+ )
+ contact_phone: str | None = Field(
+ None, description="Override company contact phone"
+ )
website: str | None = Field(None, description="Override company website")
- business_address: str | None = Field(None, description="Override company business address")
+ business_address: str | None = Field(
+ None, description="Override company business address"
+ )
tax_number: str | None = Field(None, description="Override company tax number")
# Special flag to reset contact fields to inherit from company
@@ -212,17 +227,33 @@ class VendorDetailResponse(VendorResponse):
tax_number: str | None = Field(None, description="Effective tax number")
# Inheritance flags (True = value is inherited from company, not overridden)
- contact_email_inherited: bool = Field(False, description="True if contact_email is from company")
- contact_phone_inherited: bool = Field(False, description="True if contact_phone is from company")
- website_inherited: bool = Field(False, description="True if website is from company")
- business_address_inherited: bool = Field(False, description="True if business_address is from company")
- tax_number_inherited: bool = Field(False, description="True if tax_number is from company")
+ contact_email_inherited: bool = Field(
+ False, description="True if contact_email is from company"
+ )
+ contact_phone_inherited: bool = Field(
+ False, description="True if contact_phone is from company"
+ )
+ website_inherited: bool = Field(
+ False, description="True if website is from company"
+ )
+ business_address_inherited: bool = Field(
+ False, description="True if business_address is from company"
+ )
+ tax_number_inherited: bool = Field(
+ False, description="True if tax_number is from company"
+ )
# Original company values (for reference in UI)
- company_contact_email: str | None = Field(None, description="Company's contact email")
- company_contact_phone: str | None = Field(None, description="Company's phone number")
+ company_contact_email: str | None = Field(
+ None, description="Company's contact email"
+ )
+ company_contact_phone: str | None = Field(
+ None, description="Company's phone number"
+ )
company_website: str | None = Field(None, description="Company's website URL")
- company_business_address: str | None = Field(None, description="Company's business address")
+ company_business_address: str | None = Field(
+ None, description="Company's business address"
+ )
company_tax_number: str | None = Field(None, description="Company's tax number")
diff --git a/scripts/init_log_settings.py b/scripts/init_log_settings.py
index c1f0fc12..5c557fde 100644
--- a/scripts/init_log_settings.py
+++ b/scripts/init_log_settings.py
@@ -84,13 +84,17 @@ def init_log_settings():
)
if existing:
- print(f"✓ Setting '{setting_data['key']}' already exists (value: {existing.value})")
+ print(
+ f"✓ Setting '{setting_data['key']}' already exists (value: {existing.value})"
+ )
updated_count += 1
else:
setting = AdminSetting(**setting_data)
db.add(setting)
created_count += 1
- print(f"✓ Created setting '{setting_data['key']}' = {setting_data['value']}")
+ print(
+ f"✓ Created setting '{setting_data['key']}' = {setting_data['value']}"
+ )
db.commit()
diff --git a/scripts/seed_demo.py b/scripts/seed_demo.py
index 689821c8..a42fa291 100644
--- a/scripts/seed_demo.py
+++ b/scripts/seed_demo.py
@@ -30,7 +30,6 @@ Environment Variables:
This script is idempotent when run normally.
"""
-import argparse
import sys
from datetime import UTC, datetime
from decimal import Decimal
@@ -57,7 +56,9 @@ from models.database.company import Company
from models.database.customer import Customer, CustomerAddress
from models.database.marketplace_import_job import MarketplaceImportJob
from models.database.marketplace_product import MarketplaceProduct
-from models.database.marketplace_product_translation import MarketplaceProductTranslation
+from models.database.marketplace_product_translation import (
+ MarketplaceProductTranslation,
+)
from models.database.order import Order, OrderItem
from models.database.product import Product
from models.database.user import User
@@ -263,7 +264,9 @@ def reset_all_data(db: Session):
sys.exit(0)
except EOFError:
print_error("No interactive terminal available.")
- print(" Use FORCE_RESET=true to skip confirmation in non-interactive mode.")
+ print(
+ " Use FORCE_RESET=true to skip confirmation in non-interactive mode."
+ )
sys.exit(1)
# Delete in correct order (respecting foreign keys)
@@ -367,9 +370,7 @@ def create_demo_companies(db: Session, auth_manager: AuthManager) -> list[Compan
db.flush()
companies.append(company)
- print_success(
- f"Created company: {company.name} (Owner: {owner_user.email})"
- )
+ print_success(f"Created company: {company.name} (Owner: {owner_user.email})")
db.flush()
return companies
@@ -456,7 +457,9 @@ def create_demo_vendors(
if vendor_data.get("custom_domain"):
domain = VendorDomain(
vendor_id=vendor.id,
- domain=vendor_data["custom_domain"], # ✅ Field is 'domain', not 'domain_name'
+ domain=vendor_data[
+ "custom_domain"
+ ], # ✅ Field is 'domain', not 'domain_name'
is_verified=True, # Auto-verified for demo
is_primary=True,
verification_token=None,
@@ -695,7 +698,7 @@ def print_summary(db: Session):
print(f" Vendors: {len(company.vendors) if company.vendors else 0}")
print(f" Status: {'✓ Active' if company.is_active else '✗ Inactive'}")
if company.is_verified:
- print(f" Verified: ✓")
+ print(" Verified: ✓")
# Show vendor details
vendors = db.query(Vendor).all()
diff --git a/scripts/test_logging_system.py b/scripts/test_logging_system.py
index d6f30306..46a96c00 100644
--- a/scripts/test_logging_system.py
+++ b/scripts/test_logging_system.py
@@ -18,6 +18,7 @@ from pathlib import Path
project_root = Path(__file__).parent.parent
sys.path.insert(0, str(project_root))
+
def test_logging_endpoints():
"""Test logging-related API endpoints."""
print("\n" + "=" * 70)
@@ -34,7 +35,7 @@ def test_logging_endpoints():
# Create an exception log
try:
raise ValueError("Test exception for logging")
- except Exception as e:
+ except Exception:
logging.error("Test exception logging", exc_info=True)
print(" ✓ Test logs created")
@@ -69,9 +70,12 @@ def test_logging_endpoints():
print(f" ✓ Database logs count: {count}")
if count > 0:
- recent = db.query(ApplicationLog).order_by(
- ApplicationLog.timestamp.desc()
- ).limit(5).all()
+ recent = (
+ db.query(ApplicationLog)
+ .order_by(ApplicationLog.timestamp.desc())
+ .limit(5)
+ .all()
+ )
print(" Recent logs:")
for log in recent:
@@ -90,9 +94,15 @@ def test_logging_endpoints():
db = SessionLocal()
try:
- log_level = admin_settings_service.get_setting_value(db, "log_level", "INFO")
- max_size = admin_settings_service.get_setting_value(db, "log_file_max_size_mb", 10)
- retention = admin_settings_service.get_setting_value(db, "db_log_retention_days", 30)
+ log_level = admin_settings_service.get_setting_value(
+ db, "log_level", "INFO"
+ )
+ max_size = admin_settings_service.get_setting_value(
+ db, "log_file_max_size_mb", 10
+ )
+ retention = admin_settings_service.get_setting_value(
+ db, "db_log_retention_days", 30
+ )
print(f" ✓ Log Level: {log_level}")
print(f" ✓ Max File Size: {max_size} MB")
@@ -118,6 +128,7 @@ def test_logging_endpoints():
if __name__ == "__main__":
# Set up logging first
from app.core.logging import setup_logging
+
setup_logging()
success = test_logging_endpoints()
diff --git a/scripts/validate_architecture.py b/scripts/validate_architecture.py
index 9dba6927..671b655d 100755
--- a/scripts/validate_architecture.py
+++ b/scripts/validate_architecture.py
@@ -30,7 +30,6 @@ Options:
"""
import argparse
-import ast
import re
import sys
from dataclasses import dataclass, field
@@ -79,7 +78,7 @@ class FileResult:
def status(self) -> str:
if self.errors > 0:
return "FAILED"
- elif self.warnings > 0:
+ if self.warnings > 0:
return "PASSED*"
return "PASSED"
@@ -87,7 +86,7 @@ class FileResult:
def status_icon(self) -> str:
if self.errors > 0:
return "❌"
- elif self.warnings > 0:
+ if self.warnings > 0:
return "⚠️"
return "✅"
@@ -224,9 +223,7 @@ class ArchitectureValidator:
return self.result
- def validate_file(
- self, file_path: Path, quiet: bool = False
- ) -> ValidationResult:
+ def validate_file(self, file_path: Path, quiet: bool = False) -> ValidationResult:
"""Validate a single file"""
if not file_path.exists():
if not quiet:
@@ -305,15 +302,17 @@ class ArchitectureValidator:
# Search patterns for different file types
patterns = []
for variant in variants:
- patterns.extend([
- f"app/api/**/*{variant}*.py",
- f"app/services/*{variant}*.py",
- f"app/exceptions/*{variant}*.py",
- f"models/database/*{variant}*.py",
- f"models/schema/*{variant}*.py",
- f"static/admin/js/*{variant}*.js",
- f"app/templates/admin/*{variant}*.html",
- ])
+ patterns.extend(
+ [
+ f"app/api/**/*{variant}*.py",
+ f"app/services/*{variant}*.py",
+ f"app/exceptions/*{variant}*.py",
+ f"models/database/*{variant}*.py",
+ f"models/schema/*{variant}*.py",
+ f"static/admin/js/*{variant}*.js",
+ f"app/templates/admin/*{variant}*.html",
+ ]
+ )
# Find all matching files
found_files: set[Path] = set()
@@ -503,7 +502,9 @@ class ArchitectureValidator:
suggestion="Replace window.showToast('msg', 'type') with Utils.showToast('msg', 'type')",
)
- def _check_alpine_data_spread(self, file_path: Path, content: str, lines: list[str]):
+ def _check_alpine_data_spread(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""JS-003: Check that Alpine components inherit base layout data using ...data()"""
# Skip utility/init files that aren't page components
skip_patterns = ["init-", "api-client", "log-config", "utils", "helpers"]
@@ -517,23 +518,24 @@ class ArchitectureValidator:
# Look for Alpine component function pattern: function adminXxx() { return { ... } }
# These are page-level components that should inherit from data()
component_pattern = re.compile(
- r"function\s+(admin\w+|vendor\w+|shop\w+)\s*\(\s*\)\s*\{",
- re.IGNORECASE
+ r"function\s+(admin\w+|vendor\w+|shop\w+)\s*\(\s*\)\s*\{", re.IGNORECASE
)
for match in component_pattern.finditer(content):
func_name = match.group(1)
func_start = match.start()
- line_num = content[:func_start].count('\n') + 1
+ line_num = content[:func_start].count("\n") + 1
# Find the return statement with object literal
# Look for "return {" within reasonable distance
- search_region = content[func_start:func_start + 500]
+ search_region = content[func_start : func_start + 500]
if "return {" in search_region:
# Check if ...data() is present in the return object
# Look for pattern like "return {\n ...data()," or similar
- return_match = re.search(r"return\s*\{([^}]{0,200})", search_region, re.DOTALL)
+ return_match = re.search(
+ r"return\s*\{([^}]{0,200})", search_region, re.DOTALL
+ )
if return_match:
return_content = return_match.group(1)
if "...data()" not in return_content:
@@ -548,7 +550,9 @@ class ArchitectureValidator:
suggestion="Add '...data(),' as first property in return object to inherit dark mode, sidebar state, etc.",
)
- def _check_fetch_vs_api_client(self, file_path: Path, content: str, lines: list[str]):
+ def _check_fetch_vs_api_client(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""JS-008: Check for raw fetch() calls that should use apiClient instead"""
# Skip init files and api-client itself
if "init-" in file_path.name or "api-client" in file_path.name:
@@ -581,7 +585,9 @@ class ArchitectureValidator:
suggestion="Replace fetch('/api/...') with apiClient.get('/endpoint') or apiClient.post('/endpoint', data)",
)
- def _check_alpine_current_page(self, file_path: Path, content: str, lines: list[str]):
+ def _check_alpine_current_page(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""JS-004: Check that Alpine components set currentPage identifier"""
# Skip utility/init files
skip_patterns = ["init-", "api-client", "log-config", "utils", "helpers"]
@@ -593,22 +599,26 @@ class ArchitectureValidator:
# Look for Alpine component function pattern
component_pattern = re.compile(
- r"function\s+(admin\w+|vendor\w+|shop\w+)\s*\(\s*\)\s*\{",
- re.IGNORECASE
+ r"function\s+(admin\w+|vendor\w+|shop\w+)\s*\(\s*\)\s*\{", re.IGNORECASE
)
for match in component_pattern.finditer(content):
func_name = match.group(1)
func_start = match.start()
- line_num = content[:func_start].count('\n') + 1
+ line_num = content[:func_start].count("\n") + 1
# Check if currentPage is set in the return object
- search_region = content[func_start:func_start + 800]
+ search_region = content[func_start : func_start + 800]
if "return {" in search_region:
- return_match = re.search(r"return\s*\{([^}]{0,500})", search_region, re.DOTALL)
+ return_match = re.search(
+ r"return\s*\{([^}]{0,500})", search_region, re.DOTALL
+ )
if return_match:
return_content = return_match.group(1)
- if "currentPage:" not in return_content and "currentPage :" not in return_content:
+ if (
+ "currentPage:" not in return_content
+ and "currentPage :" not in return_content
+ ):
self._add_violation(
rule_id="JS-004",
rule_name="Alpine components must set currentPage",
@@ -635,10 +645,10 @@ class ArchitectureValidator:
for match in init_pattern.finditer(content):
init_start = match.start()
- line_num = content[:init_start].count('\n') + 1
+ line_num = content[:init_start].count("\n") + 1
# Check next 200 chars for initialization guard pattern
- search_region = content[init_start:init_start + 300]
+ search_region = content[init_start : init_start + 300]
guard_patterns = [
"window._",
"if (this._initialized)",
@@ -660,7 +670,9 @@ class ArchitectureValidator:
)
return # Only report once per file
- def _check_async_error_handling(self, file_path: Path, content: str, lines: list[str]):
+ def _check_async_error_handling(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""JS-006: Check that async operations have try/catch error handling"""
# Skip utility/init files
skip_patterns = ["init-", "api-client", "log-config"]
@@ -675,14 +687,18 @@ class ArchitectureValidator:
for match in async_pattern.finditer(content):
func_start = match.start()
- line_num = content[:func_start].count('\n') + 1
+ line_num = content[:func_start].count("\n") + 1
# Find the function body (look for matching braces)
# Simplified: check next 500 chars for try/catch
- search_region = content[func_start:func_start + 800]
+ search_region = content[func_start : func_start + 800]
# Check if there's await without try/catch
- if "await " in search_region and "try {" not in search_region and "try{" not in search_region:
+ if (
+ "await " in search_region
+ and "try {" not in search_region
+ and "try{" not in search_region
+ ):
# Check if it's a simple one-liner or has error handling elsewhere
if ".catch(" not in search_region:
self._add_violation(
@@ -709,22 +725,28 @@ class ArchitectureValidator:
# Look for Alpine component functions that have async methods with API calls
component_pattern = re.compile(
- r"function\s+(admin\w+|vendor\w+|shop\w+)\s*\(\s*\)\s*\{",
- re.IGNORECASE
+ r"function\s+(admin\w+|vendor\w+|shop\w+)\s*\(\s*\)\s*\{", re.IGNORECASE
)
for match in component_pattern.finditer(content):
func_start = match.start()
# Get the component body (rough extraction)
- component_region = content[func_start:func_start + 5000]
+ component_region = content[func_start : func_start + 5000]
# Check if component has API calls but no loading state
- has_api_calls = "apiClient." in component_region or "await " in component_region
- has_loading_state = "loading:" in component_region or "loading :" in component_region
- has_loading_assignment = "this.loading = " in component_region or "loading = true" in component_region
+ has_api_calls = (
+ "apiClient." in component_region or "await " in component_region
+ )
+ has_loading_state = (
+ "loading:" in component_region or "loading :" in component_region
+ )
+ has_loading_assignment = (
+ "this.loading = " in component_region
+ or "loading = true" in component_region
+ )
if has_api_calls and not has_loading_state:
- line_num = content[:func_start].count('\n') + 1
+ line_num = content[:func_start].count("\n") + 1
self._add_violation(
rule_id="JS-007",
rule_name="Set loading state for async operations",
@@ -744,10 +766,14 @@ class ArchitectureValidator:
file_path_str = str(file_path)
# Skip base template and partials for extends check
- is_base_or_partial = "base.html" in file_path.name or "partials" in file_path_str
+ is_base_or_partial = (
+ "base.html" in file_path.name or "partials" in file_path_str
+ )
# Skip macros directory for FE rules
- is_macro = "shared/macros/" in file_path_str or "shared\\macros\\" in file_path_str
+ is_macro = (
+ "shared/macros/" in file_path_str or "shared\\macros\\" in file_path_str
+ )
# Skip components showcase page
is_components_page = "components.html" in file_path.name
@@ -798,27 +824,36 @@ class ArchitectureValidator:
# TPL-001: Admin templates extends check
if is_admin:
self._check_template_extends(
- file_path, lines, "admin/base.html", "TPL-001",
- ["login.html", "errors/", "test-"]
+ file_path,
+ lines,
+ "admin/base.html",
+ "TPL-001",
+ ["login.html", "errors/", "test-"],
)
# TPL-002: Vendor templates extends check
if is_vendor:
self._check_template_extends(
- file_path, lines, "vendor/base.html", "TPL-002",
- ["login.html", "errors/", "test-"]
+ file_path,
+ lines,
+ "vendor/base.html",
+ "TPL-002",
+ ["login.html", "errors/", "test-"],
)
# TPL-003: Shop templates extends check
if is_shop:
self._check_template_extends(
- file_path, lines, "shop/base.html", "TPL-003",
- ["errors/", "test-"]
+ file_path, lines, "shop/base.html", "TPL-003", ["errors/", "test-"]
)
def _check_template_extends(
- self, file_path: Path, lines: list[str],
- base_template: str, rule_id: str, exclusions: list[str]
+ self,
+ file_path: Path,
+ lines: list[str],
+ base_template: str,
+ rule_id: str,
+ exclusions: list[str],
):
"""Check that template extends the appropriate base template"""
file_path_str = str(file_path)
@@ -855,9 +890,12 @@ class ArchitectureValidator:
# Look for potentially unsafe patterns where user data might be interpolated
# This is a heuristic check - not perfect
unsafe_patterns = [
- (r'\{\{\s*\w+\.name\s*\}\}', "User data interpolation without x-text"),
- (r'\{\{\s*\w+\.title\s*\}\}', "User data interpolation without x-text"),
- (r'\{\{\s*\w+\.description\s*\}\}', "User data interpolation without x-text"),
+ (r"\{\{\s*\w+\.name\s*\}\}", "User data interpolation without x-text"),
+ (r"\{\{\s*\w+\.title\s*\}\}", "User data interpolation without x-text"),
+ (
+ r"\{\{\s*\w+\.description\s*\}\}",
+ "User data interpolation without x-text",
+ ),
]
for i, line in enumerate(lines, 1):
@@ -895,13 +933,21 @@ class ArchitectureValidator:
for i, line in enumerate(lines, 1):
if "x-html=" in line:
# Check if it matches safe patterns
- is_safe = any(re.search(pattern, line) for pattern in safe_xhtml_patterns)
+ is_safe = any(
+ re.search(pattern, line) for pattern in safe_xhtml_patterns
+ )
if not is_safe:
# Check for potentially unsafe user content
unsafe_indicators = [
- ".description", ".content", ".body", ".text",
- ".message", ".comment", ".review", ".html"
+ ".description",
+ ".content",
+ ".body",
+ ".text",
+ ".message",
+ ".comment",
+ ".review",
+ ".html",
]
for indicator in unsafe_indicators:
if indicator in line:
@@ -917,33 +963,39 @@ class ArchitectureValidator:
)
return
- def _check_template_loading_state(self, file_path: Path, content: str, lines: list[str]):
+ def _check_template_loading_state(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""TPL-006: Check that templates with data loads show loading state"""
if "noqa: tpl-006" in content.lower():
return
# Check if template has data loading (Alpine init, fetch, apiClient)
- has_data_loading = any([
- "x-init=" in content,
- "@load=" in content,
- "apiClient" in content,
- "loadData" in content,
- "fetchData" in content,
- ])
+ has_data_loading = any(
+ [
+ "x-init=" in content,
+ "@load=" in content,
+ "apiClient" in content,
+ "loadData" in content,
+ "fetchData" in content,
+ ]
+ )
if not has_data_loading:
return
# Check for loading state display
- has_loading_state = any([
- 'x-show="loading"' in content,
- "x-show='loading'" in content,
- 'x-if="loading"' in content,
- "x-if='loading'" in content,
- 'loading_state' in content,
- 'Loading...' in content,
- 'spinner' in content.lower(),
- ])
+ has_loading_state = any(
+ [
+ 'x-show="loading"' in content,
+ "x-show='loading'" in content,
+ 'x-if="loading"' in content,
+ "x-if='loading'" in content,
+ "loading_state" in content,
+ "Loading..." in content,
+ "spinner" in content.lower(),
+ ]
+ )
if not has_loading_state:
self._add_violation(
@@ -957,31 +1009,37 @@ class ArchitectureValidator:
suggestion='Add Loading...
or use loading_state macro',
)
- def _check_template_empty_state(self, file_path: Path, content: str, lines: list[str]):
+ def _check_template_empty_state(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""TPL-007: Check that templates with lists show empty state"""
if "noqa: tpl-007" in content.lower():
return
# Check if template has list iteration
- has_list = any([
- "x-for=" in content,
- "{% for " in content,
- ])
+ has_list = any(
+ [
+ "x-for=" in content,
+ "{% for " in content,
+ ]
+ )
if not has_list:
return
# Check for empty state handling
- has_empty_state = any([
- ".length === 0" in content,
- ".length == 0" in content,
- "items.length" in content,
- "empty_state" in content,
- "No items" in content,
- "No results" in content,
- "No data" in content,
- "table_empty_state" in content,
- ])
+ has_empty_state = any(
+ [
+ ".length === 0" in content,
+ ".length == 0" in content,
+ "items.length" in content,
+ "empty_state" in content,
+ "No items" in content,
+ "No results" in content,
+ "No data" in content,
+ "table_empty_state" in content,
+ ]
+ )
if not has_empty_state:
self._add_violation(
@@ -995,10 +1053,14 @@ class ArchitectureValidator:
suggestion='Add No items found ',
)
- def _check_pagination_macro_usage(self, file_path: Path, content: str, lines: list[str]):
+ def _check_pagination_macro_usage(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""FE-001: Check for inline pagination that should use macro"""
# Check if already using the pagination macro
- uses_macro = any("from 'shared/macros/pagination.html'" in line for line in lines)
+ uses_macro = any(
+ "from 'shared/macros/pagination.html'" in line for line in lines
+ )
if uses_macro:
return
@@ -1010,9 +1072,9 @@ class ArchitectureValidator:
# Look for signs of inline pagination
pagination_indicators = [
('aria-label="Table navigation"', "Inline table navigation found"),
- ("previousPage()" , "Inline pagination controls found"),
- ("nextPage()" , "Inline pagination controls found"),
- ("goToPage(" , "Inline pagination controls found"),
+ ("previousPage()", "Inline pagination controls found"),
+ ("nextPage()", "Inline pagination controls found"),
+ ("goToPage(", "Inline pagination controls found"),
]
for i, line in enumerate(lines, 1):
@@ -1043,16 +1105,18 @@ class ArchitectureValidator:
return
# Pattern to find inline SVGs
- svg_pattern = re.compile(r']*viewBox[^>]*>.*? ', re.DOTALL | re.IGNORECASE)
+ svg_pattern = re.compile(
+ r"]*viewBox[^>]*>.*? ", re.DOTALL | re.IGNORECASE
+ )
# Find all SVG occurrences
for match in svg_pattern.finditer(content):
# Find line number
- line_num = content[:match.start()].count('\n') + 1
+ line_num = content[: match.start()].count("\n") + 1
# Skip if this is likely in a code example (inside or )
- context_before = content[max(0, match.start()-200):match.start()]
- if '',
+ suggestion=" ",
)
- def _check_alerts_macro_usage(self, file_path: Path, content: str, lines: list[str]):
+ def _check_alerts_macro_usage(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""FE-003: Check for inline loading/error states that should use alerts macro"""
# Check if already using the alerts macro
uses_macro = any("from 'shared/macros/alerts.html'" in line for line in lines)
@@ -1083,7 +1149,7 @@ class ArchitectureValidator:
# Loading state pattern: text-center py-12 with loading content
if 'x-show="loading"' in line or "x-show='loading'" in line:
# Check if next few lines have spinner pattern
- context_lines = "\n".join(lines[i-1:i+3])
+ context_lines = "\n".join(lines[i - 1 : i + 3])
if "text-center" in context_lines and "py-12" in context_lines:
self._add_violation(
rule_id="FE-003",
@@ -1111,7 +1177,9 @@ class ArchitectureValidator:
)
return
- def _check_modals_macro_usage(self, file_path: Path, content: str, lines: list[str]):
+ def _check_modals_macro_usage(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""FE-004: Check for inline modals that should use modals macro"""
# Check if already using the modals macro
uses_macro = any("from 'shared/macros/modals.html'" in line for line in lines)
@@ -1125,10 +1193,16 @@ class ArchitectureValidator:
# Look for modal patterns: fixed inset-0 with role="dialog" or modal backdrop
for i, line in enumerate(lines, 1):
- if "fixed inset-0" in line and ("z-50" in line or "z-30" in line or "z-40" in line):
+ if "fixed inset-0" in line and (
+ "z-50" in line or "z-30" in line or "z-40" in line
+ ):
# Check context for modal indicators
- context_lines = "\n".join(lines[max(0, i-1):min(len(lines), i+5)])
- if 'role="dialog"' in context_lines or "bg-opacity-50" in context_lines or "bg-black/50" in context_lines:
+ context_lines = "\n".join(lines[max(0, i - 1) : min(len(lines), i + 5)])
+ if (
+ 'role="dialog"' in context_lines
+ or "bg-opacity-50" in context_lines
+ or "bg-black/50" in context_lines
+ ):
self._add_violation(
rule_id="FE-004",
rule_name="Use modals macro",
@@ -1141,7 +1215,9 @@ class ArchitectureValidator:
)
return
- def _check_tables_macro_usage(self, file_path: Path, content: str, lines: list[str]):
+ def _check_tables_macro_usage(
+ self, file_path: Path, content: str, lines: list[str]
+ ):
"""FE-005: Check for inline table wrappers that should use tables macro"""
# Check if already using the tables macro
uses_macro = any("from 'shared/macros/tables.html'" in line for line in lines)
@@ -1155,9 +1231,13 @@ class ArchitectureValidator:
# Look for table wrapper pattern: overflow-hidden rounded-lg shadow-xs
for i, line in enumerate(lines, 1):
- if "overflow-hidden" in line and "rounded-lg" in line and "shadow-xs" in line:
+ if (
+ "overflow-hidden" in line
+ and "rounded-lg" in line
+ and "shadow-xs" in line
+ ):
# Check if there's a table nearby
- context_lines = "\n".join(lines[i-1:min(len(lines), i+10)])
+ context_lines = "\n".join(lines[i - 1 : min(len(lines), i + 10)])
if " that should use the number_stepper macro for
@@ -1269,13 +1366,23 @@ class ArchitectureValidator:
continue
# Skip if it looks like an ID field (check surrounding lines for context)
- context_lines = "\n".join(lines[max(0, i-3):min(len(lines), i+2)]).lower()
- if "user id" in context_lines or "placeholder" in context_lines and "id" in context_lines:
+ context_lines = "\n".join(
+ lines[max(0, i - 3) : min(len(lines), i + 2)]
+ ).lower()
+ if (
+ "user id" in context_lines
+ or "placeholder" in context_lines
+ and "id" in context_lines
+ ):
continue
# Skip if it's in a comment
stripped = line.strip()
- if stripped.startswith("{#") or stripped.startswith("