diff --git a/.performance-rules/_main.yaml b/.performance-rules/_main.yaml
new file mode 100644
index 00000000..abad34f7
--- /dev/null
+++ b/.performance-rules/_main.yaml
@@ -0,0 +1,66 @@
+# Performance Rules Configuration
+# ================================
+# Performance-focused validation rules for the codebase.
+# Run with: python scripts/validate_performance.py
+
+version: "1.0"
+project: "letzshop-product-import"
+
+description: |
+ Performance validation rules to detect inefficient patterns and ensure
+ optimal performance across the application.
+
+principles:
+ - name: "Minimize Database Queries"
+ description: "Reduce N+1 queries and optimize data fetching"
+ - name: "Efficient Data Structures"
+ description: "Use appropriate data structures for the task"
+ - name: "Lazy Loading"
+ description: "Load data only when needed"
+ - name: "Caching Strategy"
+ description: "Cache expensive computations and frequent queries"
+ - name: "Async I/O"
+ description: "Use async for I/O-bound operations"
+
+includes:
+ - database.yaml
+ - caching.yaml
+ - api.yaml
+ - async.yaml
+ - memory.yaml
+ - frontend.yaml
+
+severity_levels:
+ error:
+ description: "Critical performance issue that must be fixed"
+ exit_code: 1
+ warning:
+ description: "Performance concern that should be addressed"
+ exit_code: 0
+ info:
+ description: "Performance optimization recommendation"
+ exit_code: 0
+
+ignore:
+ files:
+ - "**/test_*.py"
+ - "**/tests/**"
+ - "**/*_test.py"
+ - "**/conftest.py"
+ - "**/migrations/**"
+ - "**/.venv/**"
+ - "**/venv/**"
+ - "**/node_modules/**"
+ - "**/site/**"
+ - "**/scripts/**"
+ - "**/__pycache__/**"
+ - "**/*.pyc"
+ patterns:
+ # Allow patterns in test files
+ - file: "**/tests/**"
+ pattern: ".*"
+ reason: "Test files may have different performance requirements"
+ # Allow patterns in scripts
+ - file: "**/scripts/**"
+ pattern: "\\.all\\(\\)"
+ reason: "Scripts may need to process all records"
diff --git a/.performance-rules/api.yaml b/.performance-rules/api.yaml
new file mode 100644
index 00000000..41e49327
--- /dev/null
+++ b/.performance-rules/api.yaml
@@ -0,0 +1,135 @@
+# API Performance Rules
+# =====================
+
+api_rules:
+ - id: "PERF-026"
+ name: "Pagination required for list endpoints"
+ severity: error
+ description: |
+ All list endpoints must support pagination.
+ Unbounded lists cause performance issues:
+ - Memory exhaustion
+ - Slow response times
+ - Database locks
+ file_pattern: "**/api/**/*.py"
+ anti_patterns:
+ - '@router\\.get\\([^)]*\\)\\s*\\n(?:(?!limit|skip|offset|page).)*def\\s+\\w+.*:\\s*\\n(?:(?!limit|skip|offset|page).)*return.*\\.all\\(\\)'
+ required_patterns:
+ - "limit|skip|offset|page"
+ example_bad: |
+ @router.get("/products")
+ def list_products(db: Session):
+ return db.query(Product).all()
+ example_good: |
+ @router.get("/products")
+ def list_products(
+ skip: int = 0,
+ limit: int = Query(default=20, le=100),
+ db: Session = Depends(get_db)
+ ):
+ return db.query(Product).offset(skip).limit(limit).all()
+
+ - id: "PERF-027"
+ name: "Reasonable default page sizes"
+ severity: warning
+ description: |
+ Default page sizes should be reasonable:
+ - Default: 20-50 items
+ - Maximum: 100-200 items
+
+ Very large page sizes negate pagination benefits.
+ file_pattern: "**/api/**/*.py"
+ anti_patterns:
+      - 'limit.*=.*Query\\([^)]*default\\s*=\\s*(?:[5-9]\\d{2}|\\d{4,})'
+ - 'limit.*=.*Query\\([^)]*le\\s*=\\s*[1-9]\\d{3,}'
+ example_bad: |
+ limit: int = Query(default=500, le=10000)
+ example_good: |
+ limit: int = Query(default=20, ge=1, le=100)
+
+ - id: "PERF-028"
+ name: "Response compression"
+ severity: info
+ description: |
+ Enable response compression for large responses:
+ - GZip or Brotli
+ - Significant bandwidth savings
+ - Faster load times
+ file_pattern: "**/main.py|**/app.py"
+ suggested_patterns:
+ - "GZipMiddleware|BrotliMiddleware|compress"
+
+ - id: "PERF-029"
+ name: "Efficient serialization"
+ severity: info
+ description: |
+ Use Pydantic's response_model for efficient serialization.
+ Avoid manual dict conversion.
+ file_pattern: "**/api/**/*.py"
+ anti_patterns:
+ - 'return\\s+\\{[^}]*for\\s+\\w+\\s+in'
+ - 'return\\s+\\[\\{.*for.*in.*\\]'
+ suggested_patterns:
+ - "response_model"
+
+ - id: "PERF-030"
+ name: "Avoid redundant queries in response"
+ severity: warning
+ description: |
+ Don't trigger lazy-loaded relationships during serialization.
+ Use eager loading or carefully control serialization.
+ file_pattern: "**/api/**/*.py"
+
+ - id: "PERF-031"
+ name: "Streaming for large responses"
+ severity: info
+ description: |
+ Use streaming responses for large data:
+ - File downloads
+ - Large exports (CSV, JSON)
+ - Real-time data feeds
+ file_pattern: "**/api/**/*.py"
+ suggested_patterns:
+ - "StreamingResponse|yield|generator"
+
+ - id: "PERF-032"
+ name: "Conditional requests support"
+ severity: info
+ description: |
+ Support conditional requests to reduce bandwidth:
+ - ETag validation
+ - If-None-Match handling
+ - 304 Not Modified responses
+ file_pattern: "**/api/**/*.py"
+ suggested_patterns:
+ - "ETag|If-None-Match|304"
+
+ - id: "PERF-033"
+ name: "Field selection support"
+ severity: info
+ description: |
+ Allow clients to request only needed fields.
+ Reduces response size and serialization cost.
+ file_pattern: "**/api/**/*.py"
+ suggested_patterns:
+ - "fields|include|exclude|sparse"
+
+ - id: "PERF-034"
+ name: "Avoid deep nesting in responses"
+ severity: info
+ description: |
+ Deeply nested responses are slow to serialize.
+ Consider flattening or using links instead.
+ file_pattern: "**/api/**/*.py"
+
+ - id: "PERF-035"
+ name: "Endpoint response time monitoring"
+ severity: info
+ description: |
+ Monitor API response times:
+ - Set SLA targets
+ - Alert on degradation
+ - Track percentiles (p50, p95, p99)
+ file_pattern: "**/main.py|**/middleware*.py"
+ suggested_patterns:
+ - "prometheus|metrics|timing|latency"
diff --git a/.performance-rules/async.yaml b/.performance-rules/async.yaml
new file mode 100644
index 00000000..26e79a8e
--- /dev/null
+++ b/.performance-rules/async.yaml
@@ -0,0 +1,142 @@
+# Async & Concurrency Performance Rules
+# =====================================
+
+async_rules:
+ - id: "PERF-036"
+ name: "Async for I/O operations"
+ severity: info
+ description: |
+ Use async for I/O-bound operations:
+ - Database queries (with async driver)
+ - External API calls
+ - File operations
+ - Network requests
+ file_pattern: "**/api/**/*.py|**/service*.py"
+ suggested_patterns:
+ - "async def|await|asyncio"
+
+ - id: "PERF-037"
+ name: "Parallel independent operations"
+ severity: warning
+ description: |
+ Multiple independent async operations should run in parallel.
+ Use asyncio.gather() instead of sequential awaits.
+ file_pattern: "**/*.py"
+ anti_patterns:
+ - 'await\\s+\\w+\\([^)]*\\)\\s*\\n\\s*await\\s+\\w+\\([^)]*\\)\\s*\\n\\s*await\\s+\\w+\\('
+ suggested_patterns:
+ - "asyncio\\.gather|asyncio\\.create_task"
+ example_bad: |
+ user = await get_user(user_id)
+ orders = await get_orders(user_id)
+ preferences = await get_preferences(user_id)
+ example_good: |
+ user, orders, preferences = await asyncio.gather(
+ get_user(user_id),
+ get_orders(user_id),
+ get_preferences(user_id)
+ )
+
+ - id: "PERF-038"
+ name: "Background tasks for slow operations"
+ severity: warning
+ description: |
+ Operations taking > 500ms should run in background:
+ - Email sending
+ - Report generation
+ - External API syncs
+ - File processing
+ file_pattern: "**/api/**/*.py"
+ suggested_patterns:
+ - "BackgroundTasks|background_task|celery|rq|dramatiq"
+
+ - id: "PERF-039"
+ name: "Connection pooling for HTTP clients"
+ severity: warning
+ description: |
+ HTTP clients should reuse connections.
+ Create client once, not per request.
+ file_pattern: "**/*client*.py|**/service*.py"
+ anti_patterns:
+ - 'def\\s+\\w+\\([^)]*\\):\\s*\\n[^}]*requests\\.get\\('
+ - 'httpx\\.get\\('
+ - 'aiohttp\\.request\\('
+ suggested_patterns:
+ - "httpx\\.AsyncClient|aiohttp\\.ClientSession|requests\\.Session"
+ example_bad: |
+ def fetch_data(url):
+ response = requests.get(url) # New connection each time
+ example_good: |
+ # Use a session (connection pool)
+ async with httpx.AsyncClient() as client:
+ response = await client.get(url)
+
+ - id: "PERF-040"
+ name: "Timeout configuration"
+ severity: error
+ description: |
+ All external calls must have timeouts.
+ Without timeouts, requests can hang indefinitely.
+ file_pattern: "**/*client*.py|**/service*.py"
+ context_patterns:
+ - "requests|httpx|aiohttp|urllib"
+ required_patterns:
+ - "timeout"
+ example_bad: |
+ response = requests.get(url)
+ example_good: |
+ response = requests.get(url, timeout=30)
+
+ - id: "PERF-041"
+ name: "Connection pool limits"
+ severity: info
+ description: |
+ Configure appropriate connection pool limits:
+ - max_connections: Total connections
+ - max_keepalive_connections: Idle connections
+ - keepalive_expiry: Time before closing idle
+ file_pattern: "**/*client*.py"
+ suggested_patterns:
+ - "max_connections|pool_connections|pool_maxsize"
+
+ - id: "PERF-042"
+ name: "Retry with backoff"
+ severity: info
+ description: |
+ External calls should retry with exponential backoff.
+ Prevents cascade failures and respects rate limits.
+ file_pattern: "**/*client*.py|**/service*.py"
+ suggested_patterns:
+ - "retry|backoff|tenacity|Retry"
+
+ - id: "PERF-043"
+ name: "Circuit breaker pattern"
+ severity: info
+ description: |
+ Use circuit breaker for unreliable external services.
+ Prevents repeated failures from slowing down the system.
+ file_pattern: "**/*client*.py"
+ suggested_patterns:
+ - "circuit_breaker|CircuitBreaker|pybreaker"
+
+ - id: "PERF-044"
+ name: "Task queues for heavy processing"
+ severity: info
+ description: |
+ Heavy processing should use task queues:
+ - Celery
+ - RQ (Redis Queue)
+ - Dramatiq
+ - Huey
+ file_pattern: "**/tasks/**/*.py"
+ suggested_patterns:
+ - "celery|rq|dramatiq|huey|@task"
+
+ - id: "PERF-045"
+ name: "Worker pool sizing"
+ severity: info
+ description: |
+ Size worker pools appropriately:
+ - CPU-bound: Number of cores
+ - I/O-bound: Higher multiplier (2-4x cores)
+ - Memory-constrained: Based on available RAM
diff --git a/.performance-rules/caching.yaml b/.performance-rules/caching.yaml
new file mode 100644
index 00000000..603c905e
--- /dev/null
+++ b/.performance-rules/caching.yaml
@@ -0,0 +1,125 @@
+# Caching Performance Rules
+# =========================
+
+caching_rules:
+ - id: "PERF-016"
+ name: "Cache expensive computations"
+ severity: info
+ description: |
+ Computationally expensive operations should be cached:
+ - Complex aggregations
+ - External API results
+ - Template rendering
+ - Data transformations
+ file_pattern: "**/service*.py"
+ suggested_patterns:
+ - "@cache|@lru_cache|@cached|redis|memcache"
+
+ - id: "PERF-017"
+ name: "Cache key includes tenant context"
+ severity: warning
+ description: |
+ Multi-tenant cache keys must include vendor_id.
+ Otherwise, cached data may leak between tenants.
+ file_pattern: "**/*cache*.py|**/service*.py"
+ context_patterns:
+ - "cache|@cached|redis"
+ required_patterns:
+ - "vendor_id|tenant"
+ example_bad: |
+ @cache.memoize()
+ def get_products():
+ return db.query(Product).all()
+ example_good: |
+ @cache.memoize()
+ def get_products(vendor_id: int):
+ return db.query(Product).filter_by(vendor_id=vendor_id).all()
+
+ - id: "PERF-018"
+ name: "Cache TTL configuration"
+ severity: info
+ description: |
+ Cache entries should have appropriate TTL:
+ - Short TTL (1-5 min): Frequently changing data
+ - Medium TTL (5-60 min): Semi-static data
+ - Long TTL (1+ hour): Reference data
+ file_pattern: "**/*cache*.py"
+ suggested_patterns:
+ - "ttl|expire|timeout"
+
+ - id: "PERF-019"
+ name: "Cache invalidation strategy"
+ severity: warning
+ description: |
+ Define cache invalidation strategy:
+ - Time-based (TTL)
+ - Event-based (on data change)
+ - Manual (admin action)
+
+ Without invalidation, stale data may be served.
+ file_pattern: "**/*cache*.py|**/service*.py"
+ suggested_patterns:
+ - "invalidate|delete|clear|purge"
+
+ - id: "PERF-020"
+ name: "Response caching headers"
+ severity: info
+ description: |
+ API responses can use HTTP caching headers:
+ - Cache-Control for browser/CDN caching
+ - ETag for conditional requests
+ - Last-Modified for validation
+ file_pattern: "**/api/**/*.py"
+ suggested_patterns:
+ - "Cache-Control|ETag|Last-Modified"
+
+ - id: "PERF-021"
+ name: "Query result caching"
+ severity: info
+ description: |
+ Frequently accessed, rarely changed data should be cached:
+ - User preferences
+ - Configuration settings
+ - Static reference data
+ file_pattern: "**/service*.py"
+
+ - id: "PERF-022"
+ name: "Session-level caching"
+ severity: info
+ description: |
+ Use SQLAlchemy's identity map for request-scoped caching.
+ Avoid re-fetching the same entity within a request.
+ file_pattern: "**/service*.py"
+
+ - id: "PERF-023"
+ name: "Distributed cache for scalability"
+ severity: info
+ description: |
+ For multi-instance deployments, use distributed cache:
+ - Redis
+ - Memcached
+ - Database-backed cache
+
+ Local caches don't work across instances.
+ file_pattern: "**/config*.py"
+ suggested_patterns:
+ - "redis|memcache|CACHE_TYPE"
+
+ - id: "PERF-024"
+ name: "Cache warming strategy"
+ severity: info
+ description: |
+ Pre-warm cache for predictable high-traffic patterns:
+ - On application startup
+ - Before marketing campaigns
+ - After cache flush
+
+ - id: "PERF-025"
+ name: "Monitor cache hit rates"
+ severity: info
+ description: |
+ Track cache performance:
+ - Hit rate (should be > 80%)
+ - Miss penalty (time saved)
+ - Memory usage
+ - Eviction rate
diff --git a/.performance-rules/database.yaml b/.performance-rules/database.yaml
new file mode 100644
index 00000000..fc71acce
--- /dev/null
+++ b/.performance-rules/database.yaml
@@ -0,0 +1,223 @@
+# Database Performance Rules
+# ==========================
+
+database_rules:
+ - id: "PERF-001"
+ name: "N+1 query detection"
+ severity: warning
+ description: |
+ Accessing relationships in loops causes N+1 queries.
+ For each item in a list, a separate query is executed.
+
+ Solutions:
+ - joinedload(): Eager load with JOIN
+ - selectinload(): Eager load with IN clause
+ - subqueryload(): Eager load with subquery
+ file_pattern: "**/service*.py|**/api/**/*.py"
+ anti_patterns:
+    - 'for\\s+\\w+\\s+in\\s+\\w+\\.all\\(\\):\\s*\\n[^}]*\\.\\w+\\.\\w+'
+ suggested_patterns:
+ - "joinedload|selectinload|subqueryload"
+ example_bad: |
+ orders = db.query(Order).all()
+ for order in orders:
+ customer_name = order.customer.name # N+1 query!
+ example_good: |
+ orders = db.query(Order).options(
+ joinedload(Order.customer)
+ ).all()
+ for order in orders:
+ customer_name = order.customer.name # Already loaded
+
+ - id: "PERF-002"
+ name: "Eager loading for known relationships"
+ severity: info
+ description: |
+ When you always need related data, use eager loading
+ to reduce the number of database round trips.
+ file_pattern: "**/service*.py"
+ suggested_patterns:
+ - "joinedload|selectinload|subqueryload"
+
+ - id: "PERF-003"
+ name: "Query result limiting"
+ severity: warning
+ description: |
+ All list queries should have pagination or limits.
+ Unbounded queries can cause memory issues and slow responses.
+ file_pattern: "**/service*.py|**/api/**/*.py"
+ anti_patterns:
+ - '\\.all\\(\\)(?![^\\n]*limit|[^\\n]*\\[:)'
+ exclude_patterns:
+ - "# noqa: PERF-003"
+ - "# bounded query"
+ - ".filter("
+ suggested_patterns:
+ - "limit|offset|skip|paginate"
+ example_bad: |
+ all_products = db.query(Product).all()
+ example_good: |
+ products = db.query(Product).limit(100).all()
+ # Or with pagination
+ products = db.query(Product).offset(skip).limit(limit).all()
+
+ - id: "PERF-004"
+ name: "Index usage for filtered columns"
+ severity: info
+ description: |
+ Columns frequently used in WHERE clauses should have indexes:
+ - Foreign keys (vendor_id, customer_id)
+ - Status fields
+ - Date fields used for filtering
+ - Boolean flags used for filtering
+ file_pattern: "**/models/database/*.py"
+ suggested_patterns:
+ - "index=True|Index\\("
+
+ - id: "PERF-005"
+ name: "Select only needed columns"
+ severity: info
+ description: |
+ For large tables, select only the columns you need.
+ Use .with_entities() or load_only() to reduce data transfer.
+ file_pattern: "**/service*.py"
+ suggested_patterns:
+ - "with_entities|load_only|defer"
+ example_good: |
+ # Only load id and name columns
+ products = db.query(Product).options(
+ load_only(Product.id, Product.name)
+ ).all()
+
+ - id: "PERF-006"
+ name: "Bulk operations for multiple records"
+ severity: warning
+ description: |
+ Use bulk operations instead of individual operations in loops:
+ - bulk_insert_mappings() for inserts
+ - bulk_update_mappings() for updates
+ - add_all() for ORM inserts
+ file_pattern: "**/service*.py"
+ anti_patterns:
+ - 'for\\s+\\w+\\s+in\\s+\\w+:\\s*\\n[^}]*db\\.add\\s*\\('
+ - 'for\\s+\\w+\\s+in\\s+\\w+:\\s*\\n[^}]*\\.save\\s*\\('
+ suggested_patterns:
+ - "bulk_insert_mappings|bulk_update_mappings|add_all"
+ example_bad: |
+ for item in items:
+ product = Product(**item)
+ db.add(product)
+ example_good: |
+ products = [Product(**item) for item in items]
+ db.add_all(products)
+
+ - id: "PERF-007"
+ name: "Connection pool configuration"
+ severity: info
+ description: |
+ Configure database connection pool for optimal performance:
+ - pool_size: Number of persistent connections
+ - max_overflow: Additional connections allowed
+ - pool_pre_ping: Check connection health
+ - pool_recycle: Recycle connections periodically
+ file_pattern: "**/database.py|**/config*.py"
+ suggested_patterns:
+ - "pool_size|pool_pre_ping|pool_recycle|max_overflow"
+
+ - id: "PERF-008"
+ name: "Use EXISTS for existence checks"
+ severity: info
+ description: |
+    Use EXISTS (or check that .first() is not None) instead of count() > 0.
+ EXISTS stops at first match, count() scans all matches.
+ file_pattern: "**/service*.py"
+ anti_patterns:
+ - '\\.count\\(\\)\\s*>\\s*0'
+ - '\\.count\\(\\)\\s*>=\\s*1'
+ - '\\.count\\(\\)\\s*!=\\s*0'
+ suggested_patterns:
+ - "exists\\(\\)|scalar\\(exists"
+ example_bad: |
+ if db.query(Order).filter_by(customer_id=id).count() > 0:
+ example_good: |
+ exists_query = db.query(exists().where(Order.customer_id == id))
+ if db.scalar(exists_query):
+
+ - id: "PERF-009"
+ name: "Batch updates instead of loops"
+ severity: warning
+ description: |
+ Use .update() with filters instead of updating in a loop.
+ One UPDATE statement is faster than N individual updates.
+ file_pattern: "**/service*.py"
+ anti_patterns:
+ - 'for\\s+\\w+\\s+in\\s+\\w+:\\s*\\n[^}]*\\w+\\.\\w+\\s*='
+ suggested_patterns:
+ - "\\.update\\(\\{"
+ example_bad: |
+ for product in products:
+ product.is_active = False
+ db.add(product)
+ example_good: |
+ db.query(Product).filter(
+ Product.id.in_(product_ids)
+ ).update({"is_active": False}, synchronize_session=False)
+
+ - id: "PERF-010"
+ name: "Avoid SELECT * patterns"
+ severity: info
+ description: |
+ When you only need specific columns, don't load entire rows.
+ This reduces memory usage and network transfer.
+ file_pattern: "**/service*.py"
+
+ - id: "PERF-011"
+ name: "Use appropriate join strategies"
+ severity: info
+ description: |
+ Choose the right join strategy:
+ - joinedload: Few related items, always needed
+ - selectinload: Many related items, always needed
+ - subqueryload: Complex queries, many related items
+ - lazyload: Rarely accessed relationships
+ file_pattern: "**/service*.py"
+
+ - id: "PERF-012"
+ name: "Transaction scope optimization"
+ severity: warning
+ description: |
+ Keep transactions short and focused:
+ - Don't hold transactions during I/O
+ - Commit after bulk operations
+ - Use read-only transactions when possible
+ file_pattern: "**/service*.py"
+
+ - id: "PERF-013"
+ name: "Query result caching"
+ severity: info
+ description: |
+ Consider caching for:
+ - Frequently accessed, rarely changed data
+ - Configuration tables
+ - Reference data (categories, statuses)
+ file_pattern: "**/service*.py"
+ suggested_patterns:
+ - "@cache|@lru_cache|redis|memcache"
+
+ - id: "PERF-014"
+ name: "Composite indexes for multi-column filters"
+ severity: info
+ description: |
+ Queries filtering on multiple columns benefit from composite indexes.
+ Order columns by selectivity (most selective first).
+ file_pattern: "**/models/database/*.py"
+ suggested_patterns:
+ - "Index\\([^)]*,[^)]*\\)"
+
+ - id: "PERF-015"
+ name: "Avoid correlated subqueries"
+ severity: info
+ description: |
+ Correlated subqueries execute once per row.
+ Use JOINs or window functions instead when possible.
+ file_pattern: "**/service*.py"
diff --git a/.performance-rules/frontend.yaml b/.performance-rules/frontend.yaml
new file mode 100644
index 00000000..0d9845b7
--- /dev/null
+++ b/.performance-rules/frontend.yaml
@@ -0,0 +1,177 @@
+# Frontend Performance Rules
+# ==========================
+
+frontend_rules:
+ - id: "PERF-056"
+ name: "Debounce search inputs"
+ severity: warning
+ description: |
+ Search inputs should debounce API calls.
+ Recommended: 300-500ms delay.
+
+ Prevents excessive API calls while user is typing.
+ file_pattern: "**/*.js"
+ context_patterns:
+ - "search|filter|query"
+ anti_patterns:
+ - '@input=".*search.*fetch'
+ - '@keyup=".*search.*fetch'
+ suggested_patterns:
+ - "debounce|setTimeout.*search|\\$watch.*search"
+    example_bad: |
+      <input type="text" @input="searchProducts()" placeholder="Search...">
+    example_good: |
+      <input type="text" @input="debouncedSearch()" placeholder="Search...">
+      // With: debouncedSearch = debounce(searchProducts, 300)
+
+ - id: "PERF-057"
+ name: "Lazy load off-screen content"
+ severity: info
+ description: |
+ Defer loading of off-screen content:
+ - Modals
+ - Tabs (inactive)
+ - Below-the-fold content
+ - Images
+ file_pattern: "**/*.html"
+ suggested_patterns:
+ - 'loading="lazy"|x-intersect|x-show|x-if'
+
+ - id: "PERF-058"
+ name: "Image optimization"
+ severity: warning
+ description: |
+ Images should be optimized:
+ - Use appropriate formats (WebP, AVIF)
+ - Serve responsive sizes
+ - Lazy load off-screen images
+ - Use CDN for static assets
+ file_pattern: "**/*.html"
+ required_patterns:
+ - 'loading="lazy"|srcset|x-intersect'
+    example_good: |
+      <img src="product.webp" loading="lazy" srcset="product-480.webp 480w, product-1024.webp 1024w" alt="Product">
+
+ - id: "PERF-059"
+ name: "Minimize Alpine.js watchers"
+ severity: info
+ description: |
+ Excessive $watch calls impact performance.
+ Use computed properties or event handlers instead.
+ file_pattern: "**/*.js"
+ anti_patterns:
+ - '\\$watch\\([^)]+\\).*\\$watch\\([^)]+\\).*\\$watch\\('
+
+ - id: "PERF-060"
+ name: "Virtual scrolling for long lists"
+ severity: info
+ description: |
+ Lists with 100+ items should use virtual scrolling.
+ Only render visible items in the viewport.
+ file_pattern: "**/*.html|**/*.js"
+ suggested_patterns:
+ - "virtual-scroll|x-intersect|IntersectionObserver"
+
+ - id: "PERF-061"
+ name: "Minimize bundle size"
+ severity: info
+ description: |
+ Reduce JavaScript bundle size:
+ - Import only needed modules
+ - Use tree-shaking
+ - Split code by route
+ file_pattern: "**/*.js"
+
+ - id: "PERF-062"
+ name: "Reasonable polling intervals"
+ severity: warning
+ description: |
+ Polling should be >= 10 seconds for non-critical updates.
+ Lower intervals waste bandwidth and server resources.
+ file_pattern: "**/*.js"
+ anti_patterns:
+ - 'setInterval\\s*\\([^,]+,\\s*[1-9]\\d{0,3}\\s*\\)'
+ exclude_patterns:
+ - "# real-time required"
+ example_bad: |
+ setInterval(fetchUpdates, 1000); // Every second
+ example_good: |
+ setInterval(fetchUpdates, 30000); // Every 30 seconds
+
+ - id: "PERF-063"
+ name: "CSS containment"
+ severity: info
+ description: |
+ Use CSS containment for complex layouts.
+ Limits rendering scope for better performance.
+ file_pattern: "**/*.css|**/*.html"
+ suggested_patterns:
+ - "contain:|content-visibility"
+
+ - id: "PERF-064"
+ name: "Avoid layout thrashing"
+ severity: warning
+ description: |
+ Don't interleave DOM reads and writes.
+ Batch reads first, then writes.
+ file_pattern: "**/*.js"
+ anti_patterns:
+ - 'offsetHeight.*style\\.|style\\..*offsetHeight'
+
+ - id: "PERF-065"
+ name: "Use CSS animations over JavaScript"
+ severity: info
+ description: |
+ CSS animations are hardware-accelerated.
+ Use CSS for simple animations, JS for complex ones.
+ file_pattern: "**/*.js"
+ suggested_patterns:
+ - "transition|animation|transform"
+
+ - id: "PERF-066"
+ name: "Preload critical resources"
+ severity: info
+ description: |
+ Preload critical CSS, fonts, and above-the-fold images.
+ Reduces perceived load time.
+ file_pattern: "**/*.html"
+ suggested_patterns:
+ - 'rel="preload"|rel="prefetch"|rel="preconnect"'
+
+ - id: "PERF-067"
+ name: "Defer non-critical JavaScript"
+ severity: info
+ description: |
+ Non-critical JavaScript should be deferred.
+ Allows page rendering to complete first.
+ file_pattern: "**/*.html"
+ suggested_patterns:
+ - 'defer|async'
+
+ - id: "PERF-068"
+ name: "Minimize DOM nodes"
+ severity: info
+ description: |
+ Excessive DOM nodes slow rendering.
+ Target: < 1500 nodes, depth < 32, children < 60
+ file_pattern: "**/*.html"
+
+ - id: "PERF-069"
+ name: "Efficient event handlers"
+ severity: info
+ description: |
+ Use event delegation for repeated elements.
+ Add listener to parent, not each child.
+ file_pattern: "**/*.js"
+ suggested_patterns:
+ - "@click.delegate|event.target.closest"
+
+ - id: "PERF-070"
+ name: "Cache DOM queries"
+ severity: info
+ description: |
+ Store DOM element references instead of re-querying.
+ Each querySelector has performance cost.
+ file_pattern: "**/*.js"
+ anti_patterns:
+ - 'document\\.querySelector\\([^)]+\\).*document\\.querySelector\\('
diff --git a/.performance-rules/memory.yaml b/.performance-rules/memory.yaml
new file mode 100644
index 00000000..8a22748b
--- /dev/null
+++ b/.performance-rules/memory.yaml
@@ -0,0 +1,156 @@
+# Memory Management Performance Rules
+# ====================================
+
+memory_rules:
+ - id: "PERF-046"
+ name: "Generators for large datasets"
+ severity: warning
+ description: |
+ Use generators/iterators for processing large datasets.
+ Avoids loading everything into memory at once.
+ file_pattern: "**/service*.py"
+ anti_patterns:
+ - '\\.all\\(\\).*for\\s+\\w+\\s+in'
+ suggested_patterns:
+ - "yield|yield_per|iter"
+ example_bad: |
+ products = db.query(Product).all() # Loads all into memory
+ for product in products:
+ process(product)
+ example_good: |
+ for product in db.query(Product).yield_per(100):
+ process(product)
+
+ - id: "PERF-047"
+ name: "Stream large file uploads"
+ severity: warning
+ description: |
+ Large files should be streamed to disk, not held in memory.
+ Use SpooledTemporaryFile or direct disk writing.
+ file_pattern: "**/upload*.py|**/attachment*.py"
+ suggested_patterns:
+ - "SpooledTemporaryFile|chunk|stream"
+ example_bad: |
+ content = await file.read() # Entire file in memory
+ with open(path, 'wb') as f:
+ f.write(content)
+ example_good: |
+ with open(path, 'wb') as f:
+ while chunk := await file.read(8192):
+ f.write(chunk)
+
+ - id: "PERF-048"
+ name: "Chunked processing for imports"
+ severity: warning
+ description: |
+ Bulk imports should process in chunks:
+ - Read in batches
+ - Commit in batches
+ - Report progress periodically
+ file_pattern: "**/import*.py|**/csv*.py"
+ required_patterns:
+ - "chunk|batch|yield"
+ example_bad: |
+ rows = list(csv_reader) # All rows in memory
+ for row in rows:
+ process(row)
+ example_good: |
+ def process_in_chunks(reader, chunk_size=1000):
+ chunk = []
+ for row in reader:
+ chunk.append(row)
+ if len(chunk) >= chunk_size:
+ yield chunk
+ chunk = []
+ if chunk:
+ yield chunk
+
+ - id: "PERF-049"
+ name: "Context managers for resources"
+ severity: error
+ description: |
+ Use context managers for file operations.
+ Ensures resources are properly released.
+ file_pattern: "**/*.py"
+ anti_patterns:
+ - 'f\\s*=\\s*open\\s*\\([^)]+\\)(?!\\s*#.*context)'
+ - '^(?!.*with).*open\\s*\\([^)]+\\)\\s*$'
+ exclude_patterns:
+ - "# noqa: PERF-049"
+ - "with open"
+ example_bad: |
+ f = open('file.txt')
+ content = f.read()
+ f.close() # May not run if exception
+ example_good: |
+ with open('file.txt') as f:
+ content = f.read()
+
+ - id: "PERF-050"
+ name: "Limit in-memory collections"
+ severity: info
+ description: |
+ Avoid building large lists in memory.
+ Use generators, itertools, or database pagination.
+ file_pattern: "**/service*.py"
+ anti_patterns:
+      - '\\[.*for.*in.*\\](?!.*\\[:\\d+\\])'
+
+ - id: "PERF-051"
+ name: "String concatenation efficiency"
+ severity: info
+ description: |
+ For many string concatenations, use join() or StringIO.
+ Repeated += creates many intermediate strings.
+ file_pattern: "**/*.py"
+ anti_patterns:
+ - 'for.*:\\s*\\n[^}]*\\+='
+ suggested_patterns:
+ - "\\.join\\(|StringIO"
+ example_bad: |
+ result = ""
+ for item in items:
+ result += str(item)
+ example_good: |
+ result = "".join(str(item) for item in items)
+
+ - id: "PERF-052"
+ name: "Efficient data structures"
+ severity: info
+ description: |
+ Choose appropriate data structures:
+ - set for membership testing
+ - dict for key-value lookup
+ - deque for queue operations
+ - defaultdict for grouping
+ file_pattern: "**/*.py"
+
+ - id: "PERF-053"
+ name: "Object pooling for expensive objects"
+ severity: info
+ description: |
+ Reuse expensive-to-create objects:
+ - Database connections
+ - HTTP clients
+ - Template engines
+ file_pattern: "**/*.py"
+
+ - id: "PERF-054"
+ name: "Weak references for caches"
+ severity: info
+ description: |
+ Use weak references for large object caches.
+ Allows garbage collection when memory is needed.
+ file_pattern: "**/*cache*.py"
+ suggested_patterns:
+ - "WeakValueDictionary|WeakKeyDictionary|weakref"
+
+ - id: "PERF-055"
+ name: "Slots for frequently instantiated classes"
+ severity: info
+ description: |
+ Use __slots__ for classes with many instances.
+ Reduces memory footprint per instance.
+ file_pattern: "**/models/**/*.py"
+ suggested_patterns:
+ - "__slots__"
diff --git a/.security-rules/_main.yaml b/.security-rules/_main.yaml
new file mode 100644
index 00000000..d25b0211
--- /dev/null
+++ b/.security-rules/_main.yaml
@@ -0,0 +1,66 @@
+# Security Rules Configuration
+# ============================
+# Security-focused validation rules for the codebase.
+# Run with: python scripts/validate_security.py
+
+version: "1.0"
+project: "letzshop-product-import"
+
+description: |
+ Security validation rules to detect common vulnerabilities and ensure
+ secure coding practices across the application.
+
+principles:
+ - name: "Defense in Depth"
+ description: "Multiple layers of security controls"
+ - name: "Least Privilege"
+ description: "Minimal access rights for users and processes"
+ - name: "Secure by Default"
+ description: "Secure configurations out of the box"
+ - name: "Fail Securely"
+ description: "Errors should not compromise security"
+ - name: "Input Validation"
+ description: "Never trust user input"
+
+includes:
+ - authentication.yaml
+ - injection.yaml
+ - data_protection.yaml
+ - api_security.yaml
+ - cryptography.yaml
+ - audit.yaml
+
+severity_levels:
+ error:
+ description: "Critical security vulnerability that must be fixed"
+ exit_code: 1
+ warning:
+ description: "Security concern that should be addressed"
+ exit_code: 0
+ info:
+ description: "Security best practice recommendation"
+ exit_code: 0
+
+ignore:
+ files:
+ - "**/test_*.py"
+ - "**/tests/**"
+ - "**/*_test.py"
+ - "**/conftest.py"
+ - "**/migrations/**"
+ - "**/.venv/**"
+ - "**/venv/**"
+ - "**/node_modules/**"
+ - "**/site/**"
+ - "**/scripts/**"
+ - "**/__pycache__/**"
+ - "**/*.pyc"
+ patterns:
+ # Allow test credentials in test files
+ - file: "**/tests/**"
+ pattern: "password.*=.*test"
+ reason: "Test fixtures use dummy credentials"
+ # Allow example patterns in documentation
+ - file: "**/docs/**"
+ pattern: ".*"
+ reason: "Documentation examples"
diff --git a/.security-rules/api_security.yaml b/.security-rules/api_security.yaml
new file mode 100644
index 00000000..c5ceb95a
--- /dev/null
+++ b/.security-rules/api_security.yaml
@@ -0,0 +1,66 @@
+# API Security Rules
+# ==================
+
+api_security_rules:
+ - id: SEC-031
+ name: CORS origin validation
+ severity: error
+ description: >
+ CORS must not allow all origins in production.
+ Specify allowed origins explicitly.
+
+ - id: SEC-032
+ name: Rate limiting on sensitive endpoints
+ severity: warning
+ description: >
+ Auth, password reset, and payment endpoints need rate limiting.
+
+ - id: SEC-033
+ name: Security headers
+ severity: warning
+ description: >
+ Configure security headers like X-Content-Type-Options,
+ X-Frame-Options, Content-Security-Policy.
+
+ - id: SEC-034
+ name: HTTPS enforcement
+ severity: error
+ description: >
+ External URLs must use HTTPS.
+ HTTP is only acceptable for localhost.
+
+ - id: SEC-035
+ name: Request size limits
+ severity: warning
+ description: >
+ Limit request body size to prevent DoS attacks.
+
+ - id: SEC-036
+ name: Input validation with Pydantic
+ severity: warning
+ description: >
+ All API inputs should be validated using Pydantic models.
+
+ - id: SEC-037
+ name: API versioning
+ severity: info
+ description: >
+ APIs should be versioned for security update isolation.
+
+ - id: SEC-038
+ name: Method restrictions
+ severity: warning
+ description: >
+ Endpoints should only allow necessary HTTP methods.
+
+ - id: SEC-039
+ name: Authentication bypass prevention
+ severity: error
+ description: >
+ Ensure authentication cannot be bypassed.
+
+ - id: SEC-040
+ name: Timeout configuration
+ severity: warning
+ description: >
+ All external calls must have timeouts configured.
diff --git a/.security-rules/audit.yaml b/.security-rules/audit.yaml
new file mode 100644
index 00000000..6df9ffa0
--- /dev/null
+++ b/.security-rules/audit.yaml
@@ -0,0 +1,131 @@
+# Audit & Logging Rules
+# =====================
+
+audit_rules:
+ - id: "SEC-051"
+ name: "Authentication event logging"
+ severity: warning
+ description: |
+ Log authentication events:
+ - Successful logins (with user ID, IP)
+ - Failed login attempts (with IP, reason)
+ - Logouts
+ - Password changes
+ - Password reset requests
+ file_pattern: "**/auth*.py|**/login*.py"
+ required_patterns:
+ - "log"
+ suggested_patterns:
+ - 'logger\.(info|warning).*(login|auth|password)'
+
+ - id: "SEC-052"
+ name: "Admin action audit trail"
+ severity: warning
+ description: |
+ All admin operations should be logged with:
+ - Admin user ID
+ - Action performed
+ - Target resource
+ - Timestamp
+ - IP address
+ file_pattern: "**/admin/**/*.py"
+ required_patterns:
+ - "log"
+ suggested_patterns:
+ - "logger|audit"
+
+ - id: "SEC-053"
+ name: "Data modification logging"
+ severity: info
+ description: |
+ Log create/update/delete on sensitive data:
+ - User accounts
+ - Roles and permissions
+ - Financial transactions
+ - Configuration changes
+ file_pattern: "**/service*.py"
+
+ - id: "SEC-054"
+ name: "Security event logging"
+ severity: warning
+ description: |
+ Log security-relevant events:
+ - Authorization failures
+ - Input validation failures
+ - Rate limit triggers
+ - Suspicious activity patterns
+ file_pattern: "**/*.py"
+ context_patterns:
+ - "unauthorized|forbidden|rate_limit|suspicious"
+ suggested_patterns:
+ - "logger\\.warning|logger\\.error"
+
+ - id: "SEC-055"
+ name: "Log injection prevention"
+ severity: warning
+ description: |
+ Sanitize user input before logging.
+ Newlines and control characters can corrupt logs.
+ file_pattern: "**/*.py"
+ anti_patterns:
+ - 'logger\.[a-z]+\(.*request\..*\)'
+ suggested_patterns:
+ - "sanitize|escape|repr\\("
+ example_bad: |
+ logger.info(f"User search: {request.query}")
+ example_good: |
+ logger.info(f"User search: {request.query!r}") # repr escapes
+
+ - id: "SEC-056"
+ name: "Centralized logging"
+ severity: info
+ description: |
+ Use centralized logging for:
+ - Correlation across services
+ - Tamper-evident storage
+ - Retention management
+ - Alerting capabilities
+
+ - id: "SEC-057"
+ name: "Log level appropriateness"
+ severity: info
+ description: |
+ Use appropriate log levels:
+ - ERROR: Security failures requiring attention
+ - WARNING: Suspicious activity, failed auth
+ - INFO: Successful security events
+ - DEBUG: Never log sensitive data even at debug
+
+ - id: "SEC-058"
+ name: "Structured logging format"
+ severity: info
+ description: |
+ Use structured logging (JSON) for:
+ - Easy parsing
+ - Consistent fields
+ - Searchability
+ suggested_patterns:
+ - "structlog|json_formatter|extra={"
+
+ - id: "SEC-059"
+ name: "Audit log integrity"
+ severity: info
+ description: |
+ Protect audit logs from tampering:
+ - Append-only storage
+ - Cryptographic chaining
+ - Separate access controls
+
+ - id: "SEC-060"
+ name: "Privacy-aware logging"
+ severity: warning
+ description: |
+ Comply with data protection regulations:
+ - No PII in logs without consent
+ - Log retention limits
+ - Right to deletion support
+ file_pattern: "**/*.py"
+ anti_patterns:
+ - 'log.*email(?!.*@.*sanitized)'
+ - 'log.*phone'
+ - 'log.*address(?!.*ip)'
diff --git a/.security-rules/authentication.yaml b/.security-rules/authentication.yaml
new file mode 100644
index 00000000..7a8cdfeb
--- /dev/null
+++ b/.security-rules/authentication.yaml
@@ -0,0 +1,70 @@
+# Authentication Security Rules
+# =============================
+
+authentication_rules:
+ - id: SEC-001
+ name: No hardcoded credentials
+ severity: error
+ description: >
+ Credentials must never be hardcoded in source code.
+ Use environment variables or secret management.
+
+ - id: SEC-002
+ name: JWT expiry enforcement
+ severity: error
+ description: >
+ All JWT tokens must have expiration claims.
+ Access tokens should expire in 15-60 minutes.
+
+ - id: SEC-003
+ name: Password hashing required
+ severity: error
+ description: >
+ Passwords must be hashed using bcrypt, argon2, or scrypt.
+ Never store or compare passwords in plain text.
+
+ - id: SEC-004
+ name: Session regeneration after auth
+ severity: warning
+ description: >
+ Session IDs should be regenerated after authentication
+ to prevent session fixation attacks.
+
+ - id: SEC-005
+ name: Brute force protection
+ severity: warning
+ description: >
+ Login endpoints should implement rate limiting
+ or account lockout after failed attempts.
+
+ - id: SEC-006
+ name: Secure password reset
+ severity: warning
+ description: >
+ Password reset tokens must be cryptographically random,
+ expire within 1 hour, and be single-use.
+
+ - id: SEC-007
+ name: Authentication on sensitive endpoints
+ severity: error
+ description: >
+ All endpoints except public ones must require authentication.
+
+ - id: SEC-008
+ name: Token in Authorization header
+ severity: warning
+ description: >
+ JWT tokens should be sent in Authorization header,
+ not in URL parameters.
+
+ - id: SEC-009
+ name: Logout invalidates tokens
+ severity: warning
+ description: >
+ Logout should invalidate or blacklist tokens.
+
+ - id: SEC-010
+ name: Multi-factor authentication support
+ severity: info
+ description: >
+ Consider implementing MFA for sensitive operations.
diff --git a/.security-rules/cryptography.yaml b/.security-rules/cryptography.yaml
new file mode 100644
index 00000000..c59ce7f6
--- /dev/null
+++ b/.security-rules/cryptography.yaml
@@ -0,0 +1,72 @@
+# Cryptography Rules
+# ==================
+
+cryptography_rules:
+ - id: SEC-041
+ name: Strong hashing algorithms
+ severity: error
+ description: >
+ Use bcrypt, argon2, scrypt for passwords.
+ Use SHA-256 or stronger for general hashing.
+ Never use MD5 or SHA1.
+
+ - id: SEC-042
+ name: Secure random generation
+ severity: error
+ description: >
+ Use the secrets module for security-sensitive randomness.
+ Never use random module for tokens or keys.
+
+ - id: SEC-043
+ name: No hardcoded encryption keys
+ severity: error
+ description: >
+ Encryption keys must come from environment variables
+ or secret management services.
+
+ - id: SEC-044
+ name: Strong encryption algorithms
+ severity: error
+ description: >
+ Use AES-256 or ChaCha20. Never use DES, 3DES, or RC4.
+
+ - id: SEC-045
+ name: Proper IV/nonce usage
+ severity: error
+ description: >
+ Encryption IVs and nonces must be randomly generated
+ and unique per encryption.
+
+ - id: SEC-046
+ name: TLS version requirements
+ severity: warning
+ description: >
+ Enforce TLS 1.2 or higher.
+ Disable SSLv2, SSLv3, TLS 1.0, TLS 1.1.
+
+ - id: SEC-047
+ name: Certificate verification
+ severity: error
+ description: >
+ Always verify SSL certificates.
+ Never disable verification in production.
+
+ - id: SEC-048
+ name: Key derivation for passwords
+ severity: warning
+ description: >
+ When deriving encryption keys from passwords,
+ use PBKDF2 with 600K+ iterations (per current OWASP guidance), Argon2, or scrypt.
+
+ - id: SEC-049
+ name: Secure key storage
+ severity: info
+ description: >
+ Encryption keys should be stored in environment variables,
+ secret management, or HSMs.
+
+ - id: SEC-050
+ name: Key rotation support
+ severity: info
+ description: >
+ Implement key rotation with multiple key versions.
diff --git a/.security-rules/data_protection.yaml b/.security-rules/data_protection.yaml
new file mode 100644
index 00000000..2cc184fb
--- /dev/null
+++ b/.security-rules/data_protection.yaml
@@ -0,0 +1,67 @@
+# Data Protection Rules
+# =====================
+
+data_protection_rules:
+ - id: SEC-021
+ name: PII logging prevention
+ severity: error
+ description: >
+ Never log passwords, tokens, credit cards, or sensitive PII.
+
+ - id: SEC-022
+ name: Sensitive data in URLs
+ severity: error
+ description: >
+ Sensitive data should not appear in URL query parameters.
+ Use POST body or headers instead.
+
+ - id: SEC-023
+ name: Mass assignment prevention
+ severity: warning
+ description: >
+ Use explicit field assignment, not **kwargs from user input.
+
+ - id: SEC-024
+ name: Error message information leakage
+ severity: error
+ description: >
+ Error messages should not reveal internal details.
+ No stack traces to users.
+
+ - id: SEC-025
+ name: Secure cookie settings
+ severity: error
+ description: >
+ Cookies must have Secure, HttpOnly, SameSite attributes.
+
+ - id: SEC-026
+ name: Encryption for sensitive data at rest
+ severity: info
+ description: >
+ Consider encrypting sensitive data stored in the database.
+
+ - id: SEC-027
+ name: Data retention limits
+ severity: info
+ description: >
+ Implement data retention policies.
+
+ - id: SEC-028
+ name: Response data filtering
+ severity: warning
+ description: >
+ API responses should not include sensitive internal fields.
+ Use Pydantic response models.
+
+ - id: SEC-029
+ name: File upload validation
+ severity: error
+ description: >
+ Validate uploaded files by extension AND content type.
+ Limit file size.
+
+ - id: SEC-030
+ name: Backup encryption
+ severity: info
+ description: >
+ Database backups should be encrypted.
diff --git a/.security-rules/injection.yaml b/.security-rules/injection.yaml
new file mode 100644
index 00000000..cfc6384d
--- /dev/null
+++ b/.security-rules/injection.yaml
@@ -0,0 +1,70 @@
+# Injection Prevention Rules
+# ==========================
+
+injection_rules:
+ - id: SEC-011
+ name: No raw SQL queries
+ severity: error
+ description: >
+ Use SQLAlchemy ORM or parameterized queries only.
+ Never concatenate user input into SQL strings.
+
+ - id: SEC-012
+ name: No shell command injection
+ severity: error
+ description: >
+ Never use shell=True with subprocess.
+ Use subprocess with list arguments.
+
+ - id: SEC-013
+ name: No code execution
+ severity: error
+ description: >
+ Never use eval() or exec() with user input.
+
+ - id: SEC-014
+ name: Path traversal prevention
+ severity: error
+ description: >
+ Validate file paths to prevent directory traversal.
+ Use secure_filename() for uploads.
+
+ - id: SEC-015
+ name: XSS prevention in templates
+ severity: error
+ description: >
+ Use safe output methods in templates.
+ Prefer x-text over x-html.
+
+ - id: SEC-016
+ name: LDAP injection prevention
+ severity: error
+ description: >
+ Escape special characters in LDAP queries.
+
+ - id: SEC-017
+ name: XML external entity prevention
+ severity: error
+ description: >
+ Disable external entities when parsing XML.
+ Use defusedxml.
+
+ - id: SEC-018
+ name: Template injection prevention
+ severity: error
+ description: >
+ Never render user input as template code.
+
+ - id: SEC-019
+ name: SSRF prevention
+ severity: warning
+ description: >
+ Validate URLs before making external requests.
+ Whitelist allowed domains.
+
+ - id: SEC-020
+ name: Deserialization safety
+ severity: error
+ description: >
+ Never deserialize untrusted data with pickle.
+ Use yaml.safe_load() instead of yaml.load().
diff --git a/alembic/versions/f4a5b6c7d8e9_add_validator_type_to_code_quality.py b/alembic/versions/f4a5b6c7d8e9_add_validator_type_to_code_quality.py
new file mode 100644
index 00000000..35e48b91
--- /dev/null
+++ b/alembic/versions/f4a5b6c7d8e9_add_validator_type_to_code_quality.py
@@ -0,0 +1,95 @@
+"""add_validator_type_to_code_quality
+
+Revision ID: f4a5b6c7d8e9
+Revises: e3f4a5b6c7d8
+Create Date: 2025-12-21
+
+This migration adds a validator_type column to the architecture scans, violations,
+and rules tables to support multiple validator types (architecture, security, performance).
+"""
+
+from typing import Sequence, Union
+
+from alembic import op
+import sqlalchemy as sa
+
+
+# revision identifiers, used by Alembic.
+revision: str = "f4a5b6c7d8e9"
+down_revision: Union[str, None] = "e3f4a5b6c7d8"
+branch_labels: Union[str, Sequence[str], None] = None
+depends_on: Union[str, Sequence[str], None] = None
+
+
+def upgrade() -> None:
+ # Add validator_type to architecture_scans table
+ op.add_column(
+ "architecture_scans",
+ sa.Column(
+ "validator_type",
+ sa.String(length=20),
+ nullable=False,
+ server_default="architecture",
+ ),
+ )
+ op.create_index(
+ op.f("ix_architecture_scans_validator_type"),
+ "architecture_scans",
+ ["validator_type"],
+ unique=False,
+ )
+
+ # Add validator_type to architecture_violations table
+ op.add_column(
+ "architecture_violations",
+ sa.Column(
+ "validator_type",
+ sa.String(length=20),
+ nullable=False,
+ server_default="architecture",
+ ),
+ )
+ op.create_index(
+ op.f("ix_architecture_violations_validator_type"),
+ "architecture_violations",
+ ["validator_type"],
+ unique=False,
+ )
+
+ # Add validator_type to architecture_rules table
+ op.add_column(
+ "architecture_rules",
+ sa.Column(
+ "validator_type",
+ sa.String(length=20),
+ nullable=False,
+ server_default="architecture",
+ ),
+ )
+ op.create_index(
+ op.f("ix_architecture_rules_validator_type"),
+ "architecture_rules",
+ ["validator_type"],
+ unique=False,
+ )
+
+
+def downgrade() -> None:
+ # Drop indexes first
+ op.drop_index(
+ op.f("ix_architecture_rules_validator_type"),
+ table_name="architecture_rules",
+ )
+ op.drop_index(
+ op.f("ix_architecture_violations_validator_type"),
+ table_name="architecture_violations",
+ )
+ op.drop_index(
+ op.f("ix_architecture_scans_validator_type"),
+ table_name="architecture_scans",
+ )
+
+ # Drop columns
+ op.drop_column("architecture_rules", "validator_type")
+ op.drop_column("architecture_violations", "validator_type")
+ op.drop_column("architecture_scans", "validator_type")
diff --git a/app/services/code_quality_service.py b/app/services/code_quality_service.py
index d4b20460..2c8e9abc 100644
--- a/app/services/code_quality_service.py
+++ b/app/services/code_quality_service.py
@@ -1,6 +1,7 @@
"""
Code Quality Service
-Business logic for managing architecture scans and violations
+Business logic for managing code quality scans and violations
+Supports multiple validator types: architecture, security, performance
"""
import json
@@ -25,25 +26,65 @@ from models.database.architecture_scan import (
logger = logging.getLogger(__name__)
+# Validator type constants
+VALIDATOR_ARCHITECTURE = "architecture"
+VALIDATOR_SECURITY = "security"
+VALIDATOR_PERFORMANCE = "performance"
+
+VALID_VALIDATOR_TYPES = [VALIDATOR_ARCHITECTURE, VALIDATOR_SECURITY, VALIDATOR_PERFORMANCE]
+
+# Map validator types to their scripts
+VALIDATOR_SCRIPTS = {
+ VALIDATOR_ARCHITECTURE: "scripts/validate_architecture.py",
+ VALIDATOR_SECURITY: "scripts/validate_security.py",
+ VALIDATOR_PERFORMANCE: "scripts/validate_performance.py",
+}
+
+# Human-readable names
+VALIDATOR_NAMES = {
+ VALIDATOR_ARCHITECTURE: "Architecture",
+ VALIDATOR_SECURITY: "Security",
+ VALIDATOR_PERFORMANCE: "Performance",
+}
+
class CodeQualityService:
"""Service for managing code quality scans and violations"""
- def run_scan(self, db: Session, triggered_by: str = "manual") -> ArchitectureScan:
+ def run_scan(
+ self,
+ db: Session,
+ triggered_by: str = "manual",
+ validator_type: str = VALIDATOR_ARCHITECTURE,
+ ) -> ArchitectureScan:
"""
- Run architecture validator and store results in database
+ Run a code quality validator and store results in database
Args:
db: Database session
triggered_by: Who/what triggered the scan ('manual', 'scheduled', 'ci/cd')
+ validator_type: Type of validator ('architecture', 'security', 'performance')
Returns:
ArchitectureScan object with results
Raises:
- Exception: If validator script fails
+ ValueError: If validator_type is invalid
+ ScanTimeoutException: If validator times out
+ ScanParseException: If validator output cannot be parsed
"""
- logger.info(f"Starting architecture scan (triggered by: {triggered_by})")
+ if validator_type not in VALID_VALIDATOR_TYPES:
+ raise ValueError(
+ f"Invalid validator type: {validator_type}. "
+ f"Must be one of: {VALID_VALIDATOR_TYPES}"
+ )
+
+ script_path = VALIDATOR_SCRIPTS[validator_type]
+ validator_name = VALIDATOR_NAMES[validator_type]
+
+ logger.info(
+ f"Starting {validator_name} scan (triggered by: {triggered_by})"
+ )
# Get git commit hash
git_commit = self._get_git_commit_hash()
@@ -52,13 +93,13 @@ class CodeQualityService:
start_time = datetime.now()
try:
result = subprocess.run(
- ["python", "scripts/validate_architecture.py", "--json"],
+ ["python", script_path, "--json"],
capture_output=True,
text=True,
timeout=300, # 5 minute timeout
)
except subprocess.TimeoutExpired:
- logger.error("Architecture scan timed out after 5 minutes")
+ logger.error(f"{validator_name} scan timed out after 5 minutes")
raise ScanTimeoutException(timeout_seconds=300)
duration = (datetime.now() - start_time).total_seconds()
@@ -79,7 +120,7 @@ class CodeQualityService:
json_output = "\n".join(lines[json_start:])
data = json.loads(json_output)
except (json.JSONDecodeError, ValueError) as e:
- logger.error(f"Failed to parse validator output: {e}")
+ logger.error(f"Failed to parse {validator_name} validator output: {e}")
logger.error(f"Stdout: {result.stdout}")
logger.error(f"Stderr: {result.stderr}")
raise ScanParseException(reason=str(e))
@@ -87,6 +128,7 @@ class CodeQualityService:
# Create scan record
scan = ArchitectureScan(
timestamp=datetime.now(),
+ validator_type=validator_type,
total_files=data.get("files_checked", 0),
total_violations=data.get("total_violations", 0),
errors=data.get("errors", 0),
@@ -100,11 +142,12 @@ class CodeQualityService:
# Create violation records
violations_data = data.get("violations", [])
- logger.info(f"Creating {len(violations_data)} violation records")
+ logger.info(f"Creating {len(violations_data)} {validator_name} violation records")
for v in violations_data:
violation = ArchitectureViolation(
scan_id=scan.id,
+ validator_type=validator_type,
rule_id=v["rule_id"],
rule_name=v["rule_name"],
severity=v["severity"],
@@ -120,43 +163,98 @@ class CodeQualityService:
db.flush()
db.refresh(scan)
- logger.info(f"Scan completed: {scan.total_violations} violations found")
+ logger.info(
+ f"{validator_name} scan completed: {scan.total_violations} violations found"
+ )
return scan
- def get_latest_scan(self, db: Session) -> ArchitectureScan | None:
- """Get the most recent scan"""
- return (
- db.query(ArchitectureScan)
- .order_by(desc(ArchitectureScan.timestamp))
- .first()
- )
+ def run_all_scans(
+ self, db: Session, triggered_by: str = "manual"
+ ) -> list[ArchitectureScan]:
+ """
+ Run all validators and return list of scans
+
+ Args:
+ db: Database session
+ triggered_by: Who/what triggered the scan
+
+ Returns:
+ List of ArchitectureScan objects (one per validator)
+ """
+ results = []
+ for validator_type in VALID_VALIDATOR_TYPES:
+ try:
+ scan = self.run_scan(db, triggered_by, validator_type)
+ results.append(scan)
+ except Exception as e:
+ logger.error(f"Failed to run {validator_type} scan: {e}")
+ # Continue with other validators even if one fails
+ return results
+
+ def get_latest_scan(
+ self, db: Session, validator_type: str = None
+ ) -> ArchitectureScan | None:
+ """
+ Get the most recent scan
+
+ Args:
+ db: Database session
+ validator_type: Optional filter by validator type
+
+ Returns:
+ Most recent ArchitectureScan or None
+ """
+ query = db.query(ArchitectureScan).order_by(desc(ArchitectureScan.timestamp))
+
+ if validator_type:
+ query = query.filter(ArchitectureScan.validator_type == validator_type)
+
+ return query.first()
+
+ def get_latest_scans_by_type(self, db: Session) -> dict[str, ArchitectureScan]:
+ """
+ Get the most recent scan for each validator type
+
+ Returns:
+ Dictionary mapping validator_type to its latest scan
+ """
+ result = {}
+ for vtype in VALID_VALIDATOR_TYPES:
+ scan = self.get_latest_scan(db, validator_type=vtype)
+ if scan:
+ result[vtype] = scan
+ return result
def get_scan_by_id(self, db: Session, scan_id: int) -> ArchitectureScan | None:
"""Get scan by ID"""
return db.query(ArchitectureScan).filter(ArchitectureScan.id == scan_id).first()
- def get_scan_history(self, db: Session, limit: int = 30) -> list[ArchitectureScan]:
+ def get_scan_history(
+ self, db: Session, limit: int = 30, validator_type: str = None
+ ) -> list[ArchitectureScan]:
"""
Get scan history for trend graphs
Args:
db: Database session
limit: Maximum number of scans to return
+ validator_type: Optional filter by validator type
Returns:
List of ArchitectureScan objects, newest first
"""
- return (
- db.query(ArchitectureScan)
- .order_by(desc(ArchitectureScan.timestamp))
- .limit(limit)
- .all()
- )
+ query = db.query(ArchitectureScan).order_by(desc(ArchitectureScan.timestamp))
+
+ if validator_type:
+ query = query.filter(ArchitectureScan.validator_type == validator_type)
+
+ return query.limit(limit).all()
def get_violations(
self,
db: Session,
scan_id: int = None,
+ validator_type: str = None,
severity: str = None,
status: str = None,
rule_id: str = None,
@@ -169,7 +267,8 @@ class CodeQualityService:
Args:
db: Database session
- scan_id: Filter by scan ID (if None, use latest scan)
+ scan_id: Filter by scan ID (if None, use latest scan(s))
+ validator_type: Filter by validator type
severity: Filter by severity ('error', 'warning')
status: Filter by status ('open', 'assigned', 'resolved', etc.)
rule_id: Filter by rule ID
@@ -180,19 +279,33 @@ class CodeQualityService:
Returns:
Tuple of (violations list, total count)
"""
- # If no scan_id specified, use latest scan
- if scan_id is None:
- latest_scan = self.get_latest_scan(db)
- if not latest_scan:
- return [], 0
- scan_id = latest_scan.id
-
# Build query
- query = db.query(ArchitectureViolation).filter(
- ArchitectureViolation.scan_id == scan_id
- )
+ query = db.query(ArchitectureViolation)
- # Apply filters
+ # If scan_id specified, filter by it
+ if scan_id is not None:
+ query = query.filter(ArchitectureViolation.scan_id == scan_id)
+ else:
+ # If no scan_id, get violations from latest scan(s)
+ if validator_type:
+ # Get latest scan for specific validator type
+ latest_scan = self.get_latest_scan(db, validator_type)
+ if not latest_scan:
+ return [], 0
+ query = query.filter(ArchitectureViolation.scan_id == latest_scan.id)
+ else:
+ # Get violations from latest scans of all types
+ latest_scans = self.get_latest_scans_by_type(db)
+ if not latest_scans:
+ return [], 0
+ scan_ids = [s.id for s in latest_scans.values()]
+ query = query.filter(ArchitectureViolation.scan_id.in_(scan_ids))
+
+ # Apply validator_type filter if specified (for scan_id queries)
+ if validator_type and scan_id is not None:
+ query = query.filter(ArchitectureViolation.validator_type == validator_type)
+
+ # Apply other filters
if severity:
query = query.filter(ArchitectureViolation.severity == severity)
@@ -211,7 +324,9 @@ class CodeQualityService:
# Get page of results
violations = (
query.order_by(
- ArchitectureViolation.severity.desc(), ArchitectureViolation.file_path
+ ArchitectureViolation.severity.desc(),
+ ArchitectureViolation.validator_type,
+ ArchitectureViolation.file_path,
)
.limit(limit)
.offset(offset)
@@ -353,40 +468,65 @@ class CodeQualityService:
logger.info(f"Comment added to violation {violation_id} by user {user_id}")
return comment_obj
- def get_dashboard_stats(self, db: Session) -> dict:
+ def get_dashboard_stats(
+ self, db: Session, validator_type: str = None
+ ) -> dict:
"""
Get statistics for dashboard
- Returns:
- Dictionary with various statistics
- """
- latest_scan = self.get_latest_scan(db)
- if not latest_scan:
- return {
- "total_violations": 0,
- "errors": 0,
- "warnings": 0,
- "open": 0,
- "assigned": 0,
- "resolved": 0,
- "ignored": 0,
- "technical_debt_score": 100,
- "trend": [],
- "by_severity": {},
- "by_rule": {},
- "by_module": {},
- "top_files": [],
- "last_scan": None,
- }
+ Args:
+ db: Database session
+ validator_type: Optional filter by validator type. If None, returns combined stats.
+ Returns:
+ Dictionary with various statistics including per-validator breakdown
+ """
+ # Get latest scans by type
+ latest_scans = self.get_latest_scans_by_type(db)
+
+ if not latest_scans:
+ return self._empty_dashboard_stats()
+
+ # If specific validator type requested
+ if validator_type and validator_type in latest_scans:
+ scan = latest_scans[validator_type]
+ return self._get_stats_for_scan(db, scan, validator_type)
+
+ # Combined stats across all validators
+ return self._get_combined_stats(db, latest_scans)
+
+ def _empty_dashboard_stats(self) -> dict:
+ """Return empty dashboard stats structure"""
+ return {
+ "total_violations": 0,
+ "errors": 0,
+ "warnings": 0,
+ "info": 0,
+ "open": 0,
+ "assigned": 0,
+ "resolved": 0,
+ "ignored": 0,
+ "technical_debt_score": 100,
+ "trend": [],
+ "by_severity": {},
+ "by_rule": {},
+ "by_module": {},
+ "top_files": [],
+ "last_scan": None,
+ "by_validator": {},
+ }
+
+ def _get_stats_for_scan(
+ self, db: Session, scan: ArchitectureScan, validator_type: str
+ ) -> dict:
+ """Get stats for a single scan/validator type"""
# Get violation counts by status
status_counts = (
db.query(ArchitectureViolation.status, func.count(ArchitectureViolation.id))
- .filter(ArchitectureViolation.scan_id == latest_scan.id)
+ .filter(ArchitectureViolation.scan_id == scan.id)
.group_by(ArchitectureViolation.status)
.all()
)
-
status_dict = {status: count for status, count in status_counts}
# Get violations by severity
@@ -394,11 +534,10 @@ class CodeQualityService:
db.query(
ArchitectureViolation.severity, func.count(ArchitectureViolation.id)
)
- .filter(ArchitectureViolation.scan_id == latest_scan.id)
+ .filter(ArchitectureViolation.scan_id == scan.id)
.group_by(ArchitectureViolation.severity)
.all()
)
-
by_severity = {sev: count for sev, count in severity_counts}
# Get violations by rule
@@ -406,16 +545,13 @@ class CodeQualityService:
db.query(
ArchitectureViolation.rule_id, func.count(ArchitectureViolation.id)
)
- .filter(ArchitectureViolation.scan_id == latest_scan.id)
+ .filter(ArchitectureViolation.scan_id == scan.id)
.group_by(ArchitectureViolation.rule_id)
.all()
)
-
by_rule = {
rule: count
- for rule, count in sorted(rule_counts, key=lambda x: x[1], reverse=True)[
- :10
- ]
+ for rule, count in sorted(rule_counts, key=lambda x: x[1], reverse=True)[:10]
}
# Get top violating files
@@ -424,69 +560,185 @@ class CodeQualityService:
ArchitectureViolation.file_path,
func.count(ArchitectureViolation.id).label("count"),
)
- .filter(ArchitectureViolation.scan_id == latest_scan.id)
+ .filter(ArchitectureViolation.scan_id == scan.id)
.group_by(ArchitectureViolation.file_path)
.order_by(desc("count"))
.limit(10)
.all()
)
-
top_files = [{"file": file, "count": count} for file, count in file_counts]
- # Get violations by module (extract module from file path)
+ # Get violations by module
+ by_module = self._get_violations_by_module(db, scan.id)
+
+ # Get trend for this validator type
+ trend_scans = self.get_scan_history(db, limit=7, validator_type=validator_type)
+ trend = [
+ {
+ "timestamp": s.timestamp.isoformat(),
+ "violations": s.total_violations,
+ "errors": s.errors,
+ "warnings": s.warnings,
+ }
+ for s in reversed(trend_scans)
+ ]
+
+ return {
+ "total_violations": scan.total_violations,
+ "errors": scan.errors,
+ "warnings": scan.warnings,
+ "info": by_severity.get("info", 0),
+ "open": status_dict.get("open", 0),
+ "assigned": status_dict.get("assigned", 0),
+ "resolved": status_dict.get("resolved", 0),
+ "ignored": status_dict.get("ignored", 0),
+ "technical_debt_score": self._calculate_score(scan.errors, scan.warnings),
+ "trend": trend,
+ "by_severity": by_severity,
+ "by_rule": by_rule,
+ "by_module": by_module,
+ "top_files": top_files,
+ "last_scan": scan.timestamp.isoformat(),
+ "validator_type": validator_type,
+ "by_validator": {
+ validator_type: {
+ "total_violations": scan.total_violations,
+ "errors": scan.errors,
+ "warnings": scan.warnings,
+ "last_scan": scan.timestamp.isoformat(),
+ }
+ },
+ }
+
+ def _get_combined_stats(
+ self, db: Session, latest_scans: dict[str, ArchitectureScan]
+ ) -> dict:
+ """Get combined stats across all validators"""
+ # Aggregate totals
+ total_violations = sum(s.total_violations for s in latest_scans.values())
+ total_errors = sum(s.errors for s in latest_scans.values())
+ total_warnings = sum(s.warnings for s in latest_scans.values())
+
+ # Get all scan IDs
+ scan_ids = [s.id for s in latest_scans.values()]
+
+ # Get violation counts by status
+ status_counts = (
+ db.query(ArchitectureViolation.status, func.count(ArchitectureViolation.id))
+ .filter(ArchitectureViolation.scan_id.in_(scan_ids))
+ .group_by(ArchitectureViolation.status)
+ .all()
+ )
+ status_dict = {status: count for status, count in status_counts}
+
+ # Get violations by severity
+ severity_counts = (
+ db.query(
+ ArchitectureViolation.severity, func.count(ArchitectureViolation.id)
+ )
+ .filter(ArchitectureViolation.scan_id.in_(scan_ids))
+ .group_by(ArchitectureViolation.severity)
+ .all()
+ )
+ by_severity = {sev: count for sev, count in severity_counts}
+
+ # Get violations by rule (across all validators)
+ rule_counts = (
+ db.query(
+ ArchitectureViolation.rule_id, func.count(ArchitectureViolation.id)
+ )
+ .filter(ArchitectureViolation.scan_id.in_(scan_ids))
+ .group_by(ArchitectureViolation.rule_id)
+ .all()
+ )
+ by_rule = {
+ rule: count
+ for rule, count in sorted(rule_counts, key=lambda x: x[1], reverse=True)[:10]
+ }
+
+ # Get top violating files
+ file_counts = (
+ db.query(
+ ArchitectureViolation.file_path,
+ func.count(ArchitectureViolation.id).label("count"),
+ )
+ .filter(ArchitectureViolation.scan_id.in_(scan_ids))
+ .group_by(ArchitectureViolation.file_path)
+ .order_by(desc("count"))
+ .limit(10)
+ .all()
+ )
+ top_files = [{"file": file, "count": count} for file, count in file_counts]
+
+ # Get violations by module
+ by_module = {}
+ for scan_id in scan_ids:
+ module_counts = self._get_violations_by_module(db, scan_id)
+ for module, count in module_counts.items():
+ by_module[module] = by_module.get(module, 0) + count
+ by_module = dict(
+ sorted(by_module.items(), key=lambda x: x[1], reverse=True)[:10]
+ )
+
+ # Per-validator breakdown
+ by_validator = {}
+ for vtype, scan in latest_scans.items():
+ by_validator[vtype] = {
+ "total_violations": scan.total_violations,
+ "errors": scan.errors,
+ "warnings": scan.warnings,
+ "last_scan": scan.timestamp.isoformat(),
+ }
+
+ # Get most recent scan timestamp
+ most_recent = max(latest_scans.values(), key=lambda s: s.timestamp)
+
+ return {
+ "total_violations": total_violations,
+ "errors": total_errors,
+ "warnings": total_warnings,
+ "info": by_severity.get("info", 0),
+ "open": status_dict.get("open", 0),
+ "assigned": status_dict.get("assigned", 0),
+ "resolved": status_dict.get("resolved", 0),
+ "ignored": status_dict.get("ignored", 0),
+ "technical_debt_score": self._calculate_score(total_errors, total_warnings),
+ "trend": [], # Combined trend would need special handling
+ "by_severity": by_severity,
+ "by_rule": by_rule,
+ "by_module": by_module,
+ "top_files": top_files,
+ "last_scan": most_recent.timestamp.isoformat(),
+ "by_validator": by_validator,
+ }
+
+ def _get_violations_by_module(self, db: Session, scan_id: int) -> dict[str, int]:
+ """Extract module from file paths and count violations"""
by_module = {}
violations = (
db.query(ArchitectureViolation.file_path)
- .filter(ArchitectureViolation.scan_id == latest_scan.id)
+ .filter(ArchitectureViolation.scan_id == scan_id)
.all()
)
for v in violations:
path_parts = v.file_path.split("/")
if len(path_parts) >= 2:
- module = "/".join(path_parts[:2]) # e.g., 'app/api'
+ module = "/".join(path_parts[:2])
else:
module = path_parts[0]
by_module[module] = by_module.get(module, 0) + 1
- # Sort by count and take top 10
- by_module = dict(
- sorted(by_module.items(), key=lambda x: x[1], reverse=True)[:10]
- )
+ return dict(sorted(by_module.items(), key=lambda x: x[1], reverse=True)[:10])
- # Calculate technical debt score
- tech_debt_score = self.calculate_technical_debt_score(db, latest_scan.id)
+ def _calculate_score(self, errors: int, warnings: int) -> int:
+ """Calculate technical debt score (0-100)"""
+ score = 100 - (errors * 0.5 + warnings * 0.05)
+ return max(0, min(100, int(score)))
- # Get trend (last 7 scans)
- trend_scans = self.get_scan_history(db, limit=7)
- trend = [
- {
- "timestamp": scan.timestamp.isoformat(),
- "violations": scan.total_violations,
- "errors": scan.errors,
- "warnings": scan.warnings,
- }
- for scan in reversed(trend_scans) # Oldest first for chart
- ]
-
- return {
- "total_violations": latest_scan.total_violations,
- "errors": latest_scan.errors,
- "warnings": latest_scan.warnings,
- "open": status_dict.get("open", 0),
- "assigned": status_dict.get("assigned", 0),
- "resolved": status_dict.get("resolved", 0),
- "ignored": status_dict.get("ignored", 0),
- "technical_debt_score": tech_debt_score,
- "trend": trend,
- "by_severity": by_severity,
- "by_rule": by_rule,
- "by_module": by_module,
- "top_files": top_files,
- "last_scan": latest_scan.timestamp.isoformat() if latest_scan else None,
- }
-
- def calculate_technical_debt_score(self, db: Session, scan_id: int = None) -> int:
+ def calculate_technical_debt_score(
+ self, db: Session, scan_id: int = None, validator_type: str = None
+ ) -> int:
"""
Calculate technical debt score (0-100)
@@ -496,12 +748,13 @@ class CodeQualityService:
Args:
db: Database session
scan_id: Scan ID (if None, use latest)
+ validator_type: Filter by validator type
Returns:
Score from 0-100
"""
if scan_id is None:
- latest_scan = self.get_latest_scan(db)
+ latest_scan = self.get_latest_scan(db, validator_type)
if not latest_scan:
return 100
scan_id = latest_scan.id
@@ -510,8 +763,7 @@ class CodeQualityService:
if not scan:
return 100
- score = 100 - (scan.errors * 0.5 + scan.warnings * 0.05)
- return max(0, min(100, int(score))) # Clamp to 0-100
+ return self._calculate_score(scan.errors, scan.warnings)
def _get_git_commit_hash(self) -> str | None:
"""Get current git commit hash"""
diff --git a/app/templates/admin/code-quality-violations.html b/app/templates/admin/code-quality-violations.html
index fb33361e..a9038656 100644
--- a/app/templates/admin/code-quality-violations.html
+++ b/app/templates/admin/code-quality-violations.html
@@ -14,7 +14,7 @@
{% endblock %}
{% block content %}
-{{ page_header('Architecture Violations', subtitle='Review and manage code quality violations', back_url='/admin/code-quality', back_label='Back to Dashboard') }}
+{{ page_header('Code Quality Violations', subtitle='Review and manage violations across all validators', back_url='/admin/code-quality', back_label='Back to Dashboard') }}
{{ loading_state('Loading violations...') }}
@@ -25,7 +25,22 @@
+```
+
+### PERF-059: Minimize Alpine.js Watchers
+**Severity:** Info
+
+Excessive `$watch` calls impact performance. Use computed properties or event handlers instead.
+
+### PERF-060: Virtual Scrolling for Long Lists
+**Severity:** Info
+
+Lists with 100+ items should use virtual scrolling. Only render visible items in the viewport.
+
+### PERF-061: Minimize Bundle Size
+**Severity:** Info
+
+Reduce JavaScript bundle size: import only needed modules, use tree-shaking, split code by route.
+
+### PERF-062: Reasonable Polling Intervals
+**Severity:** Warning
+
+Polling should be >= 10 seconds for non-critical updates.
+
+```javascript
+// Bad
+setInterval(fetchUpdates, 1000); // Every second
+
+// Good
+setInterval(fetchUpdates, 30000); // Every 30 seconds
+```
+
+### PERF-063 to PERF-070: Additional Frontend Rules
+**Severity:** Info/Warning
+
+CSS containment, avoid layout thrashing, CSS animations over JavaScript, preload critical resources, defer non-critical JavaScript, minimize DOM nodes, efficient event handlers, and cache DOM queries.
+
+---
+
+## Configuration
+
+All rules are defined in `.performance-rules/` directory:
+
+```
+.performance-rules/
+├── _main.yaml # Main configuration
+├── database.yaml # PERF-001 to PERF-015
+├── caching.yaml # PERF-016 to PERF-025
+├── api.yaml # PERF-026 to PERF-035
+├── async.yaml # PERF-036 to PERF-045
+├── memory.yaml # PERF-046 to PERF-055
+└── frontend.yaml # PERF-056 to PERF-070
+```
+
+## Suppressing Rules
+
+Use noqa comments to suppress specific rules:
+
+```python
+# noqa: PERF-003 - This is intentionally unbounded for admin export
+products = db.query(Product).all()
+```
+
+## Related Documentation
+
+- [Architecture Rules](architecture-rules.md)
+- [Security Rules](security-rules.md)
+- [Code Quality Guide](code-quality.md)
+- [Contributing Guide](contributing.md)
+
+---
+
+## Summary Statistics
+
+| Category | Rules | Errors | Warnings | Info |
+|----------|-------|--------|----------|------|
+| Database | 15 | 0 | 5 | 10 |
+| Caching | 10 | 0 | 2 | 8 |
+| API | 10 | 1 | 3 | 6 |
+| Async & Concurrency | 10 | 1 | 4 | 5 |
+| Memory Management | 10 | 1 | 4 | 5 |
+| Frontend | 15 | 0 | 4 | 11 |
+| **Total** | **70** | **3** | **22** | **45** |
+
+---
+
+**Last Updated:** 2025-12-21
+**Version:** 1.0
diff --git a/docs/development/security-rules.md b/docs/development/security-rules.md
new file mode 100644
index 00000000..4b7ec185
--- /dev/null
+++ b/docs/development/security-rules.md
@@ -0,0 +1,560 @@
+# Security Rules Reference
+
+This document provides a comprehensive reference for all security rules enforced by the `scripts/validate_security.py` validator.
+
+## Overview
+
+The security validator identifies potential security vulnerabilities and enforces security best practices across the codebase. Rules are organized by category and severity level.
+
+**Version:** 1.0
+**Total Rules:** 60
+**Configuration Directory:** `.security-rules/`
+
+## Running the Validator
+
+### Using Python Directly
+
+```bash
+# Check all files
+python scripts/validate_security.py
+
+# Verbose output
+python scripts/validate_security.py -v
+
+# Errors only
+python scripts/validate_security.py --errors-only
+
+# JSON output (for CI/CD)
+python scripts/validate_security.py --json
+```
+
+### Using the Unified Validator
+
+```bash
+# Run security checks only
+python scripts/validate_all.py --security
+
+# Run all validators
+python scripts/validate_all.py
+```
+
+## Severity Levels
+
+| Severity | Description | Exit Code | Action Required |
+|----------|-------------|-----------|-----------------|
+| **Error** | Critical security vulnerability | 1 | Must fix immediately |
+| **Warning** | Security concern | 0 | Should fix |
+| **Info** | Security suggestion | 0 | Consider implementing |
+
+---
+
+## Authentication Rules (SEC-001 to SEC-010)
+
+### SEC-001: No Hardcoded Credentials
+**Severity:** Error
+
+Credentials must never be hardcoded in source code. Use environment variables or secret management.
+
+```python
+# Bad
+api_key = "sk-1234567890abcdef"
+password = "admin123"
+
+# Good
+api_key = os.environ.get("API_KEY")
+password = settings.admin_password
+```
+
+### SEC-002: JWT Expiry Enforcement
+**Severity:** Error
+
+All JWT tokens must have expiration claims. Access tokens should expire in 15-60 minutes.
+
+### SEC-003: Password Hashing Required
+**Severity:** Error
+
+Passwords must be hashed using bcrypt, argon2, or scrypt. Never store or compare passwords in plain text.
+
+```python
+# Bad
+if user.password == password:
+ ...
+
+# Good
+if bcrypt.checkpw(password.encode(), user.hashed_password):
+ ...
+```
+
+### SEC-004: Session Regeneration After Auth
+**Severity:** Warning
+
+Session IDs should be regenerated after authentication to prevent session fixation attacks.
+
+### SEC-005: Brute Force Protection
+**Severity:** Warning
+
+Login endpoints should implement rate limiting or account lockout after failed attempts.
+
+### SEC-006: Secure Password Reset
+**Severity:** Warning
+
+Password reset tokens must be cryptographically random, expire within 1 hour, and be single-use.
+
+### SEC-007: Authentication on Sensitive Endpoints
+**Severity:** Error
+
+All endpoints except public ones must require authentication.
+
+### SEC-008: Token in Authorization Header
+**Severity:** Warning
+
+JWT tokens should be sent in Authorization header, not in URL parameters.
+
+### SEC-009: Logout Invalidates Tokens
+**Severity:** Warning
+
+Logout should invalidate or blacklist tokens.
+
+### SEC-010: Multi-Factor Authentication Support
+**Severity:** Info
+
+Consider implementing MFA for sensitive operations.
+
+---
+
+## Injection Prevention Rules (SEC-011 to SEC-020)
+
+### SEC-011: No Raw SQL Queries
+**Severity:** Error
+
+Use SQLAlchemy ORM or parameterized queries only. Never concatenate user input into SQL strings.
+
+```python
+# Bad
+db.execute(f"SELECT * FROM users WHERE name = '{name}'")
+
+# Good
+db.query(User).filter(User.name == name)
+```
+
+### SEC-012: No Shell Command Injection
+**Severity:** Error
+
+Never use `shell=True` with subprocess. Use subprocess with list arguments.
+
+```python
+# Bad
+subprocess.run(f"convert {filename}", shell=True)
+os.system(f"rm {filename}")
+
+# Good
+subprocess.run(["convert", filename])
+```
+
+### SEC-013: No Code Execution
+**Severity:** Error
+
+Never use `eval()` or `exec()` with user input.
+
+### SEC-014: Path Traversal Prevention
+**Severity:** Error
+
+Validate file paths to prevent directory traversal. Use `secure_filename()` for uploads.
+
+```python
+# Bad
+path = f"/uploads/{filename}"
+
+# Good
+from werkzeug.utils import secure_filename
+path = f"/uploads/{secure_filename(filename)}"
+```
+
+### SEC-015: XSS Prevention in Templates
+**Severity:** Error
+
+Use safe output methods in templates. Prefer `x-text` over `x-html` in Alpine.js.
+
+```html
+<!-- Bad -->
+<div x-html="userContent"></div>
+
+<!-- Good -->
+<div x-text="userContent"></div>
+```
+
+### SEC-016: LDAP Injection Prevention
+**Severity:** Error
+
+Escape special characters in LDAP queries.
+
+### SEC-017: XML External Entity Prevention
+**Severity:** Error
+
+Disable external entities when parsing XML. Use `defusedxml`.
+
+```python
+# Bad
+import xml.etree.ElementTree as ET
+tree = ET.parse(user_file)
+
+# Good
+import defusedxml.ElementTree as ET
+tree = ET.parse(user_file)
+```
+
+### SEC-018: Template Injection Prevention
+**Severity:** Error
+
+Never render user input as template code.
+
+### SEC-019: SSRF Prevention
+**Severity:** Warning
+
+Validate URLs before making external requests. Whitelist allowed domains.
+
+### SEC-020: Deserialization Safety
+**Severity:** Error
+
+Never deserialize untrusted data with pickle. Use `yaml.safe_load()` instead of `yaml.load()`.
+
+```python
+# Bad
+data = pickle.loads(user_data)
+config = yaml.load(user_config)
+
+# Good
+config = yaml.safe_load(user_config)
+data = json.loads(user_data)
+```
+
+---
+
+## Data Protection Rules (SEC-021 to SEC-030)
+
+### SEC-021: PII Logging Prevention
+**Severity:** Error
+
+Never log passwords, tokens, credit cards, or sensitive PII.
+
+```python
+# Bad
+logger.info(f"User login: {username}, password: {password}")
+
+# Good
+logger.info(f"User login: {username}")
+```
+
+### SEC-022: Sensitive Data in URLs
+**Severity:** Error
+
+Sensitive data should not appear in URL query parameters. Use POST body or headers instead.
+
+### SEC-023: Mass Assignment Prevention
+**Severity:** Warning
+
+Use explicit field assignment, not `**kwargs` from user input.
+
+```python
+# Bad
+user = User(**request.json)
+
+# Good
+user = User(
+ name=request.json.get("name"),
+ email=request.json.get("email")
+)
+```
+
+### SEC-024: Error Message Information Leakage
+**Severity:** Error
+
+Error messages should not reveal internal details. No stack traces to users.
+
+### SEC-025: Secure Cookie Settings
+**Severity:** Error
+
+Cookies must have Secure, HttpOnly, SameSite attributes.
+
+### SEC-026: Encryption for Sensitive Data at Rest
+**Severity:** Info
+
+Consider encrypting sensitive data stored in the database.
+
+### SEC-027: Data Retention Limits
+**Severity:** Info
+
+Implement data retention policies.
+
+### SEC-028: Response Data Filtering
+**Severity:** Warning
+
+API responses should not include sensitive internal fields. Use Pydantic response models.
+
+### SEC-029: File Upload Validation
+**Severity:** Error
+
+Validate uploaded files by extension AND content type. Limit file size.
+
+### SEC-030: Backup Encryption
+**Severity:** Info
+
+Database backups should be encrypted.
+
+---
+
+## API Security Rules (SEC-031 to SEC-040)
+
+### SEC-031: CORS Origin Validation
+**Severity:** Error
+
+CORS must not allow all origins in production. Specify allowed origins explicitly.
+
+```python
+# Bad
+allow_origins=["*"]
+
+# Good
+allow_origins=["https://example.com", "https://api.example.com"]
+```
+
+### SEC-032: Rate Limiting on Sensitive Endpoints
+**Severity:** Warning
+
+Auth, password reset, and payment endpoints need rate limiting.
+
+### SEC-033: Security Headers
+**Severity:** Warning
+
+Configure security headers like X-Content-Type-Options, X-Frame-Options, Content-Security-Policy.
+
+### SEC-034: HTTPS Enforcement
+**Severity:** Error
+
+External URLs must use HTTPS. HTTP is only acceptable for localhost.
+
+### SEC-035: Request Size Limits
+**Severity:** Warning
+
+Limit request body size to prevent DoS attacks.
+
+### SEC-036: Input Validation with Pydantic
+**Severity:** Warning
+
+All API inputs should be validated using Pydantic models.
+
+### SEC-037: API Versioning
+**Severity:** Info
+
+APIs should be versioned for security update isolation.
+
+### SEC-038: Method Restrictions
+**Severity:** Warning
+
+Endpoints should only allow necessary HTTP methods.
+
+### SEC-039: Authentication Bypass Prevention
+**Severity:** Error
+
+Ensure authentication cannot be bypassed.
+
+### SEC-040: Timeout Configuration
+**Severity:** Warning
+
+All external calls must have timeouts configured.
+
+---
+
+## Cryptography Rules (SEC-041 to SEC-050)
+
+### SEC-041: Strong Hashing Algorithms
+**Severity:** Error
+
+Use bcrypt, argon2, scrypt for passwords. Use SHA-256 or stronger for general hashing. Never use MD5 or SHA1.
+
+```python
+# Bad
+import hashlib
+password_hash = hashlib.md5(password.encode()).hexdigest()
+
+# Good
+import bcrypt
+password_hash = bcrypt.hashpw(password.encode(), bcrypt.gensalt())
+```
+
+### SEC-042: Secure Random Generation
+**Severity:** Error
+
+Use the `secrets` module for security-sensitive randomness. Never use `random` module for tokens or keys.
+
+```python
+# Bad
+import random
+token = ''.join(random.choices(string.ascii_letters, k=32))
+
+# Good
+import secrets
+token = secrets.token_urlsafe(32)
+```
+
+### SEC-043: No Hardcoded Encryption Keys
+**Severity:** Error
+
+Encryption keys must come from environment variables or secret management services.
+
+### SEC-044: Strong Encryption Algorithms
+**Severity:** Error
+
+Use AES-256 or ChaCha20. Never use DES, 3DES, or RC4.
+
+### SEC-045: Proper IV/Nonce Usage
+**Severity:** Error
+
+Encryption IVs and nonces must be randomly generated and unique per encryption.
+
+### SEC-046: TLS Version Requirements
+**Severity:** Warning
+
+Enforce TLS 1.2 or higher. Disable SSLv2, SSLv3, TLS 1.0, TLS 1.1.
+
+### SEC-047: Certificate Verification
+**Severity:** Error
+
+Always verify SSL certificates. Never disable verification in production.
+
+```python
+# Bad
+requests.get(url, verify=False)
+
+# Good
+requests.get(url, verify=True)
+```
+
+### SEC-048: Key Derivation for Passwords
+**Severity:** Warning
+
+When deriving encryption keys from passwords, use PBKDF2 with 100K+ iterations, Argon2, or scrypt.
+
+### SEC-049: Secure Key Storage
+**Severity:** Info
+
+Encryption keys should be stored in environment variables, secret management, or HSMs.
+
+### SEC-050: Key Rotation Support
+**Severity:** Info
+
+Implement key rotation with multiple key versions.
+
+---
+
+## Audit & Logging Rules (SEC-051 to SEC-060)
+
+### SEC-051: Authentication Event Logging
+**Severity:** Warning
+
+Log authentication events including successful logins, failed attempts, logouts, and password changes.
+
+### SEC-052: Admin Action Audit Trail
+**Severity:** Warning
+
+All admin operations should be logged with admin user ID, action performed, target resource, timestamp, and IP address.
+
+### SEC-053: Data Modification Logging
+**Severity:** Info
+
+Log create/update/delete on sensitive data like user accounts, roles, financial transactions, and configuration changes.
+
+### SEC-054: Security Event Logging
+**Severity:** Warning
+
+Log security-relevant events like authorization failures, input validation failures, rate limit triggers, and suspicious activity.
+
+### SEC-055: Log Injection Prevention
+**Severity:** Warning
+
+Sanitize user input before logging. Newlines and control characters can corrupt logs.
+
+```python
+# Bad
+logger.info(f"User search: {request.query}")
+
+# Good
+logger.info(f"User search: {request.query!r}") # repr escapes
+```
+
+### SEC-056: Centralized Logging
+**Severity:** Info
+
+Use centralized logging for correlation across services, tamper-evident storage, and alerting.
+
+### SEC-057: Log Level Appropriateness
+**Severity:** Info
+
+Use appropriate log levels: ERROR for security failures, WARNING for suspicious activity, INFO for successful events.
+
+### SEC-058: Structured Logging Format
+**Severity:** Info
+
+Use structured logging (JSON) for easy parsing and searchability.
+
+### SEC-059: Audit Log Integrity
+**Severity:** Info
+
+Protect audit logs from tampering with append-only storage and cryptographic chaining.
+
+### SEC-060: Privacy-Aware Logging
+**Severity:** Warning
+
+Comply with data protection regulations. No PII in logs without consent.
+
+---
+
+## Configuration
+
+All rules are defined in `.security-rules/` directory:
+
+```
+.security-rules/
+├── _main.yaml # Main configuration
+├── authentication.yaml # SEC-001 to SEC-010
+├── injection.yaml # SEC-011 to SEC-020
+├── data_protection.yaml # SEC-021 to SEC-030
+├── api_security.yaml # SEC-031 to SEC-040
+├── cryptography.yaml # SEC-041 to SEC-050
+└── audit.yaml # SEC-051 to SEC-060
+```
+
+## Suppressing Rules
+
+Use noqa comments to suppress specific rules:
+
+```python
+# noqa: SEC-001 - This is a test file with intentional test credentials
+test_password = "test123"
+```
+
+## Related Documentation
+
+- [Architecture Rules](architecture-rules.md)
+- [Performance Rules](performance-rules.md)
+- [Code Quality Guide](code-quality.md)
+- [Contributing Guide](contributing.md)
+
+---
+
+## Summary Statistics
+
+| Category | Rules | Errors | Warnings | Info |
+|----------|-------|--------|----------|------|
+| Authentication | 10 | 4 | 5 | 1 |
+| Injection Prevention | 10 | 9 | 1 | 0 |
+| Data Protection | 10 | 5 | 2 | 3 |
+| API Security | 10 | 3 | 6 | 1 |
+| Cryptography | 10 | 6 | 2 | 2 |
+| Audit & Logging | 10 | 0 | 5 | 5 |
+| **Total** | **60** | **27** | **21** | **12** |
+
+---
+
+**Last Updated:** 2025-12-21
+**Version:** 1.0
diff --git a/mkdocs.yml b/mkdocs.yml
index 29202247..d62646ab 100644
--- a/mkdocs.yml
+++ b/mkdocs.yml
@@ -40,6 +40,7 @@ nav:
- Authentication & RBAC: architecture/auth-rbac.md
- Frontend Structure: architecture/frontend-structure.md
- Models Structure: architecture/models-structure.md
+ - Background Tasks: architecture/background-tasks.md
- API Consolidation:
- Proposal: architecture/api-consolidation-proposal.md
- Migration Status: architecture/api-migration-status.md
@@ -117,6 +118,8 @@ nav:
- Contributing Guide: development/contributing.md
- Code Quality: development/code-quality.md
- Architecture Rules: development/architecture-rules.md
+ - Security Rules: development/security-rules.md
+ - Performance Rules: development/performance-rules.md
- Code Quality Dashboard: development/code-quality-dashboard-implementation.md
- Icons Guide: development/icons-guide.md
- Naming Conventions: development/naming-conventions.md
diff --git a/models/schema/stats.py b/models/schema/stats.py
index 182f2dbb..8f7e6e5a 100644
--- a/models/schema/stats.py
+++ b/models/schema/stats.py
@@ -243,15 +243,29 @@ class VendorAnalyticsResponse(BaseModel):
# ============================================================================
+class ValidatorStats(BaseModel):
+ """Statistics for a single validator type."""
+
+ total_violations: int = 0
+ errors: int = 0
+ warnings: int = 0
+ last_scan: str | None = None
+
+
class CodeQualityDashboardStatsResponse(BaseModel):
"""Code quality dashboard statistics response schema.
Used by: GET /api/v1/admin/code-quality/stats
+
+ Supports multiple validator types: architecture, security, performance.
+ When validator_type is specified, returns stats for that type only.
+ When not specified, returns combined stats with per-validator breakdown.
"""
total_violations: int
errors: int
warnings: int
+ info: int = 0
open: int
assigned: int
resolved: int
@@ -263,6 +277,11 @@ class CodeQualityDashboardStatsResponse(BaseModel):
by_module: dict[str, Any] = Field(default_factory=dict)
top_files: list[dict[str, Any]] = Field(default_factory=list)
last_scan: str | None = None
+ validator_type: str | None = None # Set when filtering by type
+ by_validator: dict[str, ValidatorStats] = Field(
+ default_factory=dict,
+ description="Per-validator breakdown (architecture, security, performance)",
+ )
# ============================================================================
diff --git a/scripts/base_validator.py b/scripts/base_validator.py
new file mode 100755
index 00000000..ce2d4224
--- /dev/null
+++ b/scripts/base_validator.py
@@ -0,0 +1,465 @@
+#!/usr/bin/env python3
+"""
+Base Validator
+==============
+Shared base class for all validation scripts (architecture, security, performance).
+
+Provides common functionality for:
+- Loading YAML configuration
+- File pattern matching
+- Violation tracking
+- Output formatting (human-readable and JSON)
+"""
+
+import json
+import re
+import sys
+from abc import ABC, abstractmethod
+from dataclasses import dataclass, field
+from enum import Enum
+from pathlib import Path
+from typing import Any
+
+import yaml
+
+
+class Severity(Enum):
+ """Validation severity levels"""
+
+ ERROR = "error"
+ WARNING = "warning"
+ INFO = "info"
+
+
+@dataclass
+class Violation:
+ """Represents a rule violation"""
+
+ rule_id: str
+ rule_name: str
+ severity: Severity
+ file_path: Path
+ line_number: int
+ message: str
+ context: str = ""
+ suggestion: str = ""
+
+
+@dataclass
+class FileResult:
+ """Results for a single file validation"""
+
+ file_path: Path
+ errors: int = 0
+ warnings: int = 0
+ info: int = 0
+
+ @property
+ def passed(self) -> bool:
+ return self.errors == 0
+
+ @property
+ def status(self) -> str:
+ if self.errors > 0:
+ return "FAILED"
+ if self.warnings > 0:
+ return "PASSED*"
+ return "PASSED"
+
+ @property
+ def status_icon(self) -> str:
+ if self.errors > 0:
+ return "❌"
+ if self.warnings > 0:
+ return "⚠️"
+ return "✅"
+
+
+@dataclass
+class ValidationResult:
+ """Results of validation"""
+
+ violations: list[Violation] = field(default_factory=list)
+ files_checked: int = 0
+ rules_applied: int = 0
+ file_results: list[FileResult] = field(default_factory=list)
+
+ def has_errors(self) -> bool:
+ """Check if there are any error-level violations"""
+ return any(v.severity == Severity.ERROR for v in self.violations)
+
+ def has_warnings(self) -> bool:
+ """Check if there are any warning-level violations"""
+ return any(v.severity == Severity.WARNING for v in self.violations)
+
+ def error_count(self) -> int:
+ return sum(1 for v in self.violations if v.severity == Severity.ERROR)
+
+ def warning_count(self) -> int:
+ return sum(1 for v in self.violations if v.severity == Severity.WARNING)
+
+ def info_count(self) -> int:
+ return sum(1 for v in self.violations if v.severity == Severity.INFO)
+
+
+class BaseValidator(ABC):
+ """Abstract base validator class"""
+
+ # Subclasses should override these
+ VALIDATOR_NAME = "Base Validator"
+ VALIDATOR_EMOJI = "🔍"
+ RULES_DIR_NAME = ".rules"
+ CONFIG_FILE_NAME = ".rules.yaml"
+
+ def __init__(self, config_path: Path = None, verbose: bool = False):
+ """Initialize validator with configuration"""
+ self.project_root = Path.cwd()
+ self.config_path = config_path or self.project_root / self.CONFIG_FILE_NAME
+ self.verbose = verbose
+ self.config = self._load_config()
+ self.result = ValidationResult()
+
+ def _load_config(self) -> dict[str, Any]:
+ """
+ Load validation rules from YAML config.
+
+ Supports two modes:
+ 1. Split directory mode: rules directory with multiple YAML files
+ 2. Single file mode: single YAML file (legacy)
+
+ The split directory mode takes precedence if it exists.
+ """
+ # Check for split directory mode first
+ rules_dir = self.project_root / self.RULES_DIR_NAME
+ if rules_dir.is_dir():
+ return self._load_config_from_directory(rules_dir)
+
+ # Fall back to single file mode
+ if not self.config_path.exists():
+ print(f"❌ Configuration file not found: {self.config_path}")
+ print(f" (Also checked for directory: {rules_dir})")
+ sys.exit(1)
+
+ with open(self.config_path) as f:
+ config = yaml.safe_load(f)
+
+ print(f"📋 Loaded {self.VALIDATOR_NAME}: {config.get('project', 'unknown')}")
+ return config
+
+ def _load_config_from_directory(self, rules_dir: Path) -> dict[str, Any]:
+ """
+ Load and merge configuration from split YAML files in a directory.
+
+ Reads _main.yaml first for base config, then merges all other YAML files.
+ """
+ config: dict[str, Any] = {}
+
+ # Load _main.yaml first (contains project info, principles, ignore patterns)
+ main_file = rules_dir / "_main.yaml"
+ if main_file.exists():
+ with open(main_file) as f:
+ config = yaml.safe_load(f) or {}
+
+ # Load all other YAML files and merge their contents
+ yaml_files = sorted(rules_dir.glob("*.yaml"))
+ for yaml_file in yaml_files:
+ if yaml_file.name == "_main.yaml":
+ continue # Already loaded
+
+ with open(yaml_file) as f:
+ file_config = yaml.safe_load(f) or {}
+
+ # Merge rule sections from this file into main config
+ for key, value in file_config.items():
+ if key.endswith("_rules") and isinstance(value, list):
+ # Merge rule lists
+ if key not in config:
+ config[key] = []
+ config[key].extend(value)
+ elif key not in config:
+ # Add new top-level keys
+ config[key] = value
+
+ print(f"📋 Loaded {self.VALIDATOR_NAME}: {config.get('project', 'unknown')}")
+ print(f" (from {len(yaml_files)} files in {rules_dir.name}/)")
+ return config
+
+ def _should_ignore_file(self, file_path: Path) -> bool:
+ """Check if a file should be ignored based on config patterns"""
+ import fnmatch
+
+ ignore_config = self.config.get("ignore", {})
+ ignore_files = ignore_config.get("files", [])
+
+ # Get relative path for matching
+ try:
+ rel_path = file_path.relative_to(self.project_root)
+ except ValueError:
+ rel_path = file_path
+
+ rel_path_str = str(rel_path)
+
+ for pattern in ignore_files:
+ # Handle glob patterns using fnmatch
+ if "*" in pattern:
+ # fnmatch handles *, **, and ? patterns correctly
+ if fnmatch.fnmatch(rel_path_str, pattern):
+ return True
+ # Also check each path component for patterns like **/.venv/**
+ # This handles cases where the pattern expects any prefix
+ if pattern.startswith("**/"):
+ # Try matching without the **/ prefix (e.g., .venv/** matches .venv/foo)
+                    suffix_pattern = pattern[3:]  # Remove "**/" prefix
+ if fnmatch.fnmatch(rel_path_str, suffix_pattern):
+ return True
+ elif pattern in rel_path_str:
+ return True
+
+ return False
+
+ def _add_violation(
+ self,
+ rule_id: str,
+ rule_name: str,
+ severity: Severity,
+ file_path: Path,
+ line_number: int,
+ message: str,
+ context: str = "",
+ suggestion: str = "",
+ ):
+ """Add a violation to the results"""
+ # Check for inline noqa comment
+ if f"noqa: {rule_id.lower()}" in context.lower():
+ return
+
+ self.result.violations.append(
+ Violation(
+ rule_id=rule_id,
+ rule_name=rule_name,
+ severity=severity,
+ file_path=file_path,
+ line_number=line_number,
+ message=message,
+ context=context,
+ suggestion=suggestion,
+ )
+ )
+
+ def _get_rule(self, rule_id: str) -> dict | None:
+ """Look up a rule by ID across all rule categories"""
+ for key, value in self.config.items():
+ if key.endswith("_rules") and isinstance(value, list):
+ for rule in value:
+ if rule.get("id") == rule_id:
+ return rule
+ return None
+
+ def _check_pattern_in_file(
+ self,
+ file_path: Path,
+ content: str,
+ lines: list[str],
+ pattern: str,
+ rule_id: str,
+ rule_name: str,
+ severity: Severity,
+ message: str,
+ suggestion: str = "",
+ exclude_patterns: list[str] = None,
+ ):
+ """Check for a regex pattern in a file and report violations"""
+ exclude_patterns = exclude_patterns or []
+
+ for i, line in enumerate(lines, 1):
+ if re.search(pattern, line, re.IGNORECASE):
+ # Check exclusions
+ should_exclude = False
+ for exclude in exclude_patterns:
+ if exclude in line:
+ should_exclude = True
+ break
+
+ if not should_exclude:
+ self._add_violation(
+ rule_id=rule_id,
+ rule_name=rule_name,
+ severity=severity,
+ file_path=file_path,
+ line_number=i,
+ message=message,
+ context=line.strip()[:100],
+ suggestion=suggestion,
+ )
+
+ @abstractmethod
+ def validate_all(self, target_path: Path = None) -> ValidationResult:
+ """Validate all files in a directory - must be implemented by subclasses"""
+ pass
+
+ def validate_file(self, file_path: Path, quiet: bool = False) -> ValidationResult:
+ """Validate a single file"""
+ if not file_path.exists():
+ if not quiet:
+ print(f"❌ File not found: {file_path}")
+ return self.result
+
+ if not file_path.is_file():
+ if not quiet:
+ print(f"❌ Not a file: {file_path}")
+ return self.result
+
+ if not quiet:
+ print(f"\n{self.VALIDATOR_EMOJI} Validating single file: {file_path}\n")
+
+ # Resolve file path to absolute
+ file_path = file_path.resolve()
+
+ if self._should_ignore_file(file_path):
+ if not quiet:
+ print("⏭️ File is in ignore list, skipping")
+ return self.result
+
+ self.result.files_checked += 1
+
+ # Track violations before this file
+ violations_before = len(self.result.violations)
+
+ content = file_path.read_text()
+ lines = content.split("\n")
+
+ # Call subclass-specific validation
+ self._validate_file_content(file_path, content, lines)
+
+ # Calculate violations for this file
+ file_violations = self.result.violations[violations_before:]
+ errors = sum(1 for v in file_violations if v.severity == Severity.ERROR)
+ warnings = sum(1 for v in file_violations if v.severity == Severity.WARNING)
+ info = sum(1 for v in file_violations if v.severity == Severity.INFO)
+
+ # Track file result
+ self.result.file_results.append(
+ FileResult(file_path=file_path, errors=errors, warnings=warnings, info=info)
+ )
+
+ return self.result
+
+ @abstractmethod
+ def _validate_file_content(self, file_path: Path, content: str, lines: list[str]):
+ """Validate file content - must be implemented by subclasses"""
+ pass
+
+ def output_results(self, json_output: bool = False, errors_only: bool = False):
+ """Output validation results"""
+ if json_output:
+ self._output_json()
+ else:
+ self._output_human(errors_only)
+
+ def _output_json(self):
+ """Output results as JSON
+
+ Format matches code quality service expectations:
+ - file_path (not file)
+ - line_number (not line)
+ - total_violations count
+ """
+ try:
+ rel_base = self.project_root
+ except Exception:
+ rel_base = Path.cwd()
+
+ def get_relative_path(file_path: Path) -> str:
+ """Get relative path from project root"""
+ try:
+ return str(file_path.relative_to(rel_base))
+ except ValueError:
+ return str(file_path)
+
+ output = {
+ "validator": self.VALIDATOR_NAME,
+ "files_checked": self.result.files_checked,
+ "total_violations": len(self.result.violations),
+ "errors": self.result.error_count(),
+ "warnings": self.result.warning_count(),
+ "info": self.result.info_count(),
+ "violations": [
+ {
+ "rule_id": v.rule_id,
+ "rule_name": v.rule_name,
+ "severity": v.severity.value,
+ "file_path": get_relative_path(v.file_path),
+ "line_number": v.line_number,
+ "message": v.message,
+ "context": v.context,
+ "suggestion": v.suggestion,
+ }
+ for v in self.result.violations
+ ],
+ }
+ print(json.dumps(output, indent=2))
+
+ def _output_human(self, errors_only: bool = False):
+ """Output results in human-readable format"""
+ print("\n" + "=" * 80)
+ print(f"📊 {self.VALIDATOR_NAME.upper()} REPORT")
+ print("=" * 80)
+
+ errors = [v for v in self.result.violations if v.severity == Severity.ERROR]
+ warnings = [v for v in self.result.violations if v.severity == Severity.WARNING]
+ info = [v for v in self.result.violations if v.severity == Severity.INFO]
+
+ print(
+ f"\nFiles checked: {self.result.files_checked}"
+ )
+ print(
+ f"Findings: {len(errors)} errors, {len(warnings)} warnings, {len(info)} info"
+ )
+
+ if errors:
+ print(f"\n\n❌ ERRORS ({len(errors)}):")
+ print("-" * 80)
+ for v in errors:
+ self._print_violation(v)
+
+ if warnings and not errors_only:
+ print(f"\n\n⚠️ WARNINGS ({len(warnings)}):")
+ print("-" * 80)
+ for v in warnings:
+ self._print_violation(v)
+
+ if info and not errors_only:
+ print(f"\nℹ️ INFO ({len(info)}):")
+ print("-" * 80)
+ for v in info:
+ self._print_violation(v)
+
+ print("\n" + "=" * 80)
+ if errors:
+ print("❌ VALIDATION FAILED")
+ elif warnings:
+ print(f"⚠️ VALIDATION PASSED WITH {len(warnings)} WARNING(S)")
+ else:
+ print("✅ VALIDATION PASSED")
+ print("=" * 80)
+
+ def _print_violation(self, v: Violation):
+ """Print a single violation"""
+ try:
+ rel_path = v.file_path.relative_to(self.project_root)
+ except ValueError:
+ rel_path = v.file_path
+
+ print(f"\n [{v.rule_id}] {v.rule_name}")
+ print(f" File: {rel_path}:{v.line_number}")
+ print(f" Issue: {v.message}")
+ if v.context and self.verbose:
+ print(f" Context: {v.context}")
+ if v.suggestion:
+ print(f" 💡 Suggestion: {v.suggestion}")
+
+ def get_exit_code(self) -> int:
+ """Get appropriate exit code based on results"""
+ if self.result.has_errors():
+ return 1
+ return 0
diff --git a/scripts/validate_all.py b/scripts/validate_all.py
new file mode 100755
index 00000000..d864731a
--- /dev/null
+++ b/scripts/validate_all.py
@@ -0,0 +1,218 @@
+#!/usr/bin/env python3
+"""
+Unified Code Validator
+======================
+Runs all validation scripts (architecture, security, performance) in sequence.
+
+This provides a single entry point for comprehensive code validation,
+useful for CI/CD pipelines and pre-commit hooks.
+
+Usage:
+ python scripts/validate_all.py # Run all validators
+ python scripts/validate_all.py --security # Run only security validator
+ python scripts/validate_all.py --performance # Run only performance validator
+ python scripts/validate_all.py --architecture # Run only architecture validator
+ python scripts/validate_all.py -v # Verbose output
+ python scripts/validate_all.py --fail-fast # Stop on first failure
+ python scripts/validate_all.py --json # JSON output
+
+Options:
+ --architecture Run architecture validator
+ --security Run security validator
+ --performance Run performance validator
+ --fail-fast Stop on first validator failure
+ -v, --verbose Show detailed output
+ --errors-only Only show errors
+ --json Output results as JSON
+"""
+
+import argparse
+import json
+import sys
+from pathlib import Path
+
+# Add parent directory to path for imports
+sys.path.insert(0, str(Path(__file__).parent))
+
+from base_validator import Severity
+
+
+def run_architecture_validator(verbose: bool = False) -> tuple[int, dict]:
+ """Run the architecture validator"""
+ try:
+ # Import dynamically to avoid circular imports
+ sys.path.insert(0, str(Path(__file__).parent.parent))
+ from scripts.validate_architecture import ArchitectureValidator
+
+ config_path = Path.cwd() / ".architecture-rules.yaml"
+ validator = ArchitectureValidator(config_path=config_path, verbose=verbose)
+ result = validator.validate_all()
+
+ return (
+ 1 if result.has_errors() else 0,
+ {
+ "name": "Architecture",
+ "files_checked": result.files_checked,
+ "errors": sum(1 for v in result.violations if v.severity.value == "error"),
+ "warnings": sum(1 for v in result.violations if v.severity.value == "warning"),
+ "info": sum(1 for v in result.violations if v.severity.value == "info"),
+ }
+ )
+ except ImportError as e:
+ print(f"⚠️ Architecture validator not available: {e}")
+ return 0, {"name": "Architecture", "skipped": True}
+ except Exception as e:
+ print(f"❌ Architecture validator failed: {e}")
+ return 1, {"name": "Architecture", "error": str(e)}
+
+
+def run_security_validator(verbose: bool = False) -> tuple[int, dict]:
+ """Run the security validator"""
+ try:
+ from validate_security import SecurityValidator
+
+ validator = SecurityValidator(verbose=verbose)
+ result = validator.validate_all()
+
+ return (
+ 1 if result.has_errors() else 0,
+ {
+ "name": "Security",
+ "files_checked": result.files_checked,
+ "errors": result.error_count(),
+ "warnings": result.warning_count(),
+ "info": result.info_count(),
+ }
+ )
+ except ImportError as e:
+ print(f"⚠️ Security validator not available: {e}")
+ return 0, {"name": "Security", "skipped": True}
+ except Exception as e:
+ print(f"❌ Security validator failed: {e}")
+ return 1, {"name": "Security", "error": str(e)}
+
+
+def run_performance_validator(verbose: bool = False) -> tuple[int, dict]:
+ """Run the performance validator"""
+ try:
+ from validate_performance import PerformanceValidator
+
+ validator = PerformanceValidator(verbose=verbose)
+ result = validator.validate_all()
+
+ return (
+ 1 if result.has_errors() else 0,
+ {
+ "name": "Performance",
+ "files_checked": result.files_checked,
+ "errors": result.error_count(),
+ "warnings": result.warning_count(),
+ "info": result.info_count(),
+ }
+ )
+ except ImportError as e:
+ print(f"⚠️ Performance validator not available: {e}")
+ return 0, {"name": "Performance", "skipped": True}
+ except Exception as e:
+ print(f"❌ Performance validator failed: {e}")
+ return 1, {"name": "Performance", "error": str(e)}
+
+
+def print_summary(results: list[dict], json_output: bool = False):
+ """Print validation summary"""
+ if json_output:
+ print(json.dumps({"validators": results}, indent=2))
+ return
+
+ print("\n" + "=" * 80)
+ print("📊 UNIFIED VALIDATION SUMMARY")
+ print("=" * 80)
+
+ total_errors = 0
+ total_warnings = 0
+ total_info = 0
+
+ for result in results:
+ if result.get("skipped"):
+ print(f"\n⏭️ {result['name']}: Skipped")
+ elif result.get("error"):
+ print(f"\n❌ {result['name']}: Error - {result['error']}")
+ else:
+ errors = result.get("errors", 0)
+ warnings = result.get("warnings", 0)
+ info = result.get("info", 0)
+ total_errors += errors
+ total_warnings += warnings
+ total_info += info
+
+ status = "✅" if errors == 0 else "❌"
+ print(f"\n{status} {result['name']}:")
+ print(f" Files: {result.get('files_checked', 0)}")
+ print(f" Errors: {errors}, Warnings: {warnings}, Info: {info}")
+
+ print("\n" + "-" * 80)
+ print(f"TOTAL: {total_errors} errors, {total_warnings} warnings, {total_info} info")
+ print("=" * 80)
+
+ if total_errors > 0:
+ print("❌ VALIDATION FAILED")
+ elif total_warnings > 0:
+ print(f"⚠️ VALIDATION PASSED WITH {total_warnings} WARNING(S)")
+ else:
+ print("✅ VALIDATION PASSED")
+ print("=" * 80)
+
+
+def main():
+ parser = argparse.ArgumentParser(
+ description="Unified code validator - runs architecture, security, and performance checks",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ )
+ parser.add_argument("--architecture", action="store_true", help="Run architecture validator")
+ parser.add_argument("--security", action="store_true", help="Run security validator")
+ parser.add_argument("--performance", action="store_true", help="Run performance validator")
+ parser.add_argument("--fail-fast", action="store_true", help="Stop on first failure")
+ parser.add_argument("-v", "--verbose", action="store_true", help="Verbose output")
+ parser.add_argument("--errors-only", action="store_true", help="Only show errors")
+ parser.add_argument("--json", action="store_true", help="JSON output")
+
+ args = parser.parse_args()
+
+ # If no specific validators selected, run all
+ run_all = not (args.architecture or args.security or args.performance)
+
+ print("\n🔍 UNIFIED CODE VALIDATION")
+ print("=" * 80)
+
+ validators = []
+ if run_all or args.architecture:
+ validators.append(("Architecture", run_architecture_validator))
+ if run_all or args.security:
+ validators.append(("Security", run_security_validator))
+ if run_all or args.performance:
+ validators.append(("Performance", run_performance_validator))
+
+ results = []
+ exit_code = 0
+
+ for name, validator_func in validators:
+ print(f"\n{'=' * 40}")
+ print(f"🔍 Running {name} Validator...")
+ print("=" * 40)
+
+ code, result = validator_func(verbose=args.verbose)
+
+ results.append(result)
+
+ if code != 0:
+ exit_code = 1
+ if args.fail_fast:
+ print(f"\n❌ {name} validator failed. Stopping (--fail-fast)")
+ break
+
+ print_summary(results, json_output=args.json)
+ sys.exit(exit_code)
+
+
+if __name__ == "__main__":
+ main()
diff --git a/scripts/validate_performance.py b/scripts/validate_performance.py
new file mode 100755
index 00000000..d7da7603
--- /dev/null
+++ b/scripts/validate_performance.py
@@ -0,0 +1,648 @@
+#!/usr/bin/env python3
+"""
+Performance Validator
+=====================
+Validates code against performance rules defined in .performance-rules/
+
+This script checks for common performance issues:
+- N+1 query patterns
+- Missing pagination
+- Inefficient database operations
+- Memory management issues
+- Frontend performance anti-patterns
+- Missing timeouts and connection pooling
+
+Usage:
+ python scripts/validate_performance.py # Check all files
+ python scripts/validate_performance.py -d app/services/ # Check specific directory
+ python scripts/validate_performance.py -f app/api/v1/products.py # Check single file
+ python scripts/validate_performance.py -v # Verbose output
+ python scripts/validate_performance.py --json # JSON output
+ python scripts/validate_performance.py --errors-only # Only show errors
+
+Options:
+ -f, --file PATH Validate a single file
+ -d, --folder PATH Validate all files in a directory (recursive)
+ -v, --verbose Show detailed output including context
+ --errors-only Only show errors, suppress warnings and info
+ --json Output results as JSON
+"""
+
+import argparse
+import re
+import sys
+from pathlib import Path
+
+# Add parent directory to path for imports
+sys.path.insert(0, str(Path(__file__).parent))
+
+from base_validator import BaseValidator, Severity, ValidationResult
+
+
+class PerformanceValidator(BaseValidator):
+ """Performance-focused code validator"""
+
+ VALIDATOR_NAME = "Performance Validator"
+ VALIDATOR_EMOJI = "⚡"
+ RULES_DIR_NAME = ".performance-rules"
+ CONFIG_FILE_NAME = ".performance-rules.yaml"
+
+ def validate_all(self, target_path: Path = None) -> ValidationResult:
+ """Validate all files for performance issues"""
+ print(f"\n{self.VALIDATOR_EMOJI} Starting performance validation...\n")
+
+ target = target_path or self.project_root
+
+ # Validate Python files
+ self._validate_python_files(target)
+
+ # Validate JavaScript files
+ self._validate_javascript_files(target)
+
+ # Validate HTML templates
+ self._validate_template_files(target)
+
+ return self.result
+
+ def _validate_python_files(self, target: Path):
+ """Validate all Python files for performance issues"""
+ print("🐍 Validating Python files...")
+
+ for py_file in target.rglob("*.py"):
+ if self._should_ignore_file(py_file):
+ continue
+
+ self.result.files_checked += 1
+ content = py_file.read_text(encoding="utf-8", errors="ignore")
+ lines = content.split("\n")
+ self._validate_python_performance(py_file, content, lines)
+
+ def _validate_javascript_files(self, target: Path):
+ """Validate all JavaScript files for performance issues"""
+ print("🟨 Validating JavaScript files...")
+
+ for js_file in target.rglob("*.js"):
+ if self._should_ignore_file(js_file):
+ continue
+
+ self.result.files_checked += 1
+ content = js_file.read_text(encoding="utf-8", errors="ignore")
+ lines = content.split("\n")
+ self._validate_javascript_performance(js_file, content, lines)
+
+ def _validate_template_files(self, target: Path):
+ """Validate all HTML template files for performance issues"""
+ print("📄 Validating template files...")
+
+ for html_file in target.rglob("*.html"):
+ if self._should_ignore_file(html_file):
+ continue
+
+ self.result.files_checked += 1
+ content = html_file.read_text(encoding="utf-8", errors="ignore")
+ lines = content.split("\n")
+ self._validate_template_performance(html_file, content, lines)
+
+ def _validate_file_content(self, file_path: Path, content: str, lines: list[str]):
+ """Validate file content based on file type"""
+ if file_path.suffix == ".py":
+ self._validate_python_performance(file_path, content, lines)
+ elif file_path.suffix == ".js":
+ self._validate_javascript_performance(file_path, content, lines)
+ elif file_path.suffix == ".html":
+ self._validate_template_performance(file_path, content, lines)
+
+ def _validate_python_performance(self, file_path: Path, content: str, lines: list[str]):
+ """Validate Python file for performance issues"""
+ file_path_str = str(file_path)
+
+ # PERF-001: N+1 query detection
+ self._check_n_plus_1_queries(file_path, content, lines)
+
+ # PERF-003: Query result limiting
+ self._check_query_limiting(file_path, content, lines)
+
+ # PERF-006: Bulk operations
+ self._check_bulk_operations(file_path, content, lines)
+
+ # PERF-008: Use EXISTS for existence checks
+ self._check_existence_checks(file_path, content, lines)
+
+ # PERF-009: Batch updates
+ self._check_batch_updates(file_path, content, lines)
+
+ # PERF-026: Pagination for API endpoints
+ if "/api/" in file_path_str:
+ self._check_api_pagination(file_path, content, lines)
+
+ # PERF-037: Parallel async operations
+ self._check_parallel_async(file_path, content, lines)
+
+ # PERF-040: Timeout configuration
+ self._check_timeout_config(file_path, content, lines)
+
+ # PERF-046: Generators for large datasets
+ self._check_generators(file_path, content, lines)
+
+ # PERF-047: Stream file uploads
+ if "upload" in file_path_str.lower() or "file" in file_path_str.lower():
+ self._check_file_streaming(file_path, content, lines)
+
+ # PERF-048: Chunked processing
+ if "import" in file_path_str.lower() or "csv" in file_path_str.lower():
+ self._check_chunked_processing(file_path, content, lines)
+
+ # PERF-049: Context managers for files
+ self._check_context_managers(file_path, content, lines)
+
+ # PERF-051: String concatenation
+ self._check_string_concatenation(file_path, content, lines)
+
+ def _validate_javascript_performance(self, file_path: Path, content: str, lines: list[str]):
+ """Validate JavaScript file for performance issues"""
+ # PERF-056: Debounce search inputs
+ self._check_debounce(file_path, content, lines)
+
+ # PERF-062: Polling intervals
+ self._check_polling_intervals(file_path, content, lines)
+
+ # PERF-064: Layout thrashing
+ self._check_layout_thrashing(file_path, content, lines)
+
+ def _validate_template_performance(self, file_path: Path, content: str, lines: list[str]):
+ """Validate HTML template file for performance issues"""
+ # PERF-058: Image lazy loading
+ self._check_image_lazy_loading(file_path, content, lines)
+
+ # PERF-067: Script defer/async
+ self._check_script_loading(file_path, content, lines)
+
+ # =========================================================================
+ # Database Performance Checks
+ # =========================================================================
+
+ def _check_n_plus_1_queries(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-001: Check for N+1 query patterns"""
+ # Look for patterns like: for item in items: item.relationship.attribute
+ in_for_loop = False
+ for_line_num = 0
+
+ for i, line in enumerate(lines, 1):
+ stripped = line.strip()
+
+ # Track for loops over query results
+ if re.search(r'for\s+\w+\s+in\s+.*\.(all|query)', line):
+ in_for_loop = True
+ for_line_num = i
+ elif in_for_loop and stripped and not stripped.startswith("#"):
+ # Check for relationship access in loop
+ if re.search(r'\.\w+\.\w+', line) and "(" not in line:
+ # Could be accessing a relationship
+ if any(rel in line for rel in [".customer.", ".vendor.", ".order.", ".product.", ".user."]):
+ self._add_violation(
+ rule_id="PERF-001",
+ rule_name="N+1 query detection",
+ severity=Severity.WARNING,
+ file_path=file_path,
+ line_number=i,
+ message="Possible N+1 query - relationship accessed in loop",
+ context=line.strip()[:80],
+ suggestion="Use joinedload() or selectinload() for eager loading",
+ )
+ in_for_loop = False
+
+ # Reset on dedent
+ if in_for_loop and line and not line.startswith(" " * 4) and i > for_line_num + 1:
+ in_for_loop = False
+
+ def _check_query_limiting(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-003: Check for unbounded query results"""
+ for i, line in enumerate(lines, 1):
+ if re.search(r'\.all\(\)', line):
+ # Check if there's a limit or filter before
+ context_start = max(0, i - 5)
+ context_lines = lines[context_start:i]
+ context_text = "\n".join(context_lines)
+
+ if "limit" not in context_text.lower() and "filter" not in context_text.lower():
+ if "# noqa" in line or "# bounded" in line:
+ continue
+ self._add_violation(
+ rule_id="PERF-003",
+ rule_name="Query result limiting",
+ severity=Severity.INFO,
+ file_path=file_path,
+ line_number=i,
+ message="Query may return unbounded results",
+ context=line.strip()[:80],
+ suggestion="Add .limit() or pagination for large tables",
+ )
+
+ def _check_bulk_operations(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-006: Check for individual operations in loops"""
+ in_for_loop = False
+ for_indent = 0
+
+ for i, line in enumerate(lines, 1):
+ stripped = line.strip()
+
+ # Track for loops
+ if re.search(r'for\s+\w+\s+in\s+', line):
+ in_for_loop = True
+ for_indent = len(line) - len(line.lstrip())
+ elif in_for_loop:
+ current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
+
+ if current_indent <= for_indent and stripped:
+ in_for_loop = False
+ elif "db.add(" in line or ".save(" in line:
+ self._add_violation(
+ rule_id="PERF-006",
+ rule_name="Bulk operations for multiple records",
+ severity=Severity.WARNING,
+ file_path=file_path,
+ line_number=i,
+ message="Individual db.add() in loop - consider bulk operations",
+ context=line.strip()[:80],
+ suggestion="Use db.add_all() or bulk_insert_mappings()",
+ )
+
+ def _check_existence_checks(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-008: Check for inefficient existence checks"""
+ patterns = [
+ (r'\.count\(\)\s*>\s*0', "count() > 0"),
+ (r'\.count\(\)\s*>=\s*1', "count() >= 1"),
+ (r'\.count\(\)\s*!=\s*0', "count() != 0"),
+ ]
+
+ for i, line in enumerate(lines, 1):
+ for pattern, issue in patterns:
+ if re.search(pattern, line):
+ self._add_violation(
+ rule_id="PERF-008",
+ rule_name="Use EXISTS for existence checks",
+ severity=Severity.INFO,
+ file_path=file_path,
+ line_number=i,
+ message=f"{issue} scans all rows - use EXISTS instead",
+ context=line.strip()[:80],
+ suggestion="Use db.scalar(exists().where(...)) or .first() is not None",
+ )
+
+ def _check_batch_updates(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-009: Check for updates in loops"""
+ in_for_loop = False
+ for_indent = 0
+ loop_var = ""
+
+ for i, line in enumerate(lines, 1):
+ stripped = line.strip()
+
+ # Track for loops
+ match = re.search(r'for\s+(\w+)\s+in\s+', line)
+ if match:
+ in_for_loop = True
+ for_indent = len(line) - len(line.lstrip())
+ loop_var = match.group(1)
+ elif in_for_loop:
+ current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
+
+ if current_indent <= for_indent and stripped:
+ in_for_loop = False
+ elif loop_var and f"{loop_var}." in line and "=" in line and "==" not in line:
+ # Attribute assignment in loop
+ if "# noqa" not in line:
+ self._add_violation(
+ rule_id="PERF-009",
+ rule_name="Batch updates instead of loops",
+ severity=Severity.INFO,
+ file_path=file_path,
+ line_number=i,
+ message="Individual updates in loop - consider batch update",
+ context=line.strip()[:80],
+ suggestion="Use .update({...}) with filters for batch updates",
+ )
+
+ # =========================================================================
+ # API Performance Checks
+ # =========================================================================
+
+ def _check_api_pagination(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-026: Check for missing pagination in list endpoints"""
+ # Look for GET endpoints that return lists
+ in_endpoint = False
+ endpoint_line = 0
+ has_pagination = False
+
+ for i, line in enumerate(lines, 1):
+ # Track router decorators
+ if re.search(r'@router\.(get|post)', line):
+ in_endpoint = True
+ endpoint_line = i
+ has_pagination = False
+ elif in_endpoint:
+ # Check for pagination parameters
+ if re.search(r'(skip|offset|page|limit)', line):
+ has_pagination = True
+ # Check for function end
+ if re.search(r'^(async\s+)?def\s+\w+', line.lstrip()) and i > endpoint_line + 1:
+ in_endpoint = False
+ # Check for .all() without pagination
+ if ".all()" in line and not has_pagination:
+ if "# noqa" not in line:
+ self._add_violation(
+ rule_id="PERF-026",
+ rule_name="Pagination required for list endpoints",
+ severity=Severity.WARNING,
+ file_path=file_path,
+ line_number=i,
+ message="List endpoint may lack pagination",
+ context=line.strip()[:80],
+ suggestion="Add skip/limit parameters for pagination",
+ )
+
+ # =========================================================================
+ # Async Performance Checks
+ # =========================================================================
+
+ def _check_parallel_async(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-037: Check for sequential awaits that could be parallel"""
+ await_count = 0
+ await_lines = []
+
+ for i, line in enumerate(lines, 1):
+ stripped = line.strip()
+
+ if stripped.startswith("await "):
+ await_count += 1
+ await_lines.append(i)
+
+ # Check for 3+ sequential awaits
+ if await_count >= 3:
+ # Verify they're sequential (within 5 lines of each other)
+ if all(await_lines[j+1] - await_lines[j] <= 2 for j in range(len(await_lines)-1)):
+ self._add_violation(
+ rule_id="PERF-037",
+ rule_name="Parallel independent operations",
+ severity=Severity.INFO,
+ file_path=file_path,
+ line_number=await_lines[0],
+ message=f"{await_count} sequential awaits - consider asyncio.gather()",
+ context="Multiple await statements",
+ suggestion="Use asyncio.gather() for independent async operations",
+ )
+ await_count = 0
+ await_lines = []
+ elif stripped and not stripped.startswith("#"):
+ # Reset on non-await, non-empty line
+ if await_count > 0:
+ await_count = 0
+ await_lines = []
+
+ def _check_timeout_config(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-040: Check for missing timeouts on HTTP clients"""
+ if "requests" not in content and "httpx" not in content and "aiohttp" not in content:
+ return
+
+ patterns = [
+ r'requests\.(get|post|put|delete|patch)\s*\([^)]+\)',
+ r'httpx\.(get|post|put|delete|patch)\s*\([^)]+\)',
+ ]
+
+ for i, line in enumerate(lines, 1):
+ for pattern in patterns:
+ if re.search(pattern, line) and "timeout" not in line:
+ self._add_violation(
+ rule_id="PERF-040",
+ rule_name="Timeout configuration",
+ severity=Severity.WARNING,
+ file_path=file_path,
+ line_number=i,
+ message="HTTP request without timeout",
+ context=line.strip()[:80],
+ suggestion="Add timeout parameter to prevent hanging requests",
+ )
+
+ # =========================================================================
+ # Memory Performance Checks
+ # =========================================================================
+
+ def _check_generators(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-046: Check for loading large datasets into memory"""
+ for i, line in enumerate(lines, 1):
+ # Check for .all() followed by iteration
+ if ".all()" in line:
+ # Look ahead for iteration
+ if i < len(lines):
+ next_lines = "\n".join(lines[i:min(i+3, len(lines))])
+ if "for " in next_lines and "in" in next_lines:
+ if "# noqa" not in line:
+ self._add_violation(
+ rule_id="PERF-046",
+ rule_name="Generators for large datasets",
+ severity=Severity.INFO,
+ file_path=file_path,
+ line_number=i,
+ message=".all() loads everything into memory before iteration",
+ context=line.strip()[:80],
+ suggestion="Use .yield_per(100) for large result sets",
+ )
+
+ def _check_file_streaming(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-047: Check for loading entire files into memory"""
+ for i, line in enumerate(lines, 1):
+ if re.search(r'await\s+\w+\.read\(\)', line) and "chunk" not in line:
+ self._add_violation(
+ rule_id="PERF-047",
+ rule_name="Stream large file uploads",
+ severity=Severity.INFO,
+ file_path=file_path,
+ line_number=i,
+ message="Full file read into memory",
+ context=line.strip()[:80],
+ suggestion="Stream large files: while chunk := await file.read(8192)",
+ )
+
+ def _check_chunked_processing(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-048: Check for chunked processing in imports"""
+ if "chunk" not in content.lower() and "batch" not in content.lower():
+ # Check if file processes multiple records
+ if "for " in content and ("csv" in content.lower() or "import" in content.lower()):
+ self._add_violation(
+ rule_id="PERF-048",
+ rule_name="Chunked processing for imports",
+ severity=Severity.INFO,
+ file_path=file_path,
+ line_number=1,
+ message="Import processing may benefit from chunking",
+ context="File processes multiple records",
+ suggestion="Process in chunks with periodic commits",
+ )
+
+ def _check_context_managers(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-049: Check for file handles without context managers"""
+ for i, line in enumerate(lines, 1):
+ # Check for file open without 'with'
+ if re.search(r'^\s*\w+\s*=\s*open\s*\(', line):
+ if "# noqa" not in line:
+ self._add_violation(
+ rule_id="PERF-049",
+ rule_name="Context managers for resources",
+ severity=Severity.WARNING,
+ file_path=file_path,
+ line_number=i,
+ message="File opened without context manager",
+ context=line.strip()[:80],
+ suggestion="Use 'with open(...) as f:' to ensure cleanup",
+ )
+
+ def _check_string_concatenation(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-051: Check for inefficient string concatenation in loops"""
+ in_for_loop = False
+ for_indent = 0
+
+ for i, line in enumerate(lines, 1):
+ stripped = line.strip()
+
+ if re.search(r'for\s+\w+\s+in\s+', line):
+ in_for_loop = True
+ for_indent = len(line) - len(line.lstrip())
+ elif in_for_loop:
+ current_indent = len(line) - len(line.lstrip()) if line.strip() else for_indent + 4
+
+ if current_indent <= for_indent and stripped:
+ in_for_loop = False
+ elif re.search(r'\w+\s*\+=\s*(["\']|str\s*\()', line):
+ if "# noqa" not in line:
+ self._add_violation(
+ rule_id="PERF-051",
+ rule_name="String concatenation efficiency",
+ severity=Severity.INFO,
+ file_path=file_path,
+ line_number=i,
+ message="String concatenation in loop",
+ context=line.strip()[:80],
+ suggestion="Use ''.join() or StringIO for many concatenations",
+ )
+
+ # =========================================================================
+ # Frontend Performance Checks
+ # =========================================================================
+
+ def _check_debounce(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-056: Check for search inputs without debounce"""
+ for i, line in enumerate(lines, 1):
+ if re.search(r'@(input|keyup)=".*search.*fetch', line, re.IGNORECASE):
+ if "debounce" not in content.lower():
+ self._add_violation(
+ rule_id="PERF-056",
+ rule_name="Debounce search inputs",
+ severity=Severity.WARNING,
+ file_path=file_path,
+ line_number=i,
+ message="Search input triggers API call without debounce",
+ context=line.strip()[:80],
+ suggestion="Add 300-500ms debounce to prevent excessive API calls",
+ )
+
+ def _check_polling_intervals(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-062: Check for too-frequent polling"""
+ for i, line in enumerate(lines, 1):
+ match = re.search(r'setInterval\s*\([^,]+,\s*(\d+)\s*\)', line)
+ if match:
+ interval = int(match.group(1))
+ if interval < 10000: # Less than 10 seconds
+ if "// real-time" not in line and "// noqa" not in line:
+ self._add_violation(
+ rule_id="PERF-062",
+ rule_name="Reasonable polling intervals",
+ severity=Severity.WARNING,
+ file_path=file_path,
+ line_number=i,
+ message=f"Polling interval {interval}ms is very frequent",
+ context=line.strip()[:80],
+ suggestion="Use >= 10 second intervals for non-critical updates",
+ )
+
+ def _check_layout_thrashing(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-064: Check for layout thrashing patterns"""
+ for i, line in enumerate(lines, 1):
+ # Check for read then write patterns
+ if re.search(r'(offsetHeight|offsetWidth|clientHeight|clientWidth)', line):
+ if i < len(lines):
+ next_line = lines[i]
+ if "style" in next_line:
+ self._add_violation(
+ rule_id="PERF-064",
+ rule_name="Avoid layout thrashing",
+ severity=Severity.INFO,
+ file_path=file_path,
+ line_number=i,
+ message="DOM read followed by write can cause layout thrashing",
+ context=line.strip()[:80],
+ suggestion="Batch DOM reads, then batch DOM writes",
+ )
+
+ def _check_image_lazy_loading(self, file_path: Path, content: str, lines: list[str]):
+ """PERF-058: Check for images without lazy loading"""
+ for i, line in enumerate(lines, 1):
+ if re.search(r'