Source code for proxywhirl.rotator.client_pool

"""
LRU cache for httpx.Client instances with automatic eviction.

This module provides a thread-safe pool for reusing HTTP client connections,
improving performance by avoiding repeated connection setup.
"""

from __future__ import annotations

import threading
from collections import OrderedDict

import httpx
from loguru import logger


[docs] class LRUClientPool: """ LRU cache for httpx.Client instances with automatic eviction. When the pool reaches maxsize, the least recently used client is closed and removed to prevent unbounded memory growth. Supports dictionary-like access for backward compatibility with tests. """ def __init__(self, maxsize: int = 100) -> None: """ Initialize LRU client pool. Args: maxsize: Maximum number of clients to cache (default: 100) """ self._clients: OrderedDict[str, httpx.Client] = OrderedDict() self._maxsize = maxsize self._lock = threading.Lock()
[docs] def get(self, proxy_id: str) -> httpx.Client | None: """ Get a client from the pool, marking it as recently used. Args: proxy_id: Proxy ID to look up Returns: Client if found, None otherwise """ with self._lock: if proxy_id in self._clients: # Move to end (most recently used) self._clients.move_to_end(proxy_id) return self._clients[proxy_id] return None
[docs] def put(self, proxy_id: str, client: httpx.Client) -> None: """ Add a client to the pool, evicting LRU client if at capacity. Args: proxy_id: Proxy ID to store under client: Client instance to store """ with self._lock: if proxy_id in self._clients: # Already exists, move to end self._clients.move_to_end(proxy_id) else: # Check if we need to evict if len(self._clients) >= self._maxsize: # Evict least recently used (first item) lru_proxy_id, lru_client = self._clients.popitem(last=False) try: lru_client.close() logger.debug( "Evicted LRU client from pool", evicted_proxy_id=lru_proxy_id, pool_size=len(self._clients), ) except Exception as e: logger.warning( f"Error closing evicted client for proxy {lru_proxy_id}: {e}" ) # Add new client self._clients[proxy_id] = client
[docs] def remove(self, proxy_id: str) -> None: """ Remove and close a client from the pool. Args: proxy_id: Proxy ID to remove """ with self._lock: if proxy_id in self._clients: client = self._clients.pop(proxy_id) try: client.close() logger.debug("Removed client from pool", proxy_id=proxy_id) except Exception as e: logger.warning(f"Error closing client for proxy {proxy_id}: {e}")
[docs] def clear(self) -> None: """Close all clients and clear the pool.""" with self._lock: for proxy_id, client in self._clients.items(): try: client.close() logger.debug("Closed pooled client", proxy_id=proxy_id) except Exception as e: logger.warning(f"Error closing client for proxy {proxy_id}: {e}") self._clients.clear()
def __len__(self) -> int: """Return number of clients in pool.""" with self._lock: return len(self._clients) def __contains__(self, proxy_id: str) -> bool: """Check if proxy_id is in pool (supports 'in' operator).""" with self._lock: return proxy_id in self._clients def __getitem__(self, proxy_id: str) -> httpx.Client: """Get client by proxy_id (supports dict-like access for tests).""" with self._lock: return self._clients[proxy_id] def __setitem__(self, proxy_id: str, client: httpx.Client) -> None: """Set client for proxy_id (supports dict-like access for tests).""" self.put(proxy_id, client) def __delitem__(self, proxy_id: str) -> None: """Delete client for proxy_id (supports dict-like deletion).""" self.remove(proxy_id)