Rate Limiting API Reference¶
Complete reference for ProxyWhirl’s rate limiting system, which provides per-proxy and global rate limits using a token bucket algorithm.
from proxywhirl.rate_limiting import (
RateLimiter, # Legacy (sync, thread-safe with RLock)
AsyncRateLimiter, # For async contexts (asyncio.Lock)
SyncRateLimiter, # For sync contexts (threading.RLock)
RateLimit,
RateLimitEvent,
)
Overview¶
The rate limiting subsystem provides flexible request throttling for proxies with:
Per-proxy limits: Individual rate limits for each proxy
Global limits: Aggregate rate limit across all proxies
Token bucket algorithm: Burst capacity with sustained rate limiting
Three implementations: SyncRateLimiter (sync), AsyncRateLimiter (async), RateLimiter (legacy)
Thread-safe: All implementations use appropriate locking
Built on top of pyrate-limiter for robust, production-ready rate limiting.
Important
There are three rate limiter classes. Use SyncRateLimiter with ProxyWhirl and AsyncRateLimiter with AsyncProxyWhirl. The base RateLimiter class is deprecated but maintained for backwards compatibility.
Core Classes¶
RateLimiter (Legacy)¶
Deprecated since version 0.1.1: Use SyncRateLimiter for synchronous contexts or AsyncRateLimiter for async contexts. This class is maintained for backwards compatibility.
Synchronous rate limiter with per-proxy and global limits using token bucket algorithm. Uses threading.RLock for thread safety.
from proxywhirl.rate_limiting import RateLimiter, RateLimit
limiter = RateLimiter(
global_limit=RateLimit(max_requests=1000, time_window=60)
)
limiter.set_proxy_limit("proxy1", RateLimit(max_requests=10, time_window=1))
# Synchronous check (NOT async)
if limiter.check_limit("proxy1"):
    response = make_request_with_proxy("proxy1")
SyncRateLimiter¶
Synchronous rate limiter for use with ProxyWhirl. Thread-safe via threading.RLock.
from proxywhirl.rate_limiting import SyncRateLimiter, RateLimit
limiter = SyncRateLimiter(
global_limit=RateLimit(max_requests=1000, time_window=60)
)
limiter.set_proxy_limit("proxy1", RateLimit(max_requests=10, time_window=1))
# Synchronous check
if limiter.check_limit("proxy1"):
    response = make_request_with_proxy("proxy1")
AsyncRateLimiter¶
Async rate limiter for use with AsyncProxyWhirl. Uses asyncio.Lock for async safety.
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
limiter = AsyncRateLimiter(
global_limit=RateLimit(max_requests=1000, time_window=60)
)
# Async set_proxy_limit
await limiter.set_proxy_limit("proxy1", RateLimit(max_requests=10, time_window=1))
# Async check
if await limiter.check_limit("proxy1"):
    response = await make_request_with_proxy("proxy1")
Constructor (all classes)¶
All three rate limiter classes share the same constructor signature:
Parameters:
| Parameter | Type | Default | Description |
|---|---|---|---|
| global_limit | RateLimit (optional) | None | Global rate limit across all proxies |
Example:
from proxywhirl.rate_limiting import SyncRateLimiter, AsyncRateLimiter, RateLimit
# No limits
limiter = SyncRateLimiter()
# Global limit only
limiter = SyncRateLimiter(
global_limit=RateLimit(max_requests=1000, time_window=60)
)
# Per-proxy limits added later
limiter.set_proxy_limit("proxy1", RateLimit(max_requests=10, time_window=1))
Methods¶
Note
SyncRateLimiter and RateLimiter methods are synchronous. AsyncRateLimiter methods (set_proxy_limit, check_limit, acquire) are all async.
set_proxy_limit(proxy_id: str, limit: RateLimit) -> None¶
Set rate limit for a specific proxy.
Behavior:
Creates a new token bucket limiter for the proxy
Overwrites existing limit if proxy_id already has one
Thread-safe with lock protection
Logs limit configuration at INFO level
Parameters:
proxy_id (str): Unique proxy identifier
limit (RateLimit): Rate limit configuration
Example:
from proxywhirl.rate_limiting import SyncRateLimiter, RateLimit
limiter = SyncRateLimiter()
# Set different limits for different proxies
limiter.set_proxy_limit(
"premium_proxy",
RateLimit(max_requests=100, time_window=1) # 100 req/sec
)
limiter.set_proxy_limit(
"free_proxy",
RateLimit(max_requests=5, time_window=1) # 5 req/sec
)
# Update existing limit
limiter.set_proxy_limit(
"premium_proxy",
RateLimit(max_requests=200, time_window=1) # Increase to 200 req/sec
)
check_limit(proxy_id: str) -> bool¶
Check if request is allowed for proxy. Async in AsyncRateLimiter, sync in SyncRateLimiter/RateLimiter.
Flow:
Check per-proxy limit (if set)
Check global limit (if set)
Return True if both allow, False if either denies
Behavior:
Consumes a token from the bucket if allowed
Logs warning at WARNING level if rate limit exceeded
Thread-safe with lock protection for limiter access
Non-blocking (returns immediately)
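The flow above can be sketched as follows (an illustrative model only, not ProxyWhirl's actual source; CountingBucket is a toy stand-in for the internal token buckets):

```python
class CountingBucket:
    """Toy stand-in for a token bucket: allows a fixed number of acquisitions."""

    def __init__(self, capacity: int):
        self.remaining = capacity

    def try_acquire(self) -> bool:
        if self.remaining > 0:
            self.remaining -= 1  # consume a token
            return True
        return False


def check_limit(proxy_buckets: dict, global_bucket, proxy_id: str) -> bool:
    """Per-proxy bucket is consulted first, then the global bucket;
    either denial returns False immediately."""
    bucket = proxy_buckets.get(proxy_id)
    if bucket is not None and not bucket.try_acquire():
        return False  # per-proxy limit exceeded
    if global_bucket is not None and not global_bucket.try_acquire():
        return False  # global limit exceeded
    return True


buckets = {"proxy1": CountingBucket(2)}
g = CountingBucket(10)
print([check_limit(buckets, g, "proxy1") for _ in range(3)])  # [True, True, False]
```

Note one consequence of this ordering: when the per-proxy bucket allows a request but the global bucket denies it, the per-proxy token has already been consumed.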
Parameters:
proxy_id (str): Unique proxy identifier
Returns:
True if request is allowed
False if rate limit exceeded (either per-proxy or global)
Logs:
WARNING: “Rate limit exceeded for proxy {proxy_id}” (per-proxy limit)
WARNING: “Global rate limit exceeded” (global limit)
Example (sync):
from proxywhirl.rate_limiting import SyncRateLimiter, RateLimit
limiter = SyncRateLimiter(
global_limit=RateLimit(max_requests=1000, time_window=60)
)
limiter.set_proxy_limit("proxy1", RateLimit(max_requests=10, time_window=1))
# Check before making request
if limiter.check_limit("proxy1"):
    print("Request allowed")
else:
    print("Rate limited, try again later")
Example (async):
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
limiter = AsyncRateLimiter(
global_limit=RateLimit(max_requests=1000, time_window=60)
)
await limiter.set_proxy_limit("proxy1", RateLimit(max_requests=10, time_window=1))
if await limiter.check_limit("proxy1"):
    response = await make_request("proxy1")
acquire(proxy_id: str) -> bool¶
Acquire permission to make a request (alias for check_limit). Async in AsyncRateLimiter, sync in SyncRateLimiter/RateLimiter.
Parameters:
proxy_id (str): Unique proxy identifier
Returns:
True if request is allowed
False if rate limit exceeded
Example:
# Sync
if limiter.acquire("proxy1"):
    response = make_request("proxy1")
# Async
if await async_limiter.acquire("proxy1"):
    response = await make_request("proxy1")
Data Models¶
RateLimit¶
Rate limit configuration for token bucket algorithm.
from proxywhirl.rate_limiting import RateLimit
# Basic rate limit
limit = RateLimit(
max_requests=100,
time_window=60 # 100 requests per 60 seconds
)
# With burst allowance
limit = RateLimit(
max_requests=100,
time_window=60,
burst_allowance=20 # Allow bursts up to 20 requests beyond sustained rate
)
Fields¶
max_requests (int): Maximum requests allowed in time window
time_window (int): Time window in seconds
burst_allowance (int | None): Burst capacity for token bucket (default: None)
Token Bucket Behavior:
Sustained rate: max_requests / time_window requests per second
Burst capacity: Up to burst_allowance additional requests can be consumed rapidly
Refill rate: Tokens refill at the sustained rate
Example Scenarios:
# Scenario 1: Strict 10 req/sec (no bursts)
limit = RateLimit(max_requests=10, time_window=1)
# Allows exactly 10 requests per second
# Scenario 2: 100 req/min with 20-request burst
limit = RateLimit(max_requests=100, time_window=60, burst_allowance=20)
# Sustained: ~1.67 req/sec
# Can burst up to 20 requests immediately, then throttled to sustained rate
# Scenario 3: 1000 req/hour
limit = RateLimit(max_requests=1000, time_window=3600)
# Sustained: ~0.28 req/sec or ~16.67 req/min
RateLimitEvent¶
Rate limit event for logging and monitoring.
from proxywhirl.rate_limiting import RateLimitEvent
from datetime import datetime, timezone
event = RateLimitEvent(
timestamp=datetime.now(timezone.utc),
proxy_id="proxy1",
event_type="throttled", # or "exceeded", "adaptive_change"
details={
"requests_in_window": 10,
"limit": 10,
"time_window": 1
}
)
print(f"Event: {event.event_type} for {event.proxy_id}")
print(f"Details: {event.details}")
Fields¶
timestamp (datetime): When event occurred
proxy_id (str): Proxy identifier
event_type (str): Event type:
"throttled": Request was throttled (rate limit applied)
"exceeded": Rate limit exceeded
"adaptive_change": Rate limit adjusted automatically
details (dict): Event-specific details
Usage:
Log to file/database for analysis
Send to monitoring system (Prometheus, DataDog)
Trigger alerts on excessive throttling
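A minimal sketch of the first option, serializing events as JSON lines for later analysis (SimpleNamespace stands in for RateLimitEvent here; any object with these four attributes serializes the same way):

```python
import json
from datetime import datetime, timezone
from types import SimpleNamespace

# Stand-in for proxywhirl.rate_limiting.RateLimitEvent
event = SimpleNamespace(
    timestamp=datetime.now(timezone.utc),
    proxy_id="proxy1",
    event_type="exceeded",
    details={"limit": 10, "time_window": 1},
)


def event_to_json(event) -> str:
    """Serialize a rate limit event to one JSON line."""
    return json.dumps({
        "timestamp": event.timestamp.isoformat(),
        "proxy_id": event.proxy_id,
        "event_type": event.event_type,
        "details": event.details,
    })


line = event_to_json(event)
# Append to a JSONL file for analysis:
# with open("rate_limit_events.jsonl", "a") as f:
#     f.write(line + "\n")
```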
Usage Examples¶
Basic Rate Limiting¶
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
# Create async limiter
limiter = AsyncRateLimiter()
# Set per-proxy limits
await limiter.set_proxy_limit(
"proxy1",
RateLimit(max_requests=10, time_window=1) # 10 req/sec
)
# Check limit before each request
async def make_safe_request(proxy_id, url):
    if await limiter.check_limit(proxy_id):
        return await httpx_client.get(url, proxy=get_proxy_url(proxy_id))
    else:
        raise RateLimitExceeded(f"Rate limit exceeded for {proxy_id}")

# Use in loop
for i in range(20):
    try:
        response = await make_safe_request("proxy1", "https://api.example.com")
        print(f"Request {i}: {response.status_code}")
    except RateLimitExceeded:
        print(f"Request {i}: Rate limited")
        await asyncio.sleep(0.1)  # Wait before retry
Global + Per-Proxy Limits¶
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
# Global limit: 1000 req/min across all proxies
# Per-proxy limit: 10 req/sec per proxy
limiter = AsyncRateLimiter(
global_limit=RateLimit(max_requests=1000, time_window=60)
)
for proxy_id in ["proxy1", "proxy2", "proxy3"]:
    await limiter.set_proxy_limit(
        proxy_id,
        RateLimit(max_requests=10, time_window=1)
    )
# Request will be limited by whichever constraint is hit first
async def make_request(proxy_id):
    if await limiter.check_limit(proxy_id):
        # Both per-proxy and global limits allow this request
        return await http_client.get("https://api.example.com", proxy=proxy_id)
    else:
        # Either per-proxy or global limit exceeded
        return None
Burst Handling¶
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
# Allow bursts up to 50 requests, then throttle to 10 req/sec
limiter = AsyncRateLimiter()
await limiter.set_proxy_limit(
"proxy1",
RateLimit(
max_requests=10,
time_window=1,
burst_allowance=50 # Can burst 50 requests immediately
)
)
# Burst scenario
results = []
for i in range(100):
    allowed = await limiter.check_limit("proxy1")
    results.append(allowed)
# First ~50 will be True (burst), then throttled to 10/sec
print(f"Allowed: {sum(results)}/100")
# Wait for bucket to refill
await asyncio.sleep(5) # Wait 5 seconds
# Can burst again
allowed = await limiter.check_limit("proxy1")
assert allowed is True
Dynamic Limit Adjustment¶
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
limiter = AsyncRateLimiter()
# Start with conservative limit
await limiter.set_proxy_limit(
"proxy1",
RateLimit(max_requests=5, time_window=1)
)
# Monitor success rate and adjust
async def adaptive_request(proxy_id):
    if await limiter.check_limit(proxy_id):
        try:
            response = await http_client.get("https://api.example.com", proxy=proxy_id)
            # Success - increase limit
            if response.status_code == 200:
                await limiter.set_proxy_limit(
                    proxy_id,
                    RateLimit(max_requests=10, time_window=1)
                )
            return response
        except Exception:
            # Failure - decrease limit
            await limiter.set_proxy_limit(
                proxy_id,
                RateLimit(max_requests=2, time_window=1)
            )
            raise
    else:
        return None
Integration with ProxyWhirl¶
from proxywhirl import ProxyWhirl
from proxywhirl.rate_limiting import SyncRateLimiter, RateLimit
# Create rotator and sync limiter
rotator = ProxyWhirl()
limiter = SyncRateLimiter(
global_limit=RateLimit(max_requests=1000, time_window=60)
)
# Add proxies with individual limits
proxies = [
("proxy1", RateLimit(max_requests=20, time_window=1)),
("proxy2", RateLimit(max_requests=10, time_window=1)),
("proxy3", RateLimit(max_requests=5, time_window=1)),
]
for proxy_id, limit in proxies:
    rotator.add_proxy(f"http://{proxy_id}.example.com:8080")
    limiter.set_proxy_limit(proxy_id, limit)
# Make requests with rate limiting (sync)
def make_request_with_rate_limit(url, max_attempts=5):
    for _ in range(max_attempts):
        # Get proxy from rotator
        proxy = rotator.get_proxy()
        proxy_id = str(proxy.id)
        # Check rate limit (sync - no await needed)
        if limiter.check_limit(proxy_id):
            return rotator.get(url)
        # Rate limited, try a different proxy on the next attempt
    raise Exception(f"All proxies rate limited after {max_attempts} attempts")
# Use in application
response = make_request_with_rate_limit("https://api.example.com")
Monitoring and Logging¶
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit, RateLimitEvent
from datetime import datetime, timezone
from loguru import logger
limiter = AsyncRateLimiter()
await limiter.set_proxy_limit("proxy1", RateLimit(max_requests=10, time_window=1))
# Track rate limit events
events = []
async def make_request_with_monitoring(proxy_id, url):
    allowed = await limiter.check_limit(proxy_id)
    if allowed:
        # Request allowed
        event = RateLimitEvent(
            timestamp=datetime.now(timezone.utc),
            proxy_id=proxy_id,
            event_type="allowed",
            details={"url": url}
        )
        events.append(event)
        logger.info(f"Request allowed for {proxy_id}")
        return await http_client.get(url, proxy=proxy_id)
    else:
        # Rate limited
        event = RateLimitEvent(
            timestamp=datetime.now(timezone.utc),
            proxy_id=proxy_id,
            event_type="exceeded",
            details={"url": url}
        )
        events.append(event)
        logger.warning(f"Rate limit exceeded for {proxy_id}")
        return None
# Analyze events
throttled_count = sum(1 for e in events if e.event_type == "exceeded")
print(f"Throttled requests: {throttled_count}/{len(events)}")
Retry with Backoff¶
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
import asyncio
limiter = AsyncRateLimiter()
await limiter.set_proxy_limit("proxy1", RateLimit(max_requests=10, time_window=1))
async def make_request_with_retry(proxy_id, url, max_retries=3):
    for attempt in range(max_retries):
        if await limiter.check_limit(proxy_id):
            try:
                return await http_client.get(url, proxy=proxy_id)
            except Exception:
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise
        else:
            # Rate limited, wait before retry
            wait_time = 1.0 / 10  # 1 / (requests per second)
            await asyncio.sleep(wait_time)
    raise Exception(f"Failed after {max_retries} retries")
# Use
response = await make_request_with_retry("proxy1", "https://api.example.com")
Thread Safety¶
SyncRateLimiter and RateLimiter are thread-safe using threading.RLock. AsyncRateLimiter uses asyncio.Lock for async safety.
import asyncio
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
limiter = AsyncRateLimiter()
async def main():
    await limiter.set_proxy_limit("proxy1", RateLimit(max_requests=100, time_window=1))

    async def worker(worker_id):
        for i in range(20):
            if await limiter.check_limit("proxy1"):
                print(f"Worker {worker_id}: Request {i} allowed")
            else:
                print(f"Worker {worker_id}: Request {i} rate limited")
            await asyncio.sleep(0.01)

    # Run concurrent workers
    tasks = [worker(i) for i in range(10)]
    await asyncio.gather(*tasks)
asyncio.run(main())
Performance Considerations¶
Token Bucket Algorithm¶
Advantages:
Allows burst traffic up to bucket capacity
Smooth rate limiting over time
Low overhead (O(1) per request)
No memory growth with large request volumes
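A minimal token bucket sketch illustrating these properties (not ProxyWhirl's internal implementation, which delegates to pyrate-limiter): state is two floats per bucket, each check is O(1), and a full bucket permits an immediate burst.

```python
import time


class TokenBucket:
    """Illustrative token bucket.

    capacity -- maximum tokens (burst size)
    rate     -- tokens added per second (sustained rate)
    """

    def __init__(self, capacity: float, rate: float):
        self.capacity = capacity
        self.rate = rate
        self.tokens = capacity          # start full: bursts allowed immediately
        self.last = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill at the sustained rate, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1            # consume one token for this request
            return True
        return False


# 5-token burst, refilling at 1 token/sec
bucket = TokenBucket(capacity=5, rate=1)
print([bucket.allow() for _ in range(6)])  # first 5 True (burst), then False
```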
Tuning:
High-throughput APIs:
# Allow high sustained rate with moderate bursts
RateLimit(max_requests=1000, time_window=1, burst_allowance=200)
Rate-limited APIs:
# Strict limit with small burst
RateLimit(max_requests=10, time_window=1, burst_allowance=2)
Bursty workloads:
# Low sustained rate, high burst capacity
RateLimit(
    max_requests=100,
    time_window=60,      # ~1.67 req/sec sustained
    burst_allowance=50   # Can handle sudden spikes
)
Memory Usage¶
Per-Proxy Limiters:
Each proxy with a limit creates a Limiter instance
Memory: ~1KB per limiter
1000 proxies ≈ 1MB memory overhead
Optimization:
# Option 1: Use global limit only (minimal memory)
limiter = SyncRateLimiter(
global_limit=RateLimit(max_requests=1000, time_window=60)
)
# Option 2: Set limits only for high-traffic proxies
limiter = SyncRateLimiter()
for proxy_id in high_traffic_proxies:
    limiter.set_proxy_limit(proxy_id, RateLimit(max_requests=20, time_window=1))
Concurrency¶
Async/Await (AsyncRateLimiter):
check_limit() and acquire() are async-safe via asyncio.Lock
Lock contention is minimal (only during limiter access)
No blocking I/O
Sync (SyncRateLimiter/RateLimiter):
Thread-safe via threading.RLock
All methods are synchronous (no await needed)
Best Practices:
# Good: Check limit before expensive operation (async)
if await async_limiter.check_limit(proxy_id):
    response = await expensive_api_call(proxy_id)
# Good: Check limit before expensive operation (sync)
if sync_limiter.check_limit(proxy_id):
    response = make_request(proxy_id)
Integration with Monitoring¶
Prometheus Metrics¶
from prometheus_client import Counter, Gauge
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
# Define metrics
rate_limit_allowed = Counter(
'rate_limit_allowed_total',
'Total requests allowed',
['proxy_id']
)
rate_limit_exceeded = Counter(
'rate_limit_exceeded_total',
'Total requests rate limited',
['proxy_id']
)
limiter = AsyncRateLimiter()
async def make_request_with_metrics(proxy_id, url):
    if await limiter.check_limit(proxy_id):
        rate_limit_allowed.labels(proxy_id=proxy_id).inc()
        return await http_client.get(url, proxy=proxy_id)
    else:
        rate_limit_exceeded.labels(proxy_id=proxy_id).inc()
        return None
DataDog Integration¶
from datadog import statsd
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
limiter = AsyncRateLimiter()
async def make_request_with_datadog(proxy_id, url):
    if await limiter.check_limit(proxy_id):
        statsd.increment('rate_limit.allowed', tags=[f'proxy:{proxy_id}'])
        return await http_client.get(url, proxy=proxy_id)
    else:
        statsd.increment('rate_limit.exceeded', tags=[f'proxy:{proxy_id}'])
        return None
Sphinx Integration¶
Add to your conf.py:
extensions = [
'sphinx.ext.autodoc',
'sphinx.ext.napoleon',
'sphinx_autodoc_typehints',
]
autodoc_default_options = {
'members': True,
'undoc-members': True,
'show-inheritance': True,
}
In your RST files:
Rate Limiting API
=================
.. automodule:: proxywhirl.rate_limiting
   :members:
   :undoc-members:
   :show-inheritance:
RateLimiter
-----------
.. autoclass:: proxywhirl.rate_limiting.RateLimiter
   :members:
   :undoc-members:
   :show-inheritance:
Models
------
.. autoclass:: proxywhirl.rate_limiting.RateLimit
   :members:
   :undoc-members:
.. autoclass:: proxywhirl.rate_limiting.RateLimitEvent
   :members:
   :undoc-members:
Common Patterns¶
Fallback to Different Proxy¶
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
limiter = AsyncRateLimiter(global_limit=RateLimit(max_requests=100, time_window=60))
async def make_request_with_fallback(url, proxy_ids):
    for proxy_id in proxy_ids:
        if await limiter.check_limit(proxy_id):
            try:
                return await http_client.get(url, proxy=proxy_id)
            except Exception:
                continue
    raise Exception("All proxies rate limited or failed")
# Use
response = await make_request_with_fallback(
"https://api.example.com",
["proxy1", "proxy2", "proxy3"]
)
Batch Requests with Rate Limiting¶
import asyncio
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
limiter = AsyncRateLimiter(global_limit=RateLimit(max_requests=100, time_window=60))
async def batch_request_with_rate_limit(urls, proxy_id, batch_size=10):
    results = []
    for i in range(0, len(urls), batch_size):
        batch = urls[i:i + batch_size]
        # Check if we can make batch_size requests
        allowed_count = 0
        for _ in range(len(batch)):
            if await limiter.check_limit(proxy_id):
                allowed_count += 1
            else:
                break
        # Make allowed requests
        batch_results = await asyncio.gather(*[
            http_client.get(url, proxy=proxy_id)
            for url in batch[:allowed_count]
        ])
        results.extend(batch_results)
        # Wait if rate limited
        if allowed_count < len(batch):
            await asyncio.sleep(1.0)
    return results
Circuit Breaker Integration¶
from proxywhirl.rate_limiting import AsyncRateLimiter, RateLimit
from proxywhirl import CircuitBreaker
limiter = AsyncRateLimiter(global_limit=RateLimit(max_requests=100, time_window=60))
circuit_breakers = {}
async def make_request_with_circuit_breaker(proxy_id, url):
    # Initialize circuit breaker if needed
    if proxy_id not in circuit_breakers:
        circuit_breakers[proxy_id] = CircuitBreaker(
            proxy_id=proxy_id,
            failure_threshold=5,
            window_duration=60.0,
            timeout_duration=30.0
        )
    breaker = circuit_breakers[proxy_id]
    # Check circuit breaker
    if not breaker.should_attempt_request():
        raise Exception(f"Circuit breaker open for {proxy_id}")
    # Check rate limit
    if not await limiter.check_limit(proxy_id):
        raise Exception(f"Rate limit exceeded for {proxy_id}")
    # Make request
    try:
        response = await http_client.get(url, proxy=proxy_id)
        breaker.record_success()
        return response
    except Exception:
        breaker.record_failure()
        raise
FAQ¶
Q: What’s the difference between check_limit() and acquire()?
A: They are identical. acquire() is an alias for check_limit() for API consistency with other rate limiting libraries.
Q: Can I remove a per-proxy limit?
A: Currently, no. You can set it to a very high value, effectively disabling it:
limiter.set_proxy_limit("proxy1", RateLimit(max_requests=1000000, time_window=1))
Q: Does the limiter persist state across restarts?
A: No, all state is in-memory. Rate limits reset on process restart.
Q: Can I use this with synchronous code?
A: Yes, use SyncRateLimiter which provides a fully synchronous, thread-safe interface:
from proxywhirl.rate_limiting import SyncRateLimiter, RateLimit
limiter = SyncRateLimiter(global_limit=RateLimit(max_requests=100, time_window=60))
if limiter.check_limit("proxy1"):
    # Make synchronous request
    pass
Q: How do I calculate the right rate limit for an API?
A: Check the API’s documentation for rate limits. Common formats:
“100 requests per minute” → RateLimit(max_requests=100, time_window=60)
“10 requests per second” → RateLimit(max_requests=10, time_window=1)
“1000 requests per hour” → RateLimit(max_requests=1000, time_window=3600)
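These conversions can be wrapped in a small helper (hypothetical; rate_limit_kwargs is not part of the ProxyWhirl API):

```python
# Hypothetical helper, not part of ProxyWhirl: maps a documented limit such as
# "100 requests per minute" onto RateLimit's max_requests/time_window fields.
PERIOD_SECONDS = {"second": 1, "minute": 60, "hour": 3600, "day": 86400}


def rate_limit_kwargs(max_requests: int, per: str) -> dict:
    """Return keyword arguments for RateLimit from a period name."""
    return {"max_requests": max_requests, "time_window": PERIOD_SECONDS[per]}


print(rate_limit_kwargs(100, "minute"))  # {'max_requests': 100, 'time_window': 60}
```

Then RateLimit(**rate_limit_kwargs(100, "minute")) builds the 100-requests-per-minute limit from the first bullet.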
Q: Can I set different limits for different endpoints?
A: Not directly. Create separate RateLimiter instances or use proxy_id to encode endpoint information:
limiter.set_proxy_limit("proxy1_api_v1", RateLimit(max_requests=10, time_window=1))
limiter.set_proxy_limit("proxy1_api_v2", RateLimit(max_requests=20, time_window=1))
See Also¶
Python API – Main ProxyWhirl and AsyncProxyWhirl API
REST API – REST API rate limiting configuration
Configuration – TOML configuration for rate limits
Retry & Failover – Retry and circuit breaker integration
Async Client – Async client patterns with rate limiting
Deployment Security – Production rate limiting setup