Rate Limiting

API rate limiting implementation using a sliding window algorithm for request throttling and abuse prevention.

Overview

The platform uses an in-memory rate limiter with a sliding window algorithm to protect endpoints from abuse and ensure fair resource usage across all clients.

Features

  • Sliding Window Algorithm: Accurate rate limiting based on request timestamps
  • Per-Client Tracking: Individual limits for each client
  • Automatic Cleanup: Removes old entries to prevent memory leaks
  • Configurable Limits: Set custom limits per endpoint
  • Decorator-Based: Easy integration with FastAPI routes

How It Works

Sliding Window Algorithm

The rate limiter uses a sliding window approach:

  1. Records the timestamp of each request, per client
  2. When a new request arrives, removes timestamps older than the window
  3. Counts the requests remaining in the current window
  4. Allows or denies the request based on the limit

graph LR
    A[New Request] --> B{Check Window}
    B --> C[Remove Old Timestamps]
    C --> D{Count < Limit?}
    D -->|Yes| E[Allow Request]
    D -->|No| F[Reject - 429]
    E --> G[Record Timestamp]
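
The steps above can be sketched as a small class. This is a minimal illustration, not the code in middleware/rate_limiter.py: the class name `SlidingWindowLimiter` and the injectable `clock` parameter are assumptions, chosen so the example is self-contained and easy to test.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Illustrative sliding-window limiter (hypothetical, not the project's class)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock
        # client_id -> timestamps of accepted requests, oldest first
        self._requests: Dict[str, Deque[float]] = defaultdict(deque)

    def allow_request(
        self, client_id: str, max_requests: int, window_seconds: float
    ) -> bool:
        now = self._clock()
        timestamps = self._requests[client_id]

        # Drop timestamps that have fallen out of the window.
        cutoff = now - window_seconds
        while timestamps and timestamps[0] <= cutoff:
            timestamps.popleft()

        # Count what remains and compare it against the limit.
        if len(timestamps) >= max_requests:
            return False

        # Record the accepted request.
        timestamps.append(now)
        return True
```

As in the diagram, only allowed requests are recorded in this sketch, so rejected requests do not extend the time a client stays blocked.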

Example Timeline

For a limit of 10 requests per 60 seconds:

Time:  0s    10s   20s   30s   40s   50s   60s   70s
       |-----|-----|-----|-----|-----|-----|-----|
Req:   1 2 3 4 5 6 7 8 9 10 <-- Window (60s) --> 11
                                                  ^
                                              ALLOWED
                                        (req 1-3 expired)
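
The timeline can be checked with a few lines of arithmetic. The arrival times below are illustrative assumptions (the diagram does not give exact values); they are chosen so that requests 1 to 3 arrive before the 10-second mark.

```python
from typing import List

WINDOW_SECONDS = 60
MAX_REQUESTS = 10

# Assumed arrival times (seconds) for requests 1-10.
accepted = [0, 3, 6, 12, 18, 24, 30, 36, 42, 48]


def in_window(timestamps: List[float], now: float, window: float) -> List[float]:
    """Return the timestamps still inside the window that ends at `now`."""
    return [ts for ts in timestamps if ts > now - window]


def would_allow(timestamps: List[float], now: float) -> bool:
    """True if a request arriving at `now` fits under the limit."""
    return len(in_window(timestamps, now, WINDOW_SECONDS)) < MAX_REQUESTS


print(would_allow(accepted, now=50))  # False: all 10 requests are still in the window
print(would_allow(accepted, now=70))  # True: the requests at 0s, 3s and 6s have expired
```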

Configuration

Global Rate Limiter

A global rate limiter instance is available:

from middleware.rate_limiter import RateLimiter

rate_limiter = RateLimiter()

Rate Limiter Options

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| cleanup_interval | int | 3600 | Seconds between automatic cleanups |

Usage

Using the Decorator

The easiest way to add rate limiting to an endpoint:

from middleware.decorators import rate_limit

@app.post("/api/v1/resource")
@rate_limit(max_requests=10, window_seconds=60)
async def create_resource():
    return {"status": "created"}

Decorator Parameters

| Parameter | Type | Default | Description |
|-----------|------|---------|-------------|
| max_requests | int | 100 | Maximum requests allowed |
| window_seconds | int | 3600 | Time window in seconds (1 hour) |
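
To show how a decorator with these parameters might work internally, here is a rough sketch. It is not the implementation in middleware/decorators.py: the `RateLimitExceeded` exception, the per-decorator `history` store, and the `client_id` keyword argument are all assumptions made for illustration, since the source does not show how the real decorator identifies the client.

```python
import asyncio
import functools
import time
from collections import defaultdict, deque


class RateLimitExceeded(Exception):
    """Hypothetical stand-in for the application's RateLimitException."""

    def __init__(self, retry_after: int) -> None:
        super().__init__("Rate limit exceeded")
        self.retry_after = retry_after


def rate_limit(max_requests: int = 100, window_seconds: int = 3600):
    """Sketch of a decorator using the documented defaults."""
    history = defaultdict(deque)  # client_id -> accepted request times

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, client_id: str = "anonymous", **kwargs):
            now = time.monotonic()
            timestamps = history[client_id]
            # Expire old entries, then enforce the limit.
            while timestamps and timestamps[0] <= now - window_seconds:
                timestamps.popleft()
            if len(timestamps) >= max_requests:
                raise RateLimitExceeded(retry_after=window_seconds)
            timestamps.append(now)
            return await func(*args, **kwargs)

        return wrapper

    return decorator


@rate_limit(max_requests=2, window_seconds=60)
async def create_resource():
    return {"status": "created"}


if __name__ == "__main__":
    print(asyncio.run(create_resource()))
```

Each decorated function gets its own `history` in this sketch, so limits are tracked per endpoint rather than shared across the application.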

Manual Usage

For more control, use the rate limiter directly:

from fastapi import Request
from middleware.rate_limiter import RateLimiter
from app.exceptions import RateLimitException

rate_limiter = RateLimiter()

@app.post("/api/v1/custom")
async def custom_endpoint(request: Request):
    client_id = request.client.host

    if not rate_limiter.allow_request(client_id, max_requests=5, window_seconds=60):
        raise RateLimitException(
            message="Too many requests",
            retry_after=60
        )

    return {"status": "success"}

Client Identification

Current Implementation

By default, the rate limiter uses a single fixed client ID, so every caller shares the same limit:

client_id = "anonymous"  # Basic implementation: all clients share one bucket

Production Recommendations

For production, implement proper client identification:

Option 1: By IP Address

client_id = request.client.host
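
When the application runs behind a reverse proxy or load balancer, `request.client.host` is the proxy's address, so every client would share one limit. The sketch below shows one way to recover the original address from `X-Forwarded-For`. The function name `resolve_client_ip` and the `trusted_proxies` list are hypothetical, and the approach assumes the listed proxies append to that header correctly.

```python
from typing import Iterable, Optional


def resolve_client_ip(
    peer_host: str,
    forwarded_for: Optional[str],
    trusted_proxies: Iterable[str],
) -> str:
    """Pick the client IP, honouring X-Forwarded-For only from trusted proxies."""
    trusted = set(trusted_proxies)

    # An untrusted peer must not be able to choose its own identity
    # by sending the header itself.
    if peer_host not in trusted or not forwarded_for:
        return peer_host

    # Walk the chain right to left and return the first hop we do not control.
    hops = [hop.strip() for hop in forwarded_for.split(",") if hop.strip()]
    for hop in reversed(hops):
        if hop not in trusted:
            return hop
    return peer_host
```

In a route this could be called as `resolve_client_ip(request.client.host, request.headers.get("X-Forwarded-For"), TRUSTED_PROXIES)`, where `TRUSTED_PROXIES` is a configuration value you would define yourself.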

Option 2: By API Key

api_key = request.headers.get("X-API-Key", "anonymous")
client_id = f"apikey:{api_key}"

Option 3: By Authenticated User

if hasattr(request.state, 'user'):
    client_id = f"user:{request.state.user.id}"
else:
    client_id = f"ip:{request.client.host}"

Option 4: Combined Approach

from fastapi import Request

def get_client_id(request: Request) -> str:
    # Prefer authenticated user
    if hasattr(request.state, 'user'):
        return f"user:{request.state.user.id}"

    # Fall back to API key
    api_key = request.headers.get("X-API-Key")
    if api_key:
        return f"key:{api_key}"

    # Last resort: IP address
    return f"ip:{request.client.host}"

Common Rate Limit Configurations

Conservative Limits

For expensive operations or authenticated endpoints:

@rate_limit(max_requests=10, window_seconds=3600)  # 10 per hour
async def expensive_operation():
    pass

Moderate Limits

For standard API operations:

@rate_limit(max_requests=100, window_seconds=3600)  # 100 per hour
async def standard_operation():
    pass

Generous Limits

For read-heavy operations:

@rate_limit(max_requests=1000, window_seconds=3600)  # 1000 per hour
async def read_operation():
    pass

Per-Minute Limits

For real-time operations:

@rate_limit(max_requests=60, window_seconds=60)  # 60 per minute
async def realtime_operation():
    pass

Error Response

When the rate limit is exceeded, clients receive a 429 status code:

{
  "detail": "Rate limit exceeded",
  "status_code": 429,
  "retry_after": 3600,
  "timestamp": "2024-11-16T13:00:00Z",
  "path": "/api/v1/resource"
}
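
A payload of this shape could be produced by a small helper such as the one below. This is a sketch only: the function name `build_rate_limit_response` is hypothetical, and pairing the body with a `Retry-After` header is a suggestion rather than something the source states the platform does.

```python
from datetime import datetime, timezone
from typing import Dict, Optional, Tuple


def build_rate_limit_response(
    path: str, retry_after: int, now: Optional[datetime] = None
) -> Tuple[Dict[str, object], Dict[str, str]]:
    """Return (json_body, headers) for a 429 response."""
    # Normalise to UTC so the trailing "Z" in the timestamp is accurate.
    moment = (now or datetime.now(timezone.utc)).astimezone(timezone.utc)
    body: Dict[str, object] = {
        "detail": "Rate limit exceeded",
        "status_code": 429,
        "retry_after": retry_after,
        "timestamp": moment.strftime("%Y-%m-%dT%H:%M:%SZ"),
        "path": path,
    }
    # Retry-After is the standard HTTP header for telling clients when to retry.
    headers = {"Retry-After": str(retry_after)}
    return body, headers
```

In a FastAPI exception handler, the two return values could be passed to `JSONResponse(status_code=429, content=body, headers=headers)`.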

Response Headers

Consider adding rate limit headers (future enhancement):

X-RateLimit-Limit: 100
X-RateLimit-Remaining: 45
X-RateLimit-Reset: 1700145600
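
If these headers are added, their values can be derived from the timestamps the limiter already stores. The helper below is a sketch under stated assumptions: the name `rate_limit_headers` is hypothetical, timestamps are epoch seconds, and `X-RateLimit-Reset` is interpreted as the epoch second at which the oldest in-window request expires (these headers are a convention, not a formal standard, so other interpretations exist).

```python
import math
from typing import Dict, Sequence


def rate_limit_headers(
    timestamps: Sequence[float], max_requests: int, window_seconds: int, now: float
) -> Dict[str, str]:
    """Build X-RateLimit-* headers from a client's request times (epoch seconds)."""
    active = [ts for ts in timestamps if ts > now - window_seconds]
    remaining = max(0, max_requests - len(active))
    # A slot frees up when the oldest in-window request expires; with no
    # requests recorded, report the current time.
    reset_at = (min(active) + window_seconds) if active else now
    return {
        "X-RateLimit-Limit": str(max_requests),
        "X-RateLimit-Remaining": str(remaining),
        "X-RateLimit-Reset": str(math.ceil(reset_at)),
    }
```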

Memory Management

Automatic Cleanup

The rate limiter automatically cleans up old entries:

# Runs every hour by default
cleanup_interval = 3600  # seconds

Manual Cleanup

Force cleanup if needed:

rate_limiter.cleanup_old_entries()
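
What a cleanup pass does can be illustrated with a standalone function. This is a guess at the general technique, not the body of the project's `cleanup_old_entries` method: the `store` layout (client ID mapped to a deque of timestamps) and the `max_age_seconds` parameter are assumptions.

```python
from collections import deque
from typing import Deque, Dict


def cleanup_old_entries(
    store: Dict[str, Deque[float]], now: float, max_age_seconds: float
) -> int:
    """Drop expired timestamps and empty clients; return how many clients were removed."""
    removed = 0
    # Iterate over a snapshot of the keys because entries are deleted in the loop.
    for client_id in list(store):
        timestamps = store[client_id]
        while timestamps and timestamps[0] <= now - max_age_seconds:
            timestamps.popleft()
        if not timestamps:
            del store[client_id]
            removed += 1
    return removed
```

Removing clients whose deque has become empty is what keeps memory bounded: without it, every client that ever made a request would keep a dictionary entry indefinitely.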

Memory Considerations

For high-traffic applications:

  • Each client uses approximately 8 bytes per request timestamp
  • Example: 1000 clients x 100 requests = approximately 800 KB
  • Consider Redis for distributed rate limiting
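
The estimate above is simple multiplication, shown here as a helper so other traffic profiles can be plugged in. The function name is hypothetical. The 8-byte figure covers only the raw timestamp value; in CPython on a 64-bit build, a float object typically takes 24 bytes plus an 8-byte reference in its container, so real usage is likely several times higher.

```python
def estimate_timestamp_bytes(
    clients: int, requests_per_client: int, bytes_per_timestamp: int = 8
) -> int:
    """Raw storage for timestamps only; container and object overhead are excluded."""
    return clients * requests_per_client * bytes_per_timestamp


print(estimate_timestamp_bytes(1000, 100))      # 800000 bytes, about 800 KB
print(estimate_timestamp_bytes(1000, 100, 32))  # 3200000 bytes with per-object overhead
```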

Advanced Patterns

Different Limits by Role

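from fastapi import Depends
from models.database.user import User

# app, rate_limiter, RateLimitException and get_current_user are assumed
# to be defined elsewhere in the application.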

def get_rate_limit_for_user(user: User) -> tuple[int, int]:
    limits = {
        "admin": (10000, 3600),  # 10k per hour
        "vendor": (1000, 3600),   # 1k per hour
        "customer": (100, 3600),  # 100 per hour
    }
    return limits.get(user.role, (100, 3600))

@app.post("/api/v1/resource")
async def resource_endpoint(
    current_user: User = Depends(get_current_user)
):
    max_req, window = get_rate_limit_for_user(current_user)
    client_id = f"user:{current_user.id}"

    if not rate_limiter.allow_request(client_id, max_req, window):
        raise RateLimitException(retry_after=window)

    return {"status": "success"}

Endpoint-Specific Limits

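from fastapi import Request
from fastapi.responses import JSONResponse

RATE_LIMITS = {
    "/api/v1/auth/login": (5, 300),      # 5 per 5 minutes
    "/api/v1/products": (100, 3600),      # 100 per hour
    "/api/v1/orders": (50, 3600),         # 50 per hour
}

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    if request.url.path in RATE_LIMITS:
        max_req, window = RATE_LIMITS[request.url.path]
        client_id = request.client.host

        if not rate_limiter.allow_request(client_id, max_req, window):
            # Exceptions raised inside HTTP middleware bypass the app's
            # exception handlers, so return the 429 response directly.
            return JSONResponse(
                status_code=429,
                content={"detail": "Rate limit exceeded", "retry_after": window},
                headers={"Retry-After": str(window)},
            )

    return await call_next(request)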

Testing Rate Limits

Unit Tests

import pytest
from middleware.rate_limiter import RateLimiter

def test_rate_limiter_allows_requests_within_limit():
    limiter = RateLimiter()
    client = "test_client"

    # Should allow first 5 requests
    for i in range(5):
        assert limiter.allow_request(client, max_requests=5, window_seconds=60)

    # Should deny 6th request
    assert not limiter.allow_request(client, max_requests=5, window_seconds=60)

Integration Tests

def test_rate_limit_endpoint(client):
    # Make requests up to limit
    for i in range(10):
        response = client.post("/api/v1/resource")
        assert response.status_code == 200

    # Next request should be rate limited
    response = client.post("/api/v1/resource")
    assert response.status_code == 429
    assert "retry_after" in response.json()

Production Considerations

Distributed Rate Limiting

For multi-server deployments, use Redis:

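import uuid
from datetime import datetime, timezone

import redis

class RedisRateLimiter:
    def __init__(self):
        self.redis = redis.Redis(host='localhost', port=6379, db=0)

    def allow_request(self, client_id: str, max_requests: int, window: int) -> bool:
        key = f"ratelimit:{client_id}"
        now = datetime.now(timezone.utc).timestamp()

        # Remove entries that have fallen out of the window
        self.redis.zremrangebyscore(key, 0, now - window)

        # Count requests still in the window
        count = self.redis.zcard(key)

        if count < max_requests:
            # Use a unique member so two requests with the same timestamp
            # are stored separately instead of overwriting each other
            self.redis.zadd(key, {f"{now}:{uuid.uuid4().hex}": now})
            self.redis.expire(key, window)
            return True

        # The check and the add are separate commands, so concurrent requests
        # can briefly exceed the limit; use a Lua script if that matters.
        return False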

Monitoring

Log rate limit violations for monitoring:

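import logging

logger = logging.getLogger(__name__)

@app.middleware("http")
async def rate_limit_monitoring(request: Request, call_next):
    response = await call_next(request)

    # Once an exception handler has turned RateLimitException into a 429,
    # middleware sees the response rather than the exception.
    if response.status_code == 429:
        logger.warning(
            "Rate limit exceeded",
            extra={
                "client": request.client.host,
                "path": request.url.path,
                "user_agent": request.headers.get("user-agent"),
            },
        )

    return response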

API Reference

For detailed implementation, see the RateLimiter class in middleware/rate_limiter.py.