Source code for proxywhirl.rotator.async_

"""
Async proxy rotation implementation using httpx.AsyncClient.
"""

from __future__ import annotations

import asyncio
import threading
from collections import OrderedDict
from typing import Any
from urllib.parse import quote

import httpx
from loguru import logger

from proxywhirl.circuit_breaker import CircuitBreaker
from proxywhirl.exceptions import (
    ProxyAuthenticationError,
    ProxyConnectionError,
    ProxyPoolEmptyError,
)
from proxywhirl.models import BootstrapConfig, Proxy, ProxyConfiguration, ProxyPool
from proxywhirl.retry import RetryExecutor, RetryMetrics, RetryPolicy
from proxywhirl.rotator._bootstrap import bootstrap_pool_if_empty_async
from proxywhirl.rotator.base import ProxyRotatorBase
from proxywhirl.strategies import (
    LeastUsedStrategy,
    RandomStrategy,
    RotationStrategy,
    RoundRobinStrategy,
    WeightedStrategy,
)
from proxywhirl.utils import mask_proxy_url


def _resolve_strategy(strategy: RotationStrategy | str | None) -> RotationStrategy:
    """Resolve a strategy instance from either a strategy object or known string name."""
    if strategy is None:
        return RoundRobinStrategy()

    if not isinstance(strategy, str):
        return strategy

    from proxywhirl.strategies import (
        GeoTargetedStrategy,
        PerformanceBasedStrategy,
        SessionPersistenceStrategy,
    )

    strategy_map: dict[str, type[RotationStrategy]] = {
        "round-robin": RoundRobinStrategy,
        "round_robin": RoundRobinStrategy,
        "random": RandomStrategy,
        "weighted": WeightedStrategy,
        "least-used": LeastUsedStrategy,
        "least_used": LeastUsedStrategy,
        "performance-based": PerformanceBasedStrategy,
        "performance_based": PerformanceBasedStrategy,
        "session": SessionPersistenceStrategy,
        "session-persistence": SessionPersistenceStrategy,
        "session_persistence": SessionPersistenceStrategy,
        "geo-targeted": GeoTargetedStrategy,
        "geo_targeted": GeoTargetedStrategy,
    }
    strategy_lower = strategy.lower()
    if strategy_lower not in strategy_map:
        raise ValueError(f"Unknown strategy: {strategy}. Valid options: {', '.join(strategy_map)}")
    return strategy_map[strategy_lower]()


[docs] class LRUAsyncClientPool: """ LRU cache for httpx.AsyncClient instances with automatic eviction. When the pool reaches maxsize, the least recently used client is closed and removed to prevent unbounded memory growth. Supports dictionary-like access for backward compatibility with tests. """ def __init__(self, maxsize: int = 100) -> None: """ Initialize LRU async client pool. Args: maxsize: Maximum number of clients to cache (default: 100) """ self._clients: OrderedDict[str, httpx.AsyncClient] = OrderedDict() self._maxsize = maxsize self._lock = asyncio.Lock()
[docs] async def get(self, proxy_id: str) -> httpx.AsyncClient | None: """ Get a client from the pool, marking it as recently used. Args: proxy_id: Proxy ID to look up Returns: Client if found, None otherwise """ async with self._lock: if proxy_id in self._clients: # Move to end (most recently used) self._clients.move_to_end(proxy_id) return self._clients[proxy_id] return None
[docs] async def put(self, proxy_id: str, client: httpx.AsyncClient) -> None: """ Add a client to the pool, evicting LRU client if at capacity. Args: proxy_id: Proxy ID to store under client: Client instance to store """ async with self._lock: if proxy_id in self._clients: # Already exists, move to end self._clients.move_to_end(proxy_id) else: # Check if we need to evict if len(self._clients) >= self._maxsize: # Evict least recently used (first item) lru_proxy_id, lru_client = self._clients.popitem(last=False) try: await lru_client.aclose() logger.debug( "Evicted LRU async client from pool", evicted_proxy_id=lru_proxy_id, pool_size=len(self._clients), ) except Exception as e: logger.warning( f"Error closing evicted async client for proxy {lru_proxy_id}: {e}" ) # Add new client self._clients[proxy_id] = client
[docs] async def remove(self, proxy_id: str) -> None: """ Remove and close a client from the pool. Args: proxy_id: Proxy ID to remove """ async with self._lock: if proxy_id in self._clients: client = self._clients.pop(proxy_id) try: await client.aclose() logger.debug("Removed async client from pool", proxy_id=proxy_id) except Exception as e: logger.warning(f"Error closing async client for proxy {proxy_id}: {e}")
[docs] async def clear(self) -> None: """Close all clients and clear the pool.""" async with self._lock: for proxy_id, client in self._clients.items(): try: await client.aclose() logger.debug("Closed pooled async client", proxy_id=proxy_id) except Exception as e: logger.warning(f"Error closing async client for proxy {proxy_id}: {e}") self._clients.clear()
def __len__(self) -> int: """Return number of clients in pool.""" return len(self._clients) def __contains__(self, proxy_id: str) -> bool: """Check if proxy_id is in pool (supports 'in' operator).""" return proxy_id in self._clients async def __getitem__(self, proxy_id: str) -> httpx.AsyncClient: """Get client by proxy_id (supports dict-like access for tests).""" async with self._lock: return self._clients[proxy_id] async def __setitem__(self, proxy_id: str, client: httpx.AsyncClient) -> None: """Set client for proxy_id (supports dict-like access for tests).""" await self.put(proxy_id, client) async def __delitem__(self, proxy_id: str) -> None: """Delete client for proxy_id (supports dict-like deletion).""" await self.remove(proxy_id)
[docs] class AsyncProxyWhirl(ProxyRotatorBase): """ Async proxy rotator with automatic failover and intelligent rotation. Provides async HTTP methods (GET, POST, PUT, DELETE, PATCH) that automatically rotate through a pool of proxies, with intelligent failover on connection errors. Example:: from proxywhirl import AsyncProxyWhirl, Proxy async with AsyncProxyWhirl() as rotator: await rotator.add_proxy("http://proxy1.example.com:8080") await rotator.add_proxy("http://proxy2.example.com:8080") response = await rotator.get("https://httpbin.org/ip") print(response.json()) """ def __init__( self, proxies: list[Proxy] | None = None, strategy: RotationStrategy | str | None = None, config: ProxyConfiguration | None = None, retry_policy: RetryPolicy | None = None, bootstrap: BootstrapConfig | bool | None = None, ) -> None: """ Initialize AsyncProxyWhirl. Args: proxies: Initial list of proxies (optional) strategy: Rotation strategy instance or name string. Supported names: ``round-robin``, ``random``, ``weighted``, ``least-used``, ``performance-based``, ``session``, ``geo-targeted``. Defaults to RoundRobinStrategy. config: Configuration settings (default: ProxyConfiguration()) retry_policy: Retry policy configuration (default: RetryPolicy()) bootstrap: Bootstrap configuration for lazy proxy fetching. False disables auto-bootstrap (for manual proxy management). True or None uses default BootstrapConfig. """ self.pool = ProxyPool(name="default", proxies=proxies or []) self.strategy = _resolve_strategy(strategy) self.config = config or ProxyConfiguration() self._client: httpx.AsyncClient | None = None self._client_pool = LRUAsyncClientPool(maxsize=100) # LRU cache with max 100 clients # Coerce bool/None to BootstrapConfig if bootstrap is False: self._bootstrap_config = BootstrapConfig(enabled=False) elif isinstance(bootstrap, BootstrapConfig): self._bootstrap_config = bootstrap else: self._bootstrap_config = BootstrapConfig() self._bootstrap_lock = asyncio.Lock() self._bootstrap_attempted = False self._bootstrap_error_message: str | None = None # Retry and circuit breaker components self.retry_policy = retry_policy or RetryPolicy() self.circuit_breakers: dict[str, CircuitBreaker] = {} self.retry_metrics = RetryMetrics() self.retry_executor = RetryExecutor( self.retry_policy, self.circuit_breakers, self.retry_metrics ) # Initialize circuit breakers for existing proxies (all start CLOSED per FR-021) # Use thread-safe snapshot for initialization for proxy in self.pool.get_all_proxies(): self.circuit_breakers[str(proxy.id)] = CircuitBreaker(proxy_id=str(proxy.id)) # Note: Strategy swapping is atomic via Python's reference assignment semantics. # No explicit lock needed as self.strategy = new_strategy is a single atomic operation. # Start periodic metrics aggregation thread (every 5 minutes) self._stop_event = threading.Event() self._aggregation_thread = threading.Thread( target=self._aggregation_loop, daemon=True, name="proxywhirl-metrics-aggregation" ) self._aggregation_thread.start() # Configure logging if hasattr(logger, "_core") and not logger._core.handlers: from proxywhirl.utils import configure_logging configure_logging( level=self.config.log_level, format_type=self.config.log_format, redact_credentials=self.config.log_redact_credentials, ) async def __aenter__(self) -> AsyncProxyWhirl: """Enter async context manager.""" self._client = httpx.AsyncClient( timeout=self.config.timeout, verify=self.config.verify_ssl, follow_redirects=self.config.follow_redirects, limits=httpx.Limits( max_connections=self.config.pool_connections, max_keepalive_connections=self.config.pool_max_keepalive, ), ) return self async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None: """Exit async context manager.""" if self._client: await self._client.aclose() self._client = None # Close all pooled clients await self._close_all_clients() # Stop aggregation thread self._stop_event.set() if self._aggregation_thread and self._aggregation_thread.is_alive(): self._aggregation_thread.join(timeout=5.0) def __del__(self) -> None: """ Destructor to ensure cleanup when not used as context manager. This method provides a safety net for resource cleanup when the object is garbage collected without proper context manager usage. It ensures that the background aggregation thread is stopped to prevent resource leaks. Note: Using AsyncProxyWhirl as a context manager (async with) is strongly recommended for proper async cleanup of httpx clients. """ if hasattr(self, "_stop_event"): self._stop_event.set() if hasattr(self, "_aggregation_thread") and self._aggregation_thread.is_alive(): self._aggregation_thread.join(timeout=1.0) logger.debug( "AsyncProxyWhirl cleanup via __del__ - context manager usage recommended", thread_name=self._aggregation_thread.name, ) async def _close_all_clients(self) -> None: """Close all pooled clients and clear the pool.""" await self._client_pool.clear()
[docs] async def add_proxy(self, proxy: Proxy | str) -> None: """ Add a proxy to the pool. Args: proxy: Proxy instance or URL string """ if isinstance(proxy, str): from proxywhirl.utils import create_proxy_from_url proxy = create_proxy_from_url(proxy) self.pool.add_proxy(proxy) # Initialize circuit breaker for new proxy (starts CLOSED per FR-021) self.circuit_breakers[str(proxy.id)] = CircuitBreaker(proxy_id=str(proxy.id)) masked_url = mask_proxy_url(proxy.url) logger.info(f"Added proxy to pool: {masked_url}", proxy_id=str(proxy.id))
[docs] async def remove_proxy(self, proxy_id: str) -> None: """ Remove a proxy from the pool. Args: proxy_id: UUID of proxy to remove """ from uuid import UUID # Close and remove the client for this proxy if it exists await self._client_pool.remove(proxy_id) # Clean up circuit breaker to prevent memory leak if proxy_id in self.circuit_breakers: del self.circuit_breakers[proxy_id] logger.debug(f"Removed circuit breaker for proxy: {proxy_id}") self.pool.remove_proxy(UUID(proxy_id)) logger.info(f"Removed proxy from pool: {proxy_id}")
async def _bootstrap_pool_if_empty(self) -> int: """Bootstrap pool from built-in sources when empty.""" return await bootstrap_pool_if_empty_async( pool=self.pool, add_proxy=self.add_proxy, config=self._bootstrap_config, ) async def _ensure_request_bootstrap(self) -> None: """Run one-time lazy bootstrap for async request path when pool starts empty.""" if self.pool.size > 0: return async with self._bootstrap_lock: if self.pool.size > 0: return if self._bootstrap_error_message is not None: raise ProxyPoolEmptyError(self._bootstrap_error_message) if self._bootstrap_attempted: return self._bootstrap_attempted = True try: await self._bootstrap_pool_if_empty() except ProxyPoolEmptyError as exc: self._bootstrap_error_message = str(exc) raise # If pool is still empty after bootstrap (e.g., bootstrap disabled) if self.pool.size == 0: if not self._bootstrap_config.enabled: msg = ( "Proxy pool is empty and auto-bootstrap is disabled. " "Call .add_proxy() to add proxies manually, or set " "bootstrap=BootstrapConfig(enabled=True)." ) else: msg = "Bootstrap completed but proxy pool is still empty." self._bootstrap_error_message = msg raise ProxyPoolEmptyError(msg)
[docs] async def get_proxy(self) -> Proxy: """ Get the next proxy from the pool using the rotation strategy. Returns: Next proxy to use Raises: ProxyPoolEmptyError: If no healthy proxies available """ return self._select_proxy_with_circuit_breaker()
[docs] def set_strategy(self, strategy: RotationStrategy | str, *, atomic: bool = True) -> None: """ Hot-swap the rotation strategy without restarting. This method implements atomic strategy swapping to ensure: - New requests immediately use the new strategy - In-flight requests complete with their original strategy - No requests are dropped during the swap - Swap completes in <100ms (SC-009) Args: strategy: New strategy instance or name string. Supported names: ``round-robin``, ``random``, ``weighted``, ``least-used``, ``performance-based``, ``session``, ``geo-targeted``. atomic: If True (default), ensures atomic swap. If False, allows immediate replacement (faster but may affect in-flight requests) Example: >>> async with AsyncProxyWhirl(strategy="round-robin") as rotator: ... # ... after some requests ... ... rotator.set_strategy("performance-based") # Hot-swap ... # New requests now use performance-based strategy Thread Safety: Thread-safe via atomic reference swap. Multiple threads can call this method safely without race conditions. Performance: Target: <100ms for hot-swap completion (SC-009) Typical: <10ms for strategy instance creation and assignment """ import time start_time = time.perf_counter() # Parse strategy string or use provided instance new_strategy = _resolve_strategy(strategy) if isinstance(strategy, str) else strategy # Store old strategy for logging old_strategy_name = self.strategy.__class__.__name__ new_strategy_name = new_strategy.__class__.__name__ # Python's reference assignment is atomic at the bytecode level, # so no explicit locking is needed for thread-safe strategy swap. # The 'atomic' parameter is kept for API compatibility but is now a no-op. self.strategy = new_strategy elapsed_ms = (time.perf_counter() - start_time) * 1000 logger.info( f"Strategy hot-swapped: {old_strategy_name} ? {new_strategy_name} " f"(completed in {elapsed_ms:.2f}ms)", old_strategy=old_strategy_name, new_strategy=new_strategy_name, swap_time_ms=elapsed_ms, ) # Validate SC-009: <100ms hot-swap time if elapsed_ms >= 100.0: logger.warning( f"Hot-swap exceeded target time: {elapsed_ms:.2f}ms (target: <100ms)", swap_time_ms=elapsed_ms, )
[docs] def get_pool_stats(self) -> dict[str, Any]: """ Get statistics about the proxy pool. Returns: dict[str, Any]: Pool statistics including total_proxies, healthy_proxies, unhealthy_proxies, dead_proxies, total_requests, total_successes, total_failures, and average_success_rate. """ from proxywhirl.models import HealthStatus # Use thread-safe snapshot for all calculations proxies_snapshot = self.pool.get_all_proxies() healthy_count = sum( 1 for p in proxies_snapshot if p.health_status in (HealthStatus.HEALTHY, HealthStatus.UNKNOWN, HealthStatus.DEGRADED) ) unhealthy_count = sum( 1 for p in proxies_snapshot if p.health_status == HealthStatus.UNHEALTHY ) dead_count = sum(1 for p in proxies_snapshot if p.health_status == HealthStatus.DEAD) total_requests = sum(p.total_requests for p in proxies_snapshot) total_successes = sum(p.total_successes for p in proxies_snapshot) total_failures = sum(p.total_failures for p in proxies_snapshot) # Calculate average success rate success_rates = [p.success_rate for p in proxies_snapshot if p.total_requests > 0] avg_success_rate = sum(success_rates) / len(success_rates) if success_rates else 0.0 return { "total_proxies": self.pool.size, "healthy_proxies": healthy_count, "unhealthy_proxies": unhealthy_count, "dead_proxies": dead_count, "total_requests": total_requests, "total_successes": total_successes, "total_failures": total_failures, "average_success_rate": avg_success_rate, }
[docs] def get_statistics(self) -> dict[str, Any]: """ Get comprehensive statistics including source breakdown (FR-050). Returns: dict[str, Any]: All stats from get_pool_stats() plus source_breakdown mapping source names to proxy counts. """ stats = self.get_pool_stats() stats["source_breakdown"] = self.pool.get_source_breakdown() return stats
[docs] async def clear_unhealthy_proxies(self) -> int: """ Remove all unhealthy and dead proxies from the pool. Returns: Number of proxies removed """ from proxywhirl.models import HealthStatus # Capture IDs of proxies to be removed before clearing (thread-safe snapshot) removed_proxy_ids = [ str(p.id) for p in self.pool.get_all_proxies() if p.health_status in (HealthStatus.DEAD, HealthStatus.UNHEALTHY) ] removed_count = self.pool.clear_unhealthy() # Clean up circuit breakers and pooled clients for removed proxies to prevent memory leak for proxy_id in removed_proxy_ids: # Remove pooled client await self._client_pool.remove(proxy_id) # Remove circuit breaker if proxy_id in self.circuit_breakers: del self.circuit_breakers[proxy_id] logger.debug(f"Removed circuit breaker for unhealthy proxy: {proxy_id}") logger.info( "Cleared unhealthy proxies from pool", removed_count=removed_count, remaining_proxies=self.pool.size, ) return removed_count
async def _get_or_create_client( self, proxy: Proxy, proxy_dict: dict[str, str] ) -> httpx.AsyncClient: """ Get or create a pooled httpx.AsyncClient for the given proxy. This method implements connection pooling by maintaining a cache of clients per proxy. Clients are configured with connection pool settings from the configuration and are reused across multiple requests to the same proxy. The pool has a maximum size limit (default: 100). When the limit is reached, the least recently used client is evicted to prevent unbounded memory growth. Args: proxy: Proxy instance to get/create client for proxy_dict: Proxy dictionary for httpx configuration Returns: Configured httpx.AsyncClient instance with connection pooling """ proxy_id = str(proxy.id) # Try to get existing client from LRU pool existing_client = await self._client_pool.get(proxy_id) if existing_client is not None: return existing_client # Create new client with connection pooling settings client = httpx.AsyncClient( proxy=proxy_dict.get("http://"), timeout=self.config.timeout, verify=self.config.verify_ssl, follow_redirects=self.config.follow_redirects, limits=httpx.Limits( max_connections=self.config.pool_connections, max_keepalive_connections=self.config.pool_max_keepalive, ), ) # Store in LRU pool (automatically evicts LRU if at capacity) await self._client_pool.put(proxy_id, client) logger.debug( "Created new pooled async client for proxy", proxy_id=proxy_id, pool_connections=self.config.pool_connections, pool_max_keepalive=self.config.pool_max_keepalive, client_pool_size=len(self._client_pool), ) return client def _get_proxy_dict(self, proxy: Proxy) -> dict[str, str]: """ Convert proxy to httpx proxy dict format. Args: proxy: Proxy to convert Returns: dict[str, str]: Proxy URL for httpx with "http://" and "https://" keys. """ url = str(proxy.url) # Add credentials to URL if present if proxy.username and proxy.password: username = proxy.username.get_secret_value() password = proxy.password.get_secret_value() # URL-encode credentials to handle special characters like @, :, /, etc. # Using safe='' ensures all special characters are encoded username_encoded = quote(username, safe="") password_encoded = quote(password, safe="") # Insert URL-encoded credentials into URL if "://" in url: protocol, rest = url.split("://", 1) url = f"{protocol}://{username_encoded}:{password_encoded}@{rest}" # Return proxy dict for all protocols return { "http://": url, "https://": url, } async def _make_request( self, method: str, url: str, retry_policy: RetryPolicy | None = None, **kwargs: Any, ) -> httpx.Response: """ Make async HTTP request with automatic proxy rotation, retry, and circuit breakers. Args: method: HTTP method url: URL to request retry_policy: Optional per-request retry policy override **kwargs: Additional request parameters Returns: HTTP response Raises: ProxyPoolEmptyError: If no healthy proxies available ProxyConnectionError: If all retry attempts fail """ if self.pool.size == 0: await self._ensure_request_bootstrap() # Select proxy with circuit breaker filtering try: proxy = self._select_proxy_with_circuit_breaker() except ProxyPoolEmptyError: logger.error("No healthy proxies available or all circuit breakers open") raise proxy_dict = self._get_proxy_dict(proxy) masked_url = mask_proxy_url(str(proxy.url)) logger.info( f"Making {method} request to {url}", proxy_id=str(proxy.id), proxy_url=masked_url ) # Get or create pooled client for this proxy client = await self._get_or_create_client(proxy, proxy_dict) # Define async request function for retry executor async def request_fn() -> httpx.Response: # Use pooled client (no context manager - client is reused) response = await client.request(method, url, **kwargs) # Check for authentication errors (401 Unauthorized, 407 Proxy Auth Required) if response.status_code in (401, 407): logger.error( f"Proxy authentication failed: {masked_url}", proxy_id=str(proxy.id), status_code=response.status_code, ) auth_message = ( "authentication required" if response.status_code == 407 else "authentication failed" ) raise ProxyAuthenticationError( f"Proxy {auth_message} ({response.status_code}) for {masked_url}. " "Please provide valid credentials (username and password)." ) return response # Execute with retry - note that retry_executor.execute_with_retry is sync # We'll need to handle the async nature here try: # Since the retry executor is sync, we need to adapt it for async # For now, we'll call the async request_fn directly with basic retry logic response = await self._execute_async_with_retry( request_fn, proxy, method, url, retry_policy ) # Record success in strategy self.strategy.record_result(proxy, success=True, response_time_ms=0.0) logger.info( f"Request successful: {method} {url}", proxy_id=str(proxy.id), status_code=response.status_code, ) return response except ProxyAuthenticationError as e: self.strategy.record_result(proxy, success=False, response_time_ms=0.0) logger.error( f"Authentication error for proxy {proxy.id}", proxy_id=str(proxy.id), error=str(e), ) # Re-raise auth errors without wrapping raise except Exception as e: # Record failure in strategy self.strategy.record_result(proxy, success=False, response_time_ms=0.0) logger.warning( f"Request failed after retries: {method} {url}", proxy_id=str(proxy.id), error=str(e), ) raise ProxyConnectionError(f"Request failed: {e}") from e async def _execute_async_with_retry( self, request_fn: Any, proxy: Proxy, method: str, url: str, retry_policy: RetryPolicy | None = None, ) -> httpx.Response: """ Execute async request with retry logic (ASYNCHRONOUS). This is the async counterpart to RetryExecutor.execute_with_retry. It uses asyncio.sleep for non-blocking delays instead of time.sleep. Args: request_fn: Async function to execute proxy: Proxy being used method: HTTP method url: Target URL retry_policy: Optional retry policy override Returns: HTTP response Raises: Exception: If all retry attempts fail """ policy = retry_policy or self.retry_policy circuit_breaker = self.circuit_breakers.get(str(proxy.id)) if circuit_breaker and not circuit_breaker.should_attempt_request(): raise ProxyConnectionError("Circuit breaker is open for this proxy") last_exception = None previous_delay: float | None = None # Track for decorrelated jitter for attempt in range(policy.max_attempts): try: response = await request_fn() # Record success in circuit breaker if circuit_breaker: circuit_breaker.record_success() return response except ProxyAuthenticationError: # Authentication errors are non-retryable. if circuit_breaker: circuit_breaker.record_failure() raise except Exception as e: last_exception = e # Record failure in circuit breaker if circuit_breaker: circuit_breaker.record_failure() # Check if we should retry if attempt < policy.max_attempts - 1: # Calculate backoff delay using decorrelated jitter delay = policy.calculate_delay(attempt, previous_delay=previous_delay) logger.debug( f"Retry attempt {attempt + 1}/{policy.max_attempts} after {delay}s", proxy_id=str(proxy.id), ) # ASYNC sleep - non-blocking delay for event loop await asyncio.sleep(delay) previous_delay = delay # Track for decorrelated jitter else: break # All retries exhausted raise last_exception if last_exception else ProxyConnectionError("Request failed")
[docs] async def get(self, url: str, **kwargs: Any) -> httpx.Response: """Make async GET request.""" return await self._make_request("GET", url, **kwargs)
[docs] async def post(self, url: str, **kwargs: Any) -> httpx.Response: """Make async POST request.""" return await self._make_request("POST", url, **kwargs)
[docs] async def put(self, url: str, **kwargs: Any) -> httpx.Response: """Make async PUT request.""" return await self._make_request("PUT", url, **kwargs)
[docs] async def delete(self, url: str, **kwargs: Any) -> httpx.Response: """Make async DELETE request.""" return await self._make_request("DELETE", url, **kwargs)
[docs] async def patch(self, url: str, **kwargs: Any) -> httpx.Response: """Make async PATCH request.""" return await self._make_request("PATCH", url, **kwargs)
[docs] async def head(self, url: str, **kwargs: Any) -> httpx.Response: """Make async HEAD request.""" return await self._make_request("HEAD", url, **kwargs)
[docs] async def options(self, url: str, **kwargs: Any) -> httpx.Response: """Make async OPTIONS request.""" return await self._make_request("OPTIONS", url, **kwargs)
def _aggregation_loop(self) -> None: """ Persistent thread loop for periodic metrics aggregation. Runs every 5 minutes until stop event is set. Uses threading.Event.wait() for interruptible sleep. """ while not self._stop_event.wait(timeout=300.0): # 5 minutes try: self.retry_metrics.aggregate_hourly() except Exception as e: logger.warning(f"Metrics aggregation failed: {e}")
[docs] def get_circuit_breaker_states(self) -> dict[str, CircuitBreaker]: """ Get circuit breaker states for all proxies. Returns: dict[str, CircuitBreaker]: Mapping of proxy IDs to their circuit breaker instances. """ return self.circuit_breakers.copy()
[docs] def reset_circuit_breaker(self, proxy_id: str) -> None: """ Manually reset a circuit breaker to CLOSED state. Args: proxy_id: ID of the proxy whose circuit breaker to reset Raises: KeyError: If proxy_id not found """ if proxy_id not in self.circuit_breakers: raise KeyError(f"No circuit breaker found for proxy {proxy_id}") self.circuit_breakers[proxy_id].reset() logger.info(f"Circuit breaker manually reset for proxy {proxy_id}")
[docs] def get_retry_metrics(self) -> RetryMetrics: """ Get retry metrics. Returns: RetryMetrics instance with current metrics """ return self.retry_metrics