"""
Main proxy rotation implementation.
"""
from __future__ import annotations
import queue
import threading
import time
from typing import TYPE_CHECKING, Any
from urllib.parse import quote
import httpx
from loguru import logger
from proxywhirl.circuit_breaker import CircuitBreaker
from proxywhirl.exceptions import (
ProxyAuthenticationError,
ProxyConnectionError,
ProxyPoolEmptyError,
RequestQueueFullError,
)
from proxywhirl.models import BootstrapConfig, Proxy, ProxyChain, ProxyConfiguration, ProxyPool
from proxywhirl.retry import NonRetryableError, RetryExecutor, RetryMetrics, RetryPolicy
from proxywhirl.rotator._bootstrap import bootstrap_pool_if_empty_sync
from proxywhirl.rotator.base import ProxyRotatorBase
from proxywhirl.rotator.client_pool import (
LRUClientPool, # noqa: F401 - re-export for backward compatibility
)
from proxywhirl.strategies import (
LeastUsedStrategy,
RandomStrategy,
RotationStrategy,
RoundRobinStrategy,
WeightedStrategy,
)
from proxywhirl.utils import mask_proxy_url
if TYPE_CHECKING:
from proxywhirl.rate_limiting import SyncRateLimiter
def _resolve_strategy(strategy: RotationStrategy | str | None) -> RotationStrategy:
"""Resolve a strategy instance from either a strategy object or known string name."""
if strategy is None:
return RoundRobinStrategy()
if not isinstance(strategy, str):
return strategy
from proxywhirl.strategies import (
GeoTargetedStrategy,
PerformanceBasedStrategy,
SessionPersistenceStrategy,
)
strategy_map: dict[str, type[RotationStrategy]] = {
"round-robin": RoundRobinStrategy,
"round_robin": RoundRobinStrategy,
"random": RandomStrategy,
"weighted": WeightedStrategy,
"least-used": LeastUsedStrategy,
"least_used": LeastUsedStrategy,
"performance-based": PerformanceBasedStrategy,
"performance_based": PerformanceBasedStrategy,
"session": SessionPersistenceStrategy,
"session-persistence": SessionPersistenceStrategy,
"session_persistence": SessionPersistenceStrategy,
"geo-targeted": GeoTargetedStrategy,
"geo_targeted": GeoTargetedStrategy,
}
strategy_lower = strategy.lower()
if strategy_lower not in strategy_map:
raise ValueError(f"Unknown strategy: {strategy}. Valid options: {', '.join(strategy_map)}")
return strategy_map[strategy_lower]()
class ProxyWhirl(ProxyRotatorBase):
"""
Main class for proxy rotation with automatic failover.
Provides HTTP methods (GET, POST, PUT, DELETE, PATCH, HEAD, OPTIONS) that
automatically rotate through a pool of proxies, with intelligent failover on connection errors.
Example::
from proxywhirl import ProxyWhirl, Proxy
rotator = ProxyWhirl()
rotator.add_proxy("http://proxy1.example.com:8080")
rotator.add_proxy("http://proxy2.example.com:8080")
response = rotator.get("https://httpbin.org/ip")
print(response.json())
"""
def __init__(
self,
proxies: list[Proxy] | None = None,
strategy: RotationStrategy | str | None = None,
config: ProxyConfiguration | None = None,
retry_policy: RetryPolicy | None = None,
rate_limiter: SyncRateLimiter | None = None,
bootstrap: BootstrapConfig | bool | None = None,
) -> None:
"""
Initialize ProxyWhirl.
Args:
proxies: Initial list of proxies (optional)
strategy: Rotation strategy instance or name string. Supported
names: ``round-robin``, ``random``, ``weighted``,
``least-used``, ``performance-based``, ``session``,
``geo-targeted``. Defaults to RoundRobinStrategy.
config: Configuration settings (default: ProxyConfiguration())
retry_policy: Retry policy configuration (default: RetryPolicy())
rate_limiter: Synchronous rate limiter for controlling request rates (optional)
bootstrap: Bootstrap configuration for lazy proxy fetching.
False disables auto-bootstrap (for manual proxy management).
True or None uses default BootstrapConfig.
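Example (illustrative; the host name is a placeholder)::
>>> rotator = ProxyWhirl(strategy="least-used", bootstrap=False)
>>> rotator.add_proxy("http://proxy1.example.com:8080")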
"""
self.pool = ProxyPool(name="default", proxies=proxies or [])
self.chains: list[ProxyChain] = [] # Track registered proxy chains
self.strategy = _resolve_strategy(strategy)
self.config = config or ProxyConfiguration()
self._client: httpx.Client | None = None
self._client_pool = LRUClientPool(maxsize=100) # LRU cache with max 100 clients
# Retry and circuit breaker components
self.retry_policy = retry_policy or RetryPolicy()
self.circuit_breakers: dict[str, CircuitBreaker] = {}
self.retry_metrics = RetryMetrics()
self.retry_executor = RetryExecutor(
self.retry_policy, self.circuit_breakers, self.retry_metrics
)
# Rate limiting
self.rate_limiter = rate_limiter
# Request queuing (optional, disabled by default)
self._request_queue: queue.Queue[Any] | None = None
if self.config.queue_enabled:
self._request_queue = queue.Queue(maxsize=self.config.queue_size)
logger.info("Request queuing enabled", queue_size=self.config.queue_size)
# Initialize circuit breakers for existing proxies (all start CLOSED per FR-021)
# Use get_all_proxies() for consistency, even though this is during init
for proxy in self.pool.get_all_proxies():
self.circuit_breakers[str(proxy.id)] = CircuitBreaker(proxy_id=str(proxy.id))
# Bootstrap configuration (coerce bool/None to BootstrapConfig)
if bootstrap is False:
self._bootstrap_config = BootstrapConfig(enabled=False)
elif isinstance(bootstrap, BootstrapConfig):
self._bootstrap_config = bootstrap
else:
self._bootstrap_config = BootstrapConfig()
# Initialize strategy lock for atomic strategy swapping
self._strategy_lock = threading.RLock()
self._bootstrap_lock = threading.Lock()
self._bootstrap_attempted = False
self._bootstrap_error_message: str | None = None
# Start periodic metrics aggregation timer (every 5 minutes)
self._aggregation_timer: threading.Timer | None = None
self._start_aggregation_timer()
# Configure logging
if hasattr(logger, "_core") and not logger._core.handlers:
from proxywhirl.utils import configure_logging
configure_logging(
level=self.config.log_level,
format_type=self.config.log_format,
redact_credentials=self.config.log_redact_credentials,
)
def __enter__(self) -> ProxyWhirl:
"""Enter context manager."""
self._client = httpx.Client(
timeout=self.config.timeout,
verify=self.config.verify_ssl,
follow_redirects=self.config.follow_redirects,
limits=httpx.Limits(
max_connections=self.config.pool_connections,
max_keepalive_connections=self.config.pool_max_keepalive,
),
)
return self
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
"""Exit context manager."""
if self._client:
self._client.close()
self._client = None
# Close all pooled clients
self._close_all_clients()
# Cancel aggregation timer
if self._aggregation_timer:
self._aggregation_timer.cancel()
self._aggregation_timer = None
def __del__(self) -> None:
"""Destructor to ensure clients are closed."""
# Only cleanup if initialization completed
if hasattr(self, "_client_pool"):
self._close_all_clients()
# Cancel aggregation timer
if hasattr(self, "_aggregation_timer") and self._aggregation_timer:
self._aggregation_timer.cancel()
def _close_all_clients(self) -> None:
"""Close all pooled clients and clear the pool."""
if hasattr(self, "_client_pool"):
self._client_pool.clear()
def add_proxy(self, proxy: Proxy | str) -> None:
"""
Add a proxy to the pool.
Args:
proxy: Proxy instance or URL string
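Example (illustrative; URLs are placeholders)::
>>> rotator.add_proxy("http://proxy1.example.com:8080")
>>> rotator.add_proxy(Proxy(url="http://proxy2.example.com:8080"))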
"""
if isinstance(proxy, str):
from proxywhirl.utils import create_proxy_from_url
proxy = create_proxy_from_url(proxy)
self.pool.add_proxy(proxy)
# Initialize circuit breaker for new proxy (starts CLOSED per FR-021)
self.circuit_breakers[str(proxy.id)] = CircuitBreaker(proxy_id=str(proxy.id))
# Mask credentials in log output
masked_url = mask_proxy_url(proxy.url)
logger.info(f"Added proxy to pool: {masked_url}", proxy_id=str(proxy.id))
def remove_proxy(self, proxy_id: str) -> None:
"""
Remove a proxy from the pool.
Args:
proxy_id: UUID of proxy to remove
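Example (illustrative; assumes ``proxy`` was previously added)::
>>> rotator.remove_proxy(str(proxy.id))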
"""
from uuid import UUID
# Close and remove the client for this proxy if it exists
self._client_pool.remove(proxy_id)
# Clean up circuit breaker to prevent memory leak
if proxy_id in self.circuit_breakers:
del self.circuit_breakers[proxy_id]
logger.debug(f"Removed circuit breaker for proxy: {proxy_id}")
self.pool.remove_proxy(UUID(proxy_id))
logger.info(f"Removed proxy from pool: {proxy_id}")
def _bootstrap_pool_if_empty(self) -> int:
"""Bootstrap pool from built-in sources when empty."""
return bootstrap_pool_if_empty_sync(
pool=self.pool,
add_proxy=self.add_proxy,
config=self._bootstrap_config,
)
def _ensure_bootstrap_for_empty_pool(self) -> None:
"""Trigger one-time lazy bootstrap before request-time proxy selection."""
if self.pool.size > 0:
return
with self._bootstrap_lock:
if self.pool.size > 0:
return
if self._bootstrap_error_message is not None:
raise ProxyPoolEmptyError(self._bootstrap_error_message)
if self._bootstrap_attempted:
return
self._bootstrap_attempted = True
try:
self._bootstrap_pool_if_empty()
except ProxyPoolEmptyError as exc:
self._bootstrap_error_message = str(exc)
raise
# If pool is still empty after bootstrap (e.g., bootstrap disabled)
if self.pool.size == 0:
if not self._bootstrap_config.enabled:
msg = (
"Proxy pool is empty and auto-bootstrap is disabled. "
"Call .add_proxy() to add proxies manually, or set "
"bootstrap=BootstrapConfig(enabled=True)."
)
else:
msg = "Bootstrap completed but proxy pool is still empty."
self._bootstrap_error_message = msg
raise ProxyPoolEmptyError(msg)
def add_chain(self, chain: ProxyChain) -> None:
"""
Add a proxy chain to the rotator.
This method registers a proxy chain for potential use in routing.
The entry proxy (first proxy in the chain) is added to the pool
for selection by rotation strategies.
Note: Full multi-hop CONNECT tunneling is not yet implemented. Currently
only the entry proxy is used for routing; chain metadata is stored for a
future multi-hop implementation.
Args:
chain: ProxyChain instance to register
Example:
>>> rotator = ProxyWhirl()
>>> chain = ProxyChain(
... proxies=[
... Proxy(url="http://proxy1.com:8080"),
... Proxy(url="http://proxy2.com:8080"),
... ],
... name="my_chain"
... )
>>> rotator.add_chain(chain)
"""
# Store the chain for future reference
self.chains.append(chain)
# Add entry proxy to the pool for selection
entry_proxy = chain.entry_proxy
# Tag the entry proxy to indicate it's part of a chain
if not hasattr(entry_proxy, "tags"):
entry_proxy.tags = set()
entry_proxy.tags.add("chain_entry")
# Store chain metadata in proxy metadata
entry_proxy.metadata["chain_name"] = chain.name
entry_proxy.metadata["chain_length"] = chain.chain_length
entry_proxy.metadata["chain_urls"] = chain.get_chain_urls()
# Add to pool
self.pool.add_proxy(entry_proxy)
# Initialize circuit breaker for the entry proxy
self.circuit_breakers[str(entry_proxy.id)] = CircuitBreaker(proxy_id=str(entry_proxy.id))
logger.info(
"Added proxy chain to rotator",
chain_name=chain.name or "unnamed",
chain_length=chain.chain_length,
entry_proxy=str(entry_proxy.url),
)
def get_chains(self) -> list[ProxyChain]:
"""
Get all registered proxy chains.
Returns:
List of ProxyChain instances
"""
return self.chains.copy()
def remove_chain(self, chain_name: str) -> bool:
"""
Remove a proxy chain by name.
Args:
chain_name: Name of the chain to remove
Returns:
True if chain was found and removed, False otherwise
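Example (illustrative; assumes a chain named "my_chain" was registered)::
>>> rotator.remove_chain("my_chain")
True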
"""
for i, chain in enumerate(self.chains):
if chain.name == chain_name:
# Remove the entry proxy from the pool
entry_proxy_id = str(chain.entry_proxy.id)
try:
self.remove_proxy(entry_proxy_id)
except Exception as e:
logger.warning(f"Could not remove entry proxy: {e}")
# Remove the chain
self.chains.pop(i)
logger.info(f"Removed proxy chain: {chain_name}")
return True
logger.warning(f"Chain not found: {chain_name}")
return False
def set_strategy(self, strategy: RotationStrategy | str, *, atomic: bool = True) -> None:
"""
Hot-swap the rotation strategy without restarting.
This method implements atomic strategy swapping to ensure:
- New requests immediately use the new strategy
- In-flight requests complete with their original strategy
- No requests are dropped during the swap
- Swap completes in <100ms (SC-009)
Args:
strategy: New strategy instance or name string. Supported
names: ``round-robin``, ``random``, ``weighted``,
``least-used``, ``performance-based``, ``session``,
``geo-targeted``.
atomic: If True (default), ensures atomic swap. If False, allows
immediate replacement (faster but may affect in-flight requests)
Example:
>>> rotator = ProxyWhirl(strategy="round-robin")
>>> # ... after some requests ...
>>> rotator.set_strategy("performance-based") # Hot-swap
>>> # New requests now use performance-based strategy
Thread Safety:
Thread-safe via atomic reference swap. Multiple threads can
call this method safely without race conditions.
Performance:
Target: <100ms for hot-swap completion (SC-009)
Typical: <10ms for strategy instance creation and assignment
"""
start_time = time.perf_counter()
# Parse strategy string or use provided instance
new_strategy = _resolve_strategy(strategy) if isinstance(strategy, str) else strategy
# Store old strategy for logging
old_strategy_name = self.strategy.__class__.__name__
new_strategy_name = new_strategy.__class__.__name__
if atomic:
# Use a lock to ensure atomic swap
with self._strategy_lock:
self.strategy = new_strategy
else:
# Direct assignment (faster but less safe)
self.strategy = new_strategy
elapsed_ms = (time.perf_counter() - start_time) * 1000
logger.info(
f"Strategy hot-swapped: {old_strategy_name} -> {new_strategy_name} "
f"(completed in {elapsed_ms:.2f}ms)",
old_strategy=old_strategy_name,
new_strategy=new_strategy_name,
swap_time_ms=elapsed_ms,
)
# Validate SC-009: <100ms hot-swap time
if elapsed_ms >= 100.0:
logger.warning(
f"Hot-swap exceeded target time: {elapsed_ms:.2f}ms (target: <100ms)",
swap_time_ms=elapsed_ms,
)
def get_pool_stats(self) -> dict[str, Any]:
"""
Get statistics about the proxy pool.
Returns:
dict[str, Any]: Pool statistics including total_proxies, healthy_proxies,
unhealthy_proxies, dead_proxies, total_requests, total_successes,
total_failures, and average_success_rate.
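Example (illustrative; counts are placeholders)::
>>> stats = rotator.get_pool_stats()
>>> stats["healthy_proxies"], stats["total_proxies"]
(8, 10)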
"""
from proxywhirl.models import HealthStatus
# Take a snapshot to avoid race conditions during iteration
proxies_snapshot = self.pool.get_all_proxies()
healthy_count = sum(
1
for p in proxies_snapshot
if p.health_status
in (HealthStatus.HEALTHY, HealthStatus.UNKNOWN, HealthStatus.DEGRADED)
)
unhealthy_count = sum(
1 for p in proxies_snapshot if p.health_status == HealthStatus.UNHEALTHY
)
dead_count = sum(1 for p in proxies_snapshot if p.health_status == HealthStatus.DEAD)
total_requests = sum(p.total_requests for p in proxies_snapshot)
total_successes = sum(p.total_successes for p in proxies_snapshot)
total_failures = sum(p.total_failures for p in proxies_snapshot)
# Calculate average success rate
success_rates = [p.success_rate for p in proxies_snapshot if p.total_requests > 0]
avg_success_rate = sum(success_rates) / len(success_rates) if success_rates else 0.0
return {
"total_proxies": self.pool.size,
"healthy_proxies": healthy_count,
"unhealthy_proxies": unhealthy_count,
"dead_proxies": dead_count,
"total_requests": total_requests,
"total_successes": total_successes,
"total_failures": total_failures,
"average_success_rate": avg_success_rate,
}
def get_statistics(self) -> dict[str, Any]:
"""
Get comprehensive statistics including source breakdown (FR-050).
Returns:
dict[str, Any]: All stats from get_pool_stats() plus source_breakdown
mapping source names to proxy counts.
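Example (illustrative; source names and counts are placeholders)::
>>> rotator.get_statistics()["source_breakdown"]
{'manual': 5, 'bootstrap': 20}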
"""
stats = self.get_pool_stats()
stats["source_breakdown"] = self.pool.get_source_breakdown()
return stats
def clear_unhealthy_proxies(self) -> int:
"""
Remove all unhealthy and dead proxies from the pool.
Returns:
Number of proxies removed
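Example (illustrative; the count depends on current pool health)::
>>> removed = rotator.clear_unhealthy_proxies()
>>> print(f"Removed {removed} proxies")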
"""
from proxywhirl.models import HealthStatus
# Take a snapshot to avoid race conditions during iteration
proxies_snapshot = self.pool.get_all_proxies()
# Capture IDs of proxies to be removed before clearing
removed_proxy_ids = [
str(p.id)
for p in proxies_snapshot
if p.health_status in (HealthStatus.DEAD, HealthStatus.UNHEALTHY)
]
removed_count = self.pool.clear_unhealthy()
# Clean up circuit breakers for removed proxies to prevent memory leak
for proxy_id in removed_proxy_ids:
if proxy_id in self.circuit_breakers:
del self.circuit_breakers[proxy_id]
logger.debug(f"Removed circuit breaker for unhealthy proxy: {proxy_id}")
logger.info(
"Cleared unhealthy proxies from pool",
removed_count=removed_count,
remaining_proxies=self.pool.size,
)
return removed_count
def _get_or_create_client(self, proxy: Proxy, proxy_dict: dict[str, str]) -> httpx.Client:
"""
Get or create a pooled httpx.Client for the given proxy.
This method implements connection pooling by maintaining a cache of clients
per proxy. Clients are configured with connection pool settings from the
configuration and are reused across multiple requests to the same proxy.
The pool has a maximum size limit (default: 100). When the limit is reached,
the least recently used client is evicted to prevent unbounded memory growth.
Args:
proxy: Proxy instance to get/create client for
proxy_dict: Proxy dictionary for httpx configuration
Returns:
Configured httpx.Client instance with connection pooling
"""
proxy_id = str(proxy.id)
# Try to get existing client from LRU pool
existing_client = self._client_pool.get(proxy_id)
if existing_client is not None:
return existing_client
# Create new client with connection pooling settings
client = httpx.Client(
proxy=proxy_dict.get("http://"),
timeout=self.config.timeout,
verify=self.config.verify_ssl,
follow_redirects=self.config.follow_redirects,
limits=httpx.Limits(
max_connections=self.config.pool_connections,
max_keepalive_connections=self.config.pool_max_keepalive,
),
)
# Store in LRU pool (automatically evicts LRU if at capacity)
self._client_pool.put(proxy_id, client)
logger.debug(
"Created new pooled client for proxy",
proxy_id=proxy_id,
pool_connections=self.config.pool_connections,
pool_max_keepalive=self.config.pool_max_keepalive,
client_pool_size=len(self._client_pool),
)
return client
def _get_proxy_dict(self, proxy: Proxy) -> dict[str, str]:
"""
Convert proxy to httpx proxy dict format.
Args:
proxy: Proxy to convert
Returns:
dict[str, str]: Proxy URL for httpx with "http://" and "https://" keys.
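Example (illustrative; credentials are placeholders, showing ``@`` encoded as ``%40``)::
>>> rotator._get_proxy_dict(proxy)["http://"]
'http://user%40corp:secret@proxy.example.com:8080'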
"""
url = str(proxy.url)
# Add credentials to URL if present
if proxy.username and proxy.password:
username = proxy.username.get_secret_value()
password = proxy.password.get_secret_value()
# URL-encode credentials to handle special characters like @, :, /, etc.
# Using safe='' ensures all special characters are encoded
username_encoded = quote(username, safe="")
password_encoded = quote(password, safe="")
# Insert URL-encoded credentials into URL
if "://" in url:
protocol, rest = url.split("://", 1)
url = f"{protocol}://{username_encoded}:{password_encoded}@{rest}"
# Return proxy dict for all protocols
return {
"http://": url,
"https://": url,
}
def _make_request(
self,
method: str,
url: str,
retry_policy: RetryPolicy | None = None,
**kwargs: Any,
) -> httpx.Response:
"""
Make HTTP request with automatic proxy rotation, retry, and circuit breakers.
Args:
method: HTTP method
url: URL to request
retry_policy: Optional per-request retry policy override
**kwargs: Additional request parameters
Returns:
HTTP response
Raises:
ProxyPoolEmptyError: If no healthy proxies available
ProxyConnectionError: If all retry attempts fail
RequestQueueFullError: If queue is full and cannot accept request
"""
self._ensure_bootstrap_for_empty_pool()
# Select proxy with circuit breaker filtering
try:
proxy = self._select_proxy_with_circuit_breaker()
except ProxyPoolEmptyError:
logger.error("No healthy proxies available or all circuit breakers open")
raise
# Check rate limit before making request
if self.rate_limiter is not None:
proxy_id = str(proxy.id)
# Check rate limit (synchronous call)
allowed = self.rate_limiter.check_limit(proxy_id)
if not allowed:
# Mask proxy URL in log output
masked_url = mask_proxy_url(str(proxy.url))
logger.warning(
f"Rate limit exceeded for proxy {proxy_id}",
proxy_id=proxy_id,
proxy_url=masked_url,
)
# If queuing is enabled, try to queue the request
if self.config.queue_enabled and self._request_queue is not None:
return self._queue_request(method, url, proxy, retry_policy, **kwargs)
# Otherwise, raise error
raise ProxyConnectionError(
f"Rate limit exceeded for proxy {proxy_id}. "
"Please wait before making more requests."
)
proxy_dict = self._get_proxy_dict(proxy)
# Mask proxy URL in log output
masked_url = mask_proxy_url(str(proxy.url))
logger.info(
f"Making {method} request to {url}",
proxy_id=str(proxy.id),
proxy_url=masked_url,
)
# Get or create pooled client for this proxy
client = self._get_or_create_client(proxy, proxy_dict)
# Define request function for retry executor
def request_fn() -> httpx.Response:
# Use pooled client (no context manager - client is reused)
response = client.request(method, url, **kwargs)
# Check for authentication errors (401 Unauthorized, 407 Proxy Auth Required)
if response.status_code in (401, 407):
logger.error(
f"Proxy authentication failed: {masked_url}",
proxy_id=str(proxy.id),
status_code=response.status_code,
)
raise ProxyAuthenticationError(
f"Proxy authentication failed ({response.status_code}) for {masked_url}. "
"Please provide valid credentials (username and password)."
)
return response
# Use a temporary retry executor when a per-request policy overrides the global one
if retry_policy is not None:
executor = RetryExecutor(retry_policy, self.circuit_breakers, self.retry_metrics)
else:
executor = self.retry_executor
# Execute with retry
try:
request_start_time = time.time()
response = executor.execute_with_retry(request_fn, proxy, method, url)
response_time_ms = (time.time() - request_start_time) * 1000
# Record success in strategy
self.strategy.record_result(proxy, success=True, response_time_ms=response_time_ms)
logger.info(
f"Request successful: {method} {url}",
proxy_id=str(proxy.id),
status_code=response.status_code,
response_time_ms=response_time_ms,
)
return response
except ProxyAuthenticationError as e:
# Record auth errors as failures
self.strategy.record_result(proxy, success=False, response_time_ms=0.0)
logger.error(
f"Authentication error for proxy {proxy.id}",
proxy_id=str(proxy.id),
error=str(e),
)
# Re-raise auth errors without wrapping
raise
except NonRetryableError as e:
# Check if this wraps a ProxyAuthenticationError
if isinstance(e.__cause__, ProxyAuthenticationError):
# Record auth errors as failures
self.strategy.record_result(proxy, success=False, response_time_ms=0.0)
logger.error(
f"Authentication error for proxy {proxy.id}",
proxy_id=str(proxy.id),
error=str(e.__cause__),
)
# Re-raise the original auth error
raise e.__cause__
# For other non-retryable errors, record failure and convert to ProxyConnectionError
self.strategy.record_result(proxy, success=False, response_time_ms=0.0)
logger.warning(
f"Request failed after retries: {method} {url}",
proxy_id=str(proxy.id),
error=str(e),
)
raise ProxyConnectionError(f"Request failed: {e}") from e
except Exception as e:
# Record failure in strategy
self.strategy.record_result(proxy, success=False, response_time_ms=0.0)
logger.warning(
f"Request failed after retries: {method} {url}",
proxy_id=str(proxy.id),
error=str(e),
)
raise ProxyConnectionError(f"Request failed: {e}") from e
def _queue_request(
self,
method: str,
url: str,
proxy: Proxy,
retry_policy: RetryPolicy | None = None,
**kwargs: Any,
) -> httpx.Response:
"""
Queue a request when rate limited.
Args:
method: HTTP method
url: URL to request
proxy: Proxy to use
retry_policy: Optional retry policy
**kwargs: Additional request parameters
Returns:
HTTP response after queuing
Raises:
RequestQueueFullError: If queue is full
"""
if self._request_queue is None:
raise RuntimeError("Request queue not initialized")
# Check if queue has space (non-blocking check for backpressure)
if self._request_queue.full():
logger.error(
"Request queue is full - backpressure triggered",
queue_size=self.config.queue_size,
current_size=self._request_queue.qsize(),
)
raise RequestQueueFullError(
"Request queue is full. Cannot accept more requests.",
queue_size=self.config.queue_size,
)
# Create request task
request_data = {
"method": method,
"url": url,
"proxy": proxy,
"retry_policy": retry_policy,
"kwargs": kwargs,
}
# Queue the request without blocking (put_nowait raises queue.Full at capacity)
try:
self._request_queue.put_nowait(request_data)
logger.info(
"Request queued",
method=method,
url=url,
queue_size=self._request_queue.qsize(),
proxy_id=str(proxy.id),
)
# Process the queue and execute the request
return self._process_queue_sync()
except queue.Full as e:
logger.error("Queue is full - cannot add request")
raise RequestQueueFullError(
"Queue is full - cannot add request", queue_size=self.config.queue_size
) from e
def _process_queue_sync(self) -> httpx.Response:
"""
Process queued requests synchronously.
Returns:
HTTP response from processed request
Raises:
ProxyConnectionError: If request processing fails
"""
if self._request_queue is None or self._request_queue.empty():
raise RuntimeError("No requests in queue to process")
# Get next request from queue (non-blocking)
try:
request_data = self._request_queue.get_nowait()
except queue.Empty as e:
raise ProxyConnectionError("No requests in queue") from e
# Extract request parameters
method = request_data["method"]
url = request_data["url"]
proxy = request_data["proxy"]
retry_policy = request_data["retry_policy"]
kwargs = request_data["kwargs"]
logger.info(
"Processing queued request",
method=method,
url=url,
remaining_queue_size=self._request_queue.qsize(),
)
# Execute the request using the existing logic
proxy_dict = self._get_proxy_dict(proxy)
client = self._get_or_create_client(proxy, proxy_dict)
masked_url = mask_proxy_url(str(proxy.url))
# Define request function for retry executor
def request_fn() -> httpx.Response:
response = client.request(method, url, **kwargs)
# Check for authentication errors
if response.status_code in (401, 407):
logger.error(
f"Proxy authentication failed: {masked_url}",
proxy_id=str(proxy.id),
status_code=response.status_code,
)
raise ProxyAuthenticationError(
f"Proxy authentication failed ({response.status_code}) for {masked_url}. "
"Please provide valid credentials (username and password)."
)
return response
# Create executor with appropriate policy
if retry_policy is not None:
executor = RetryExecutor(retry_policy, self.circuit_breakers, self.retry_metrics)
else:
executor = self.retry_executor
# Execute with retry
try:
request_start_time = time.time()
response = executor.execute_with_retry(request_fn, proxy, method, url)
response_time_ms = (time.time() - request_start_time) * 1000
# Record success in strategy
self.strategy.record_result(proxy, success=True, response_time_ms=response_time_ms)
logger.info(
f"Queued request successful: {method} {url}",
proxy_id=str(proxy.id),
status_code=response.status_code,
response_time_ms=response_time_ms,
)
return response
except Exception as e:
# Record failure in strategy
self.strategy.record_result(proxy, success=False, response_time_ms=0.0)
logger.warning(
f"Queued request failed: {method} {url}",
proxy_id=str(proxy.id),
error=str(e),
)
raise ProxyConnectionError(f"Queued request failed: {e}") from e
def get(self, url: str, **kwargs: Any) -> httpx.Response:
"""Make GET request."""
return self._make_request("GET", url, **kwargs)
def post(self, url: str, **kwargs: Any) -> httpx.Response:
"""Make POST request."""
return self._make_request("POST", url, **kwargs)
def put(self, url: str, **kwargs: Any) -> httpx.Response:
"""Make PUT request."""
return self._make_request("PUT", url, **kwargs)
def delete(self, url: str, **kwargs: Any) -> httpx.Response:
"""Make DELETE request."""
return self._make_request("DELETE", url, **kwargs)
def patch(self, url: str, **kwargs: Any) -> httpx.Response:
"""Make PATCH request."""
return self._make_request("PATCH", url, **kwargs)
def head(self, url: str, **kwargs: Any) -> httpx.Response:
"""Make HEAD request."""
return self._make_request("HEAD", url, **kwargs)
def options(self, url: str, **kwargs: Any) -> httpx.Response:
"""Make OPTIONS request."""
return self._make_request("OPTIONS", url, **kwargs)
def _start_aggregation_timer(self) -> None:
"""Start periodic metrics aggregation timer (every 5 minutes)."""
def aggregate() -> None:
self.retry_metrics.aggregate_hourly()
self._start_aggregation_timer() # Schedule next aggregation
self._aggregation_timer = threading.Timer(300.0, aggregate) # 5 minutes
self._aggregation_timer.daemon = True
self._aggregation_timer.start()
def get_circuit_breaker_states(self) -> dict[str, CircuitBreaker]:
"""
Get circuit breaker states for all proxies.
Returns:
dict[str, CircuitBreaker]: Mapping of proxy IDs to their circuit breaker instances.
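Example (illustrative)::
>>> for proxy_id, breaker in rotator.get_circuit_breaker_states().items():
...     print(proxy_id, breaker.should_attempt_request())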
"""
return self.circuit_breakers.copy()
def reset_circuit_breaker(self, proxy_id: str) -> None:
"""
Manually reset a circuit breaker to CLOSED state.
Args:
proxy_id: ID of the proxy whose circuit breaker to reset
Raises:
KeyError: If proxy_id not found
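Example (illustrative; assumes ``proxy`` is in the pool)::
>>> rotator.reset_circuit_breaker(str(proxy.id))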
"""
if proxy_id not in self.circuit_breakers:
raise KeyError(f"No circuit breaker found for proxy {proxy_id}")
self.circuit_breakers[proxy_id].reset()
logger.info(f"Circuit breaker manually reset for proxy {proxy_id}")
def get_retry_metrics(self) -> RetryMetrics:
"""
Get retry metrics.
Returns:
RetryMetrics instance with current metrics
"""
return self.retry_metrics
def get_queue_stats(self) -> dict[str, Any]:
"""
Get statistics about the request queue.
Returns:
dict[str, Any]: Queue statistics including enabled, size, max_size,
is_full, and is_empty.
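Example (illustrative; values are placeholders)::
>>> rotator.get_queue_stats()
{'enabled': True, 'size': 3, 'max_size': 100, 'is_full': False, 'is_empty': False}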
"""
if not self.config.queue_enabled or self._request_queue is None:
return {
"enabled": False,
"size": 0,
"max_size": 0,
"is_full": False,
"is_empty": True,
}
return {
"enabled": True,
"size": self._request_queue.qsize(),
"max_size": self.config.queue_size,
"is_full": self._request_queue.full(),
"is_empty": self._request_queue.empty(),
}
def clear_queue(self) -> int:
"""
Clear all pending requests from the queue.
Returns:
Number of requests cleared
Raises:
RuntimeError: If queue is not enabled
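Example (illustrative; requires ``queue_enabled=True`` in the configuration)::
>>> rotator.clear_queue()
3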
"""
if not self.config.queue_enabled or self._request_queue is None:
raise RuntimeError("Request queue is not enabled")
count = 0
while not self._request_queue.empty():
try:
self._request_queue.get_nowait()
count += 1
except queue.Empty:
break
logger.info(f"Cleared {count} requests from queue")
return count
def _select_proxy_with_circuit_breaker(self) -> Proxy:
"""
Select a proxy while respecting circuit breaker states.
Returns:
Selected proxy
Raises:
ProxyPoolEmptyError: If no healthy proxies available or all circuit breakers open
"""
# Check if all circuit breakers are open (FR-019)
# Take a snapshot to avoid race conditions during iteration
proxies_snapshot = self.pool.get_all_proxies()
available_proxies = []
for proxy in proxies_snapshot:
circuit_breaker = self.circuit_breakers.get(str(proxy.id))
if circuit_breaker and circuit_breaker.should_attempt_request():
available_proxies.append(proxy)
if not available_proxies:
logger.error("All circuit breakers are open - no proxies available")
raise ProxyPoolEmptyError(
"503 Service Temporarily Unavailable - All proxies are currently failing. "
"Please wait for circuit breakers to recover."
)
# Create temporary pool with available proxies
temp_pool = ProxyPool(name="temp", proxies=available_proxies)
# Select from available proxies using strategy
return self.strategy.select(temp_pool)