# Source code for proxywhirl.strategies.core

"""
Rotation strategies for proxy selection.
"""

from __future__ import annotations

import random
import threading
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Protocol, runtime_checkable
from uuid import UUID

from proxywhirl.exceptions import ProxyPoolEmptyError
from proxywhirl.models import (
    HealthStatus,
    Proxy,
    ProxyPool,
    SelectionContext,
    Session,
    StrategyConfig,
)

# ============================================================================
# STRATEGY STATE MANAGEMENT
# ============================================================================


@dataclass
class ProxyMetrics:
    """Mutable per-proxy performance counters owned by a single strategy.

    Keeping these counters outside the Proxy model lets each strategy
    track its own view of a proxy's performance with its own configuration
    (e.g., its own EMA alpha value) without interfering with other
    strategies.

    Attributes:
        ema_response_time_ms: Exponential moving average of response times.
        total_requests: Number of requests routed through this proxy.
        total_successes: Number of those requests that succeeded.
        total_failures: Number of those requests that failed.
        last_response_time_ms: Most recently observed response time.
        window_start: Start time of the current sliding window, if any.
    """

    ema_response_time_ms: float | None = None
    total_requests: int = 0
    total_successes: int = 0
    total_failures: int = 0
    last_response_time_ms: float | None = None
    window_start: datetime | None = None

    @property
    def success_rate(self) -> float:
        """Fraction of requests that succeeded, clamped to [0.0, 1.0]."""
        if not self.total_requests:
            return 0.0
        rate = self.total_successes / self.total_requests
        return min(1.0, max(0.0, rate))

    def update_ema(self, response_time_ms: float, alpha: float) -> None:
        """Fold a new response time into the exponential moving average.

        Args:
            response_time_ms: Response time in milliseconds.
            alpha: EMA smoothing factor (0-1); higher weights recent samples.
        """
        previous = self.ema_response_time_ms
        if previous is None:
            # First sample seeds the average directly.
            self.ema_response_time_ms = response_time_ms
        else:
            self.ema_response_time_ms = alpha * response_time_ms + (1 - alpha) * previous
        self.last_response_time_ms = response_time_ms
@dataclass
class StrategyState:
    """Thread-safe, per-strategy container of per-proxy metrics.

    Separates mutable strategy bookkeeping from immutable proxy identity:
    each strategy instance owns one StrategyState, keyed by proxy UUID so
    identity stays stable even if proxy objects are recreated. This lets
    different strategies:

    1. Use different EMA alpha values without conflicts
    2. Track proxy performance independently
    3. Keep consistent metrics across strategy reconfiguration

    Example:
        >>> state = StrategyState(ema_alpha=0.3)
        >>> state.record_success(proxy.id, response_time_ms=150.0)
        >>> metrics = state.get_metrics(proxy.id)
        >>> print(metrics.ema_response_time_ms)  # 150.0

    Thread Safety:
        Every accessor and mutator runs under an internal threading.Lock.

    Attributes:
        ema_alpha: EMA smoothing factor applied by this strategy.
        window_duration_seconds: Sliding-window length for counter resets.
    """

    ema_alpha: float = 0.2
    window_duration_seconds: int = 3600
    _metrics: dict[UUID, ProxyMetrics] = field(default_factory=dict)
    _lock: threading.Lock = field(default_factory=threading.Lock)

    def get_metrics(self, proxy_id: UUID) -> ProxyMetrics:
        """Return metrics for *proxy_id*, creating a fresh entry if absent."""
        with self._lock:
            return self._metrics.setdefault(proxy_id, ProxyMetrics())

    def record_success(self, proxy_id: UUID, response_time_ms: float) -> None:
        """Count a successful request and fold its latency into the EMA.

        Args:
            proxy_id: UUID of the proxy.
            response_time_ms: Response time in milliseconds.
        """
        with self._lock:
            entry = self._metrics.setdefault(proxy_id, ProxyMetrics())
            entry.total_requests += 1
            entry.total_successes += 1
            entry.update_ema(response_time_ms, self.ema_alpha)

    def record_failure(self, proxy_id: UUID) -> None:
        """Count a failed request for *proxy_id*.

        Args:
            proxy_id: UUID of the proxy.
        """
        with self._lock:
            entry = self._metrics.setdefault(proxy_id, ProxyMetrics())
            entry.total_requests += 1
            entry.total_failures += 1

    def get_ema_response_time(self, proxy_id: UUID) -> float | None:
        """Return the EMA response time in ms, or None when untracked."""
        with self._lock:
            entry = self._metrics.get(proxy_id)
            return None if entry is None else entry.ema_response_time_ms

    def get_success_rate(self, proxy_id: UUID) -> float:
        """Return the success rate (0.0-1.0); 0.0 when untracked."""
        with self._lock:
            entry = self._metrics.get(proxy_id)
            return 0.0 if entry is None else entry.success_rate

    def get_request_count(self, proxy_id: UUID) -> int:
        """Return total requests recorded for *proxy_id*; 0 when untracked."""
        with self._lock:
            entry = self._metrics.get(proxy_id)
            return 0 if entry is None else entry.total_requests

    def reset_metrics(self, proxy_id: UUID) -> None:
        """Replace *proxy_id*'s metrics with a zeroed instance, if tracked."""
        with self._lock:
            if proxy_id in self._metrics:
                self._metrics[proxy_id] = ProxyMetrics()

    def clear_all(self) -> None:
        """Drop every tracked proxy's metrics."""
        with self._lock:
            self._metrics.clear()
class StrategyRegistry:
    """
    Singleton registry for custom rotation strategies.

    Allows registration and retrieval of custom strategy implementations,
    enabling plugin architecture for ProxyWhirl.

    Example:
        >>> from proxywhirl.strategies import StrategyRegistry
        >>>
        >>> # Create custom strategy
        >>> class MyStrategy:
        ...     def select(self, pool):
        ...         return pool.get_all_proxies()[0]
        ...     def record_result(self, proxy, success, response_time_ms):
        ...         pass
        >>>
        >>> # Register it
        >>> registry = StrategyRegistry()
        >>> registry.register_strategy("my-strategy", MyStrategy)
        >>>
        >>> # Retrieve and use
        >>> strategy_class = registry.get_strategy("my-strategy")
        >>> strategy = strategy_class()

    Thread Safety:
        Thread-safe singleton implementation using double-checked locking.

    Performance:
        Registration: O(1)
        Retrieval: O(1)
        Validation: <1ms per strategy (SC-010)
    """

    # Shared singleton instance; _lock guards its one-time creation.
    _instance: StrategyRegistry | None = None
    _lock = threading.RLock()

    def __init__(self) -> None:
        """Initialize the registry (called once by __new__)."""
        # These are initialized in __new__ for singleton pattern; the
        # hasattr guards keep repeated StrategyRegistry() calls from
        # wiping state on the already-initialized singleton.
        if not hasattr(self, "_strategies"):
            self._strategies: dict[str, type] = {}
        if not hasattr(self, "_registry_lock"):
            self._registry_lock = threading.RLock()

    def __new__(cls) -> StrategyRegistry:
        """Ensure singleton pattern with thread-safe double-checked locking."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance between the first check and lock acquisition.
                if cls._instance is None:
                    instance = super().__new__(cls)
                    # Populate attributes BEFORE publishing via cls._instance
                    # so no thread can observe a half-built singleton.
                    instance._strategies = {}
                    instance._registry_lock = threading.RLock()
                    cls._instance = instance
        return cls._instance

    def register_strategy(self, name: str, strategy_class: type, *, validate: bool = True) -> None:
        """
        Register a custom strategy.

        Args:
            name: Unique name for the strategy (e.g., "my-custom-strategy")
            strategy_class: Strategy class implementing RotationStrategy protocol
            validate: If True (default), validates strategy implements required methods

        Raises:
            ValueError: If strategy name already registered (unless re-registering)
            TypeError: If strategy doesn't implement required protocol methods

        Example:
            >>> class FastStrategy:
            ...     def select(self, pool):
            ...         return pool.get_all_proxies()[0]
            ...     def record_result(self, proxy, success, response_time_ms):
            ...         pass
            >>>
            >>> registry = StrategyRegistry()
            >>> registry.register_strategy("fast", FastStrategy)
        """
        import time

        # Timed end-to-end to enforce the SC-010 load-time budget below.
        start_time = time.perf_counter()

        if validate:
            self._validate_strategy(strategy_class)

        with self._registry_lock:
            # Allow re-registration (replacement)
            if name in self._strategies:
                from loguru import logger

                logger.warning(
                    f"Replacing existing strategy registration: {name}",
                    old_class=self._strategies[name].__name__,
                    new_class=strategy_class.__name__,
                )
            self._strategies[name] = strategy_class

        # Convert elapsed seconds to milliseconds for logging/threshold check.
        load_time = (time.perf_counter() - start_time) * 1000
        from loguru import logger

        logger.info(
            f"Registered strategy: {name} ({strategy_class.__name__}) in {load_time:.2f}ms",
            strategy_name=name,
            strategy_class=strategy_class.__name__,
            load_time_ms=load_time,
        )

        # Validate SC-010: <1 second load time
        if load_time >= 1000.0:
            logger.warning(
                f"Strategy registration exceeded target time: {load_time:.2f}ms (target: <1000ms)",
                strategy_name=name,
                load_time_ms=load_time,
            )

    def get_strategy(self, name: str) -> type:
        """
        Retrieve a registered strategy class.

        Args:
            name: Strategy name used during registration

        Returns:
            Strategy class (not instance - caller must instantiate)

        Raises:
            KeyError: If strategy name not found in registry

        Example:
            >>> registry = StrategyRegistry()
            >>> strategy_class = registry.get_strategy("my-strategy")
            >>> strategy = strategy_class()  # Instantiate
        """
        with self._registry_lock:
            if name not in self._strategies:
                # List known names in the error to ease debugging of typos.
                available = ", ".join(self._strategies.keys()) if self._strategies else "none"
                raise KeyError(
                    f"Strategy '{name}' not found in registry. Available strategies: {available}"
                )
            return self._strategies[name]

    def list_strategies(self) -> list[str]:
        """
        List all registered strategy names.

        Returns:
            List of registered strategy names
        """
        with self._registry_lock:
            # Copy into a list so callers cannot mutate internal state.
            return list(self._strategies.keys())

    def unregister_strategy(self, name: str) -> None:
        """
        Remove a strategy from the registry.

        Args:
            name: Strategy name to unregister

        Raises:
            KeyError: If strategy name not found
        """
        with self._registry_lock:
            if name not in self._strategies:
                raise KeyError(f"Strategy '{name}' not found in registry")
            del self._strategies[name]

        from loguru import logger

        logger.info(f"Unregistered strategy: {name}", strategy_name=name)

    def _validate_strategy(self, strategy_class: type) -> None:
        """
        Validate that strategy class implements required protocol.

        Args:
            strategy_class: Strategy class to validate

        Raises:
            TypeError: If required methods are missing
        """
        # Duck-typed check: only presence of the protocol methods is
        # verified, not their signatures.
        required_methods = ["select", "record_result"]
        missing_methods = []

        for method in required_methods:
            if not hasattr(strategy_class, method):
                missing_methods.append(method)

        if missing_methods:
            raise TypeError(
                f"Strategy {strategy_class.__name__} missing required methods: "
                f"{', '.join(missing_methods)}. "
                f"Must implement RotationStrategy protocol."
            )

    @classmethod
    def reset(cls) -> None:
        """
        Reset the singleton instance (useful for testing).

        Warning:
            This should only be used in tests. Calling this in production
            will clear all registered strategies.
        """
        with cls._lock:
            cls._instance = None
@runtime_checkable
class RotationStrategy(Protocol):
    """Protocol defining interface for proxy rotation strategies.

    Decorated with @runtime_checkable so implementations can be verified
    with isinstance() checks at runtime (structural typing: any class with
    matching select/record_result methods conforms).
    """

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select a proxy from the pool based on strategy logic.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Selected proxy

        Raises:
            ProxyPoolEmptyError: If no suitable proxy is available
        """
        ...

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """
        Record the result of using a proxy.

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        ...
class RoundRobinStrategy:
    """Sequential (round-robin) rotation over the pool's healthy proxies.

    Cycles through healthy proxies in order, wrapping back to the first
    after the last. Proxies listed in a SelectionContext's
    failed_proxy_ids are skipped.

    Thread Safety:
        A threading.Lock guards the rotation index so concurrent callers
        never skip a position or double-select the same one.
    """

    def __init__(self) -> None:
        """Initialize the rotation index, its guard lock, and empty config."""
        self._current_index: int = 0
        self._lock = threading.Lock()
        self.config: StrategyConfig | None = None

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """Return the next healthy proxy in round-robin order.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Next healthy proxy in rotation

        Raises:
            ProxyPoolEmptyError: If no healthy proxies are available
        """
        candidates = pool.get_healthy_proxies()
        if not candidates:
            raise ProxyPoolEmptyError("No healthy proxies available in pool")

        # Drop proxies the caller already saw fail, when a context is given.
        if context and context.failed_proxy_ids:
            excluded = set(context.failed_proxy_ids)
            candidates = [c for c in candidates if str(c.id) not in excluded]
            if not candidates:
                raise ProxyPoolEmptyError(
                    "No healthy proxies available after filtering failed proxies"
                )

        # Atomically read-then-advance the index; modulo handles wraparound
        # and keeps the counter bounded by the current pool size.
        with self._lock:
            position = self._current_index % len(candidates)
            self._current_index = (self._current_index + 1) % len(candidates)

        chosen = candidates[position]
        # Record that a request is starting through this proxy.
        chosen.start_request()
        return chosen

    def configure(self, config: StrategyConfig) -> None:
        """Store the strategy configuration for later use.

        Args:
            config: Strategy configuration object
        """
        self.config = config

    def validate_metadata(self, pool: ProxyPool) -> bool:
        """Round-robin needs no special pool metadata; always valid.

        Args:
            pool: The proxy pool to validate

        Returns:
            Always True for round-robin
        """
        return True

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """Forward the request outcome to the proxy's own bookkeeping.

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        # Pass this strategy's alpha so proxy-level EMA state stays
        # consistent with the strategy's configuration.
        ema_alpha = None if self.config is None else self.config.ema_alpha
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=ema_alpha)
class RandomStrategy:
    """Uniform random rotation over the pool's healthy proxies.

    Picks a healthy proxy at random, avoiding the predictable patterns
    of sequential rotation. Proxies listed in a SelectionContext's
    failed_proxy_ids are skipped.

    Thread Safety:
        Relies on Python's random module, whose number generation is
        GIL-protected; no extra locking is kept here.
    """

    def __init__(self) -> None:
        """Initialize random strategy with empty configuration."""
        self.config: StrategyConfig | None = None

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """Return a uniformly random healthy proxy.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Randomly selected healthy proxy

        Raises:
            ProxyPoolEmptyError: If no healthy proxies are available
        """
        candidates = pool.get_healthy_proxies()
        if not candidates:
            raise ProxyPoolEmptyError("No healthy proxies available in pool")

        # Drop proxies the caller already saw fail, when a context is given.
        if context and context.failed_proxy_ids:
            excluded = set(context.failed_proxy_ids)
            candidates = [c for c in candidates if str(c.id) not in excluded]
            if not candidates:
                raise ProxyPoolEmptyError(
                    "No healthy proxies available after filtering failed proxies"
                )

        chosen = random.choice(candidates)
        # Record that a request is starting through this proxy.
        chosen.start_request()
        return chosen

    def configure(self, config: StrategyConfig) -> None:
        """Store the strategy configuration for later use."""
        self.config = config

    def validate_metadata(self, pool: ProxyPool) -> bool:
        """Random selection needs no special pool metadata; always valid."""
        return True

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """Forward the request outcome to the proxy's own bookkeeping."""
        # Pass this strategy's alpha so proxy-level EMA state stays
        # consistent with the strategy's configuration.
        ema_alpha = None if self.config is None else self.config.ema_alpha
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=ema_alpha)
class WeightedStrategy:
    """
    Weighted proxy selection strategy with SelectionContext support.

    Selects proxies based on custom weights or success rates. When custom
    weights are provided via StrategyConfig, they take precedence.
    Otherwise, weights are derived from success_rate. Uses weighted random
    selection to favor higher-performing proxies while still giving all
    proxies a chance.

    Supports:
    - Custom weights via StrategyConfig.weights (proxy URL -> weight mapping)
    - Fallback to success_rate-based weights
    - Minimum weight (0.1) to ensure all proxies have selection chance
    - SelectionContext for filtering (e.g., failed_proxy_ids)
    - Weight caching to avoid O(n) recalculation on every selection

    Thread Safety:
        Uses threading.Lock to protect weight cache access, ensuring atomic
        cache validation and update operations. Prevents race conditions
        where multiple threads could trigger duplicate weight recalculations
        or inconsistent cache states.
    """

    def __init__(self) -> None:
        """Initialize weighted strategy."""
        self.config: StrategyConfig | None = None
        # Cached normalized weights, parallel to _cached_proxy_ids.
        self._cached_weights: list[float] | None = None
        # Ordered proxy-ID list the cached weights were computed for;
        # order matters because weights are positional.
        self._cached_proxy_ids: list[str] | None = None
        self._cache_valid: bool = False
        self._cache_lock = threading.Lock()

    def _invalidate_cache(self) -> None:
        """
        Invalidate the weight cache, forcing recalculation on next selection.

        Thread-safe: Acquires cache lock to ensure atomicity of invalidation.
        """
        with self._cache_lock:
            self._cache_valid = False
            self._cached_weights = None
            self._cached_proxy_ids = None

    def _calculate_weights(self, proxies: list[Proxy]) -> list[float]:
        """
        Calculate weights for the given proxies.

        Weights are always normalized to sum to 1.0 to maintain the
        invariant that total weight sum equals 1.0, even when proxies are
        removed.

        Args:
            proxies: List of proxies to calculate weights for

        Returns:
            List of normalized weights corresponding to each proxy (sum = 1.0)
        """
        weights = []
        for proxy in proxies:
            if self.config and self.config.weights:
                # Use custom weights from config (keyed by proxy URL).
                custom_weight = self.config.weights.get(proxy.url, None)
                if custom_weight is not None and custom_weight > 0:
                    weights.append(custom_weight)
                else:
                    # Fallback to success rate or minimum weight
                    weights.append(max(proxy.success_rate, 0.1))
            else:
                # Use success rates as weights
                # Add small base weight (0.1) to give all proxies a chance
                weights.append(max(proxy.success_rate, 0.1))

        # Handle edge case: all weights are zero/negative (shouldn't happen with max(0.1))
        if all(w <= 0 for w in weights):
            # Fallback to uniform weights
            weights = [1.0] * len(weights)

        # Normalize weights to sum to 1.0
        # This ensures the invariant is maintained even when proxies are removed
        total_weight = sum(weights)
        if total_weight > 0:
            weights = [w / total_weight for w in weights]
        else:
            # Fallback to uniform normalized weights
            weights = [1.0 / len(weights)] * len(weights)

        return weights

    def _get_weights(self, proxies: list[Proxy]) -> list[float]:
        """
        Get weights for proxies, using cache if valid.

        Thread-safe: Acquires cache lock to ensure atomic cache check and
        update. Prevents race conditions where multiple threads could
        trigger duplicate weight recalculations.

        Args:
            proxies: List of proxies to get weights for

        Returns:
            List of weights corresponding to each proxy
        """
        # Generate proxy ID list for cache validation (order-sensitive).
        current_proxy_ids = [str(proxy.id) for proxy in proxies]

        # Protect cache access with lock
        with self._cache_lock:
            # Check if cache is valid
            if self._cache_valid and self._cached_proxy_ids == current_proxy_ids:
                # Cache hit - return cached weights
                return self._cached_weights  # type: ignore[return-value]

            # Cache miss - recalculate weights
            weights = self._calculate_weights(proxies)

            # Update cache atomically
            self._cached_weights = weights
            self._cached_proxy_ids = current_proxy_ids
            self._cache_valid = True

            return weights

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select a proxy weighted by custom weights or success rate.

        Uses cached weights when possible to avoid O(n) recalculation on
        every call. Cache is invalidated when the proxy set changes
        (different IDs).

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Weighted-random selected healthy proxy

        Raises:
            ProxyPoolEmptyError: If no healthy proxies are available
        """
        healthy_proxies = pool.get_healthy_proxies()
        if not healthy_proxies:
            raise ProxyPoolEmptyError("No healthy proxies available in pool")

        # Filter out failed proxies if context provided
        if context and context.failed_proxy_ids:
            failed_ids = set(context.failed_proxy_ids)
            healthy_proxies = [p for p in healthy_proxies if str(p.id) not in failed_ids]
            if not healthy_proxies:
                raise ProxyPoolEmptyError(
                    "No healthy proxies available after filtering failed proxies"
                )

        # Get weights (from cache if valid, otherwise recalculate)
        weights = self._get_weights(healthy_proxies)

        # Use random.choices for weighted selection
        selected = random.choices(healthy_proxies, weights=weights, k=1)[0]

        # Update proxy metadata to track request start
        selected.start_request()

        return selected

    def configure(self, config: StrategyConfig) -> None:
        """
        Configure the strategy with custom settings.

        Invalidates the weight cache since configuration changes may affect
        weights.

        Args:
            config: Strategy configuration object with optional custom weights
        """
        self.config = config
        # Invalidate cache when configuration changes
        self._invalidate_cache()

    def validate_metadata(self, pool: ProxyPool) -> bool:
        """
        Validate that pool has required metadata for weighted selection.

        Weighted strategy can work with success_rate (always available) or
        custom weights.

        Args:
            pool: The proxy pool to validate

        Returns:
            Always True as success_rate is always available
        """
        return True

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """
        Record the result of using a proxy.

        Updates proxy statistics based on request outcome and invalidates
        the weight cache since success rates may have changed.

        Thread-safe: Uses double-checked locking pattern to ensure atomic
        invalidation and update. This prevents race conditions where another
        thread could select using stale weights while proxy stats are being
        updated. The lock ensures:
        1. No thread can read cached weights between invalidation and stat update
        2. Proxy stat updates are atomic with cache invalidation
        3. Multiple concurrent record_result() calls don't interfere

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        # Double-checked locking: ensure atomic cache invalidation + proxy update
        # First check without lock (fast path for most calls)
        needs_invalidation = self._cache_valid

        if needs_invalidation:
            # Acquire lock for atomic invalidation + update
            with self._cache_lock:
                # Re-check under lock (double-checked locking)
                if self._cache_valid:
                    # Invalidate cache atomically
                    self._cache_valid = False
                    self._cached_weights = None
                    self._cached_proxy_ids = None

                # Update proxy stats while holding lock
                # This ensures no thread can see stale cache between invalidation and update
                alpha = self.config.ema_alpha if self.config is not None else None
                proxy.complete_request(
                    success=success, response_time_ms=response_time_ms, alpha=alpha
                )
        else:
            # Cache already invalid, just update proxy stats
            alpha = self.config.ema_alpha if self.config is not None else None
            proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
class LeastUsedStrategy:
    """
    Least-used proxy selection strategy with SelectionContext support.

    Selects the healthy proxy with the fewest started requests, balancing
    load across all available proxies. Ties are broken by str(proxy.id),
    so selection is deterministic for equal request counts.

    Performance:
        O(n) per selection (single linear scan). A previous implementation
        kept a min-heap, but because the selected proxy's requests_started
        changes on every call the heap had to be invalidated and rebuilt
        (O(n) heapify) per selection anyway — so the heap added stale-entry
        bookkeeping without any asymptotic benefit.

    Thread Safety:
        Selection and usage marking are performed atomically under a
        threading.Lock, preventing TOCTOU race conditions where multiple
        threads could select the same "least used" proxy simultaneously.
    """

    def __init__(self) -> None:
        """Initialize least-used strategy."""
        self.config: StrategyConfig | None = None
        self._lock = threading.Lock()

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select the least-used healthy proxy.

        The scan and the start_request() call happen under a single lock so
        no other thread can select on a stale request count in between.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Healthy proxy with fewest started requests

        Raises:
            ProxyPoolEmptyError: If no healthy proxies are available
        """
        # Atomic select-and-mark to prevent TOCTOU race condition
        with self._lock:
            healthy_proxies = pool.get_healthy_proxies()
            if not healthy_proxies:
                raise ProxyPoolEmptyError("No healthy proxies available in pool")

            # Filter out failed proxies if context provided
            if context and context.failed_proxy_ids:
                failed_ids = set(context.failed_proxy_ids)
                healthy_proxies = [p for p in healthy_proxies if str(p.id) not in failed_ids]
                if not healthy_proxies:
                    raise ProxyPoolEmptyError(
                        "No healthy proxies available after filtering failed proxies"
                    )

            # Same ordering key the former heap used: (requests_started, str(id)),
            # which gives a deterministic tie-break on proxy id.
            proxy = min(healthy_proxies, key=lambda p: (p.requests_started, str(p.id)))

            # Update proxy metadata to track request start (atomic with selection)
            proxy.start_request()
            return proxy

    def configure(self, config: StrategyConfig) -> None:
        """Configure the strategy with custom settings."""
        self.config = config

    def validate_metadata(self, pool: ProxyPool) -> bool:
        """
        Validate that proxies have request tracking metadata.

        Args:
            pool: The proxy pool to validate

        Returns:
            True if all proxies have requests_started field
        """
        # All Proxy objects now have requests_started field by default
        return True

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """Record the result of using a proxy."""
        # Pass strategy's alpha to avoid mutating proxy state
        alpha = self.config.ema_alpha if self.config is not None else None
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
class PerformanceBasedStrategy:
    """Adaptive rotation weighted by inverse EMA response times.

    Faster proxies (lower EMA) receive proportionally higher selection
    weights, so better performers are favored while every proxy keeps a
    nonzero chance of being picked.

    Cold Start Handling:
        Proxies with fewer than ``exploration_count`` recorded requests are
        treated as "untried" and selected (uniformly at random) ahead of
        measured ones, so new proxies can accumulate performance data and
        are never starved.

    Thread Safety:
        Relies on random.choices()/random.choice(), whose number generation
        is GIL-protected; no extra locking is kept here.
    """

    def __init__(self, exploration_count: int = 5) -> None:
        """Initialize performance-based strategy.

        Args:
            exploration_count: Minimum trials a proxy must have before
                performance-based selection applies to it. Defaults to 5;
                set to 0 to disable exploration.
        """
        self.config: StrategyConfig | None = None
        self.exploration_count = exploration_count

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """Return a proxy, preferring exploration then inverse-EMA weighting.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Selected healthy proxy

        Raises:
            ProxyPoolEmptyError: If no healthy proxies are available
        """
        candidates = pool.get_healthy_proxies()
        if not candidates:
            raise ProxyPoolEmptyError("No healthy proxies available in pool")

        # Drop proxies the caller already saw fail, when a context is given.
        if context and context.failed_proxy_ids:
            excluded = set(context.failed_proxy_ids)
            candidates = [p for p in candidates if str(p.id) not in excluded]
            if not candidates:
                raise ProxyPoolEmptyError(
                    "No healthy proxies available after filtering failed proxies"
                )

        # Partition: proxies still in their exploration phase vs. proxies
        # with enough trials AND a usable (positive) EMA measurement.
        untried = [p for p in candidates if p.total_requests < self.exploration_count]
        measured = [
            p
            for p in candidates
            if p.total_requests >= self.exploration_count
            and p.ema_response_time_ms is not None
            and p.ema_response_time_ms > 0
        ]

        if untried:
            # Exploration first: uniform choice gives new proxies equal odds.
            chosen = random.choice(untried)
        elif measured:
            # Exploitation: weight inversely by EMA (lower latency -> higher weight).
            inverse_weights = [1.0 / p.ema_response_time_ms for p in measured]  # type: ignore[operator]
            chosen = random.choices(measured, weights=inverse_weights, k=1)[0]
        else:
            # All proxies tried but none has EMA data (e.g., every request
            # failed so far) - fall back to uniform random selection.
            chosen = random.choice(candidates)

        # Record that a request is starting through this proxy.
        chosen.start_request()
        return chosen

    def configure(self, config: StrategyConfig) -> None:
        """Apply configuration, including an optional exploration_count.

        Args:
            config: Strategy configuration with optional exploration_count
        """
        self.config = config
        # Only override the exploration budget when explicitly configured.
        if config.exploration_count is not None:
            self.exploration_count = config.exploration_count

    def validate_metadata(self, pool: ProxyPool) -> bool:
        """Check the pool is usable: exploration handles the cold start,
        so a single healthy proxy suffices.

        Returns:
            True if pool has at least one healthy proxy
        """
        return len(pool.get_healthy_proxies()) > 0

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """Forward the request outcome to the proxy's own bookkeeping.

        The EMA is updated with this strategy's configured alpha so metric
        calculations stay consistent regardless of proxy state.

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        ema_alpha = None if self.config is None else self.config.ema_alpha
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=ema_alpha)
# ============================================================================ # SESSION MANAGEMENT # ============================================================================
class SessionManager:
    """Thread-safe registry of session-id -> proxy assignments.

    Tracks sticky-session bindings with TTL-based expiry, LRU eviction when
    the session cap is reached, and periodic opportunistic cleanup. Every
    public operation takes the internal lock, so instances can be shared
    across threads.
    """

    def __init__(self, max_sessions: int = 10000, auto_cleanup_threshold: int = 100) -> None:
        """Set up an empty manager.

        Args:
            max_sessions: Cap on concurrently tracked sessions (default: 10000).
            auto_cleanup_threshold: Number of operations between automatic
                expired-session sweeps (default: 100).
        """
        # OrderedDict doubles as the LRU structure: oldest entries sit at the
        # front, recently used ones are moved to the back.
        self._sessions: OrderedDict[str, Session] = OrderedDict()
        # RLock: public methods may call each other while holding the lock.
        self._lock = threading.RLock()
        self._max_sessions = max_sessions
        self._auto_cleanup_threshold = auto_cleanup_threshold
        self._operation_counter = 0
[docs] def create_session( self, session_id: str, proxy: Proxy, timeout_seconds: int = 300, ) -> Session: """Create or update a session assignment. Args: session_id: Unique identifier for the session proxy: Proxy to assign to this session timeout_seconds: Session TTL in seconds (default 5 minutes) Returns: The created/updated Session object """ with self._lock: # Auto-cleanup check self._maybe_auto_cleanup() # Check if we need to evict old sessions (LRU) if len(self._sessions) >= self._max_sessions: self._evict_lru_session() now = datetime.now(timezone.utc) expires_at = now + timedelta(seconds=timeout_seconds) session = Session( session_id=session_id, proxy_id=str(proxy.id), created_at=now, expires_at=expires_at, last_used_at=now, request_count=0, ) self._sessions[session_id] = session return session
[docs] def get_session(self, session_id: str) -> Session | None: """Get an active session by ID. Args: session_id: The session ID to look up Returns: Session object if found and not expired, None otherwise """ with self._lock: session = self._sessions.get(session_id) if session is None: return None # Check if expired if session.is_expired(): del self._sessions[session_id] return None # Mark as recently used (move to end of OrderedDict) self._sessions.move_to_end(session_id) return session
[docs] def touch_session(self, session_id: str) -> bool: """Update session last_used_at and increment request_count. Args: session_id: The session ID to touch Returns: True if session was touched, False if not found or expired """ with self._lock: session = self.get_session(session_id) if session is None: return False session.touch() return True
[docs] def remove_session(self, session_id: str) -> bool: """Remove a session from the manager. Args: session_id: The session ID to remove Returns: True if session was removed, False if not found """ with self._lock: if session_id in self._sessions: del self._sessions[session_id] return True return False
[docs] def cleanup_expired(self) -> int: """Remove all expired sessions. Returns: Number of expired sessions removed """ with self._lock: expired_ids = [sid for sid, session in self._sessions.items() if session.is_expired()] for sid in expired_ids: del self._sessions[sid] return len(expired_ids)
[docs] def get_all_sessions(self) -> list[Session]: """Get all active (non-expired) sessions. Returns: List of active Session objects """ with self._lock: # Filter out expired sessions active = [] expired_ids = [] for sid, session in self._sessions.items(): if session.is_expired(): expired_ids.append(sid) else: active.append(session) # Clean up expired for sid in expired_ids: del self._sessions[sid] return active
[docs] def clear_all(self) -> None: """Remove all sessions.""" with self._lock: self._sessions.clear()
def _maybe_auto_cleanup(self) -> None: """Perform automatic cleanup if threshold is reached. This is called periodically to clean up expired sessions without requiring manual intervention. Thread-safe (assumes caller holds lock). """ self._operation_counter += 1 if self._operation_counter >= self._auto_cleanup_threshold: self._operation_counter = 0 # Clean up expired sessions expired_count = self._cleanup_expired_unsafe() if expired_count > 0: from loguru import logger logger.debug( f"Auto-cleanup removed {expired_count} expired sessions", expired_count=expired_count, total_sessions=len(self._sessions), ) def _cleanup_expired_unsafe(self) -> int: """Internal cleanup method (assumes caller holds lock). Returns: Number of expired sessions removed """ expired_ids = [sid for sid, session in self._sessions.items() if session.is_expired()] for sid in expired_ids: del self._sessions[sid] return len(expired_ids) def _evict_lru_session(self) -> None: """Evict the least recently used session (LRU eviction) in O(1) time. This is called when max_sessions limit is reached. Removes the session at the front of the OrderedDict (oldest/least recently used). Thread-safe (assumes caller holds lock). """ if not self._sessions: return # Remove oldest (first) item in O(1) time lru_session_id, _ = self._sessions.popitem(last=False) from loguru import logger logger.debug( f"LRU eviction removed session {lru_session_id}", evicted_session_id=lru_session_id, total_sessions=len(self._sessions), )
class SessionPersistenceStrategy:
    """Sticky-session rotation strategy.

    Pins each session id to a single proxy so every request in a session
    goes through the same endpoint, failing over to a fresh proxy only when
    the pinned one becomes unavailable.

    Features:
        - TTL-bound session-to-proxy bindings
        - Automatic failover off unhealthy proxies
        - Thread-safe bookkeeping via SessionManager
        - Expiry and cleanup of stale sessions

    Thread Safety:
        Delegates all session state to SessionManager, which locks internally.

    Success Criteria:
        SC-005: 99.9% same-proxy guarantee for session requests

    Performance:
        O(1) session lookup, <1ms overhead for session management
    """

    def __init__(self, max_sessions: int = 10000, auto_cleanup_threshold: int = 100) -> None:
        """Create the strategy with its backing session manager.

        Args:
            max_sessions: LRU-eviction cap on active sessions (default: 10000).
            auto_cleanup_threshold: Operations between automatic expiry sweeps
                (default: 100).
        """
        self._session_manager = SessionManager(
            max_sessions=max_sessions, auto_cleanup_threshold=auto_cleanup_threshold
        )
        # Picks a replacement proxy for new sessions / failover.
        self._fallback_strategy: RotationStrategy = RoundRobinStrategy()
        self._session_timeout_seconds: int = 3600  # 1 hour default
        self.config: StrategyConfig | None = None
[docs] def configure(self, config: StrategyConfig) -> None: """ Configure session persistence parameters. Args: config: Strategy configuration with session_stickiness_duration_seconds """ # Store config for later use self.config = config # Use session_stickiness_duration_seconds from config if config.session_stickiness_duration_seconds is not None: self._session_timeout_seconds = config.session_stickiness_duration_seconds # Optionally configure fallback strategy if hasattr(config, "fallback_strategy") and config.fallback_strategy: # In a real implementation, you'd instantiate the strategy from name pass
[docs] def validate_metadata(self, pool: ProxyPool) -> bool: """ Validate that pool has necessary metadata for strategy. Session persistence doesn't require specific proxy metadata. Args: pool: The proxy pool to validate Returns: Always True - session persistence works with any pool """ return True
[docs] def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy: """ Select a proxy with session persistence. If session_id exists and proxy is healthy, returns same proxy. If session_id is new or assigned proxy is unhealthy, assigns new proxy. Args: pool: The proxy pool to select from context: Selection context with session_id (required) Returns: Healthy proxy assigned to the session Raises: ValueError: If context is None or session_id is missing ProxyPoolEmptyError: If no healthy proxies available """ if context is None or context.session_id is None: raise ValueError("SessionPersistenceStrategy requires SelectionContext with session_id") session_id = context.session_id # Check for existing session and attempt to reuse with single lookup session = self._session_manager.get_session(session_id) if session is not None: # Session exists - try to use assigned proxy try: # Convert proxy_id from string to UUID from uuid import UUID proxy_uuid = UUID(session.proxy_id) assigned_proxy = pool.get_proxy_by_id(proxy_uuid) # Check if proxy is still healthy or untested (UNKNOWN is acceptable) if assigned_proxy is not None and assigned_proxy.health_status in ( HealthStatus.HEALTHY, HealthStatus.UNKNOWN, ): # Update session last_used with cached session object # (single lookup, session touch happens exactly once) session.touch() # Mark proxy as in-use assigned_proxy.start_request() return assigned_proxy # Proxy unhealthy - need to failover (fall through to new selection) except Exception: # Error retrieving proxy - fall through to new selection pass # No valid session or failover needed - select new proxy healthy_proxies = pool.get_healthy_proxies() # Filter out failed proxies from context if context and context.failed_proxy_ids: failed_ids = set(context.failed_proxy_ids) healthy_proxies = [p for p in healthy_proxies if str(p.id) not in failed_ids] if not healthy_proxies: raise ProxyPoolEmptyError("No healthy proxies available for session") # Use 
fallback strategy to select new proxy from filtered list # Create temp pool with only healthy proxies temp_pool = ProxyPool(name="temp", proxies=healthy_proxies) new_proxy = self._fallback_strategy.select(temp_pool) # Create or update session with new proxy self._session_manager.create_session( session_id=session_id, proxy=new_proxy, timeout_seconds=self._session_timeout_seconds ) # Mark proxy as in-use new_proxy.start_request() return new_proxy
[docs] def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None: """ Record the result of a request through a proxy. Updates proxy completion statistics via Proxy.complete_request(). Args: proxy: The proxy that handled the request success: Whether the request succeeded response_time_ms: Response time in milliseconds """ # Pass strategy's alpha to avoid mutating proxy state alpha = ( self.config.ema_alpha if hasattr(self, "config") and self.config is not None else None ) proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
[docs] def close_session(self, session_id: str) -> None: """ Explicitly close a session. Args: session_id: The session ID to close """ self._session_manager.remove_session(session_id)
[docs] def cleanup_expired_sessions(self) -> int: """ Remove expired sessions. Returns: Number of sessions removed """ return self._session_manager.cleanup_expired()
[docs] def get_session_stats(self) -> dict[str, int]: """ Get session statistics. Returns: dict[str, int]: Session statistics including total_sessions, max_sessions, and auto_cleanup_threshold. """ return { "total_sessions": len(self._session_manager._sessions), "max_sessions": self._session_manager._max_sessions, "auto_cleanup_threshold": self._session_manager._auto_cleanup_threshold, }
class GeoTargetedStrategy:
    """Geography-aware proxy selection.

    Narrows the candidate set to proxies in a requested country (ISO 3166-1
    alpha-2) or region (custom name) - country wins when both are given -
    then delegates the final pick to a secondary strategy. Optionally falls
    back to the full healthy pool when nothing matches.

    Thread Safety:
        Per-request operations are stateless, so the strategy is thread-safe.

    Success Criteria:
        SC-006: 100% correct region selection when available

    Performance:
        O(n) filtering + O(1) or O(n) secondary selection
    """

    def __init__(self) -> None:
        """Start with fallback enabled and round-robin secondary selection."""
        self._fallback_enabled: bool = True
        self._secondary_strategy: RotationStrategy = RoundRobinStrategy()
        self.config: StrategyConfig | None = None
[docs] def configure(self, config: StrategyConfig) -> None: """ Configure geo-targeting parameters. Args: config: Strategy configuration with geo settings """ # Store config for later use self.config = config if config.geo_fallback_enabled is not None: self._fallback_enabled = config.geo_fallback_enabled # Configure secondary strategy based on name if config.geo_secondary_strategy: strategy_name = config.geo_secondary_strategy.lower() if strategy_name == "round_robin": self._secondary_strategy = RoundRobinStrategy() elif strategy_name == "random": self._secondary_strategy = RandomStrategy() elif strategy_name == "least_used": self._secondary_strategy = LeastUsedStrategy()
# Default to round_robin if unknown
[docs] def validate_metadata(self, pool: ProxyPool) -> bool: """ Validate that pool has geo metadata. Geo-targeting is optional, so always returns True. Proxies without geo data will simply not match geo filters. Args: pool: The proxy pool to validate Returns: Always True - geo data is optional """ return True
    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select a proxy based on geographical targeting.

        Selection logic:
        1. If context has target_country: filter by country (exact match)
        2. Else if context has target_region: filter by region (exact match)
        3. If no target specified: use all healthy proxies
        4. Apply context.failed_proxy_ids filtering
        5. If filtered list empty and fallback enabled: use all healthy proxies
        6. If filtered list empty and fallback disabled: raise error
        7. Apply secondary strategy to filtered proxies

        Args:
            pool: The proxy pool to select from
            context: Selection context with target_country or target_region

        Returns:
            Proxy matching geo criteria (or any proxy if fallback enabled)

        Raises:
            ProxyPoolEmptyError: If no proxies match criteria and fallback disabled
        """
        # Start with all healthy proxies
        healthy_proxies = pool.get_healthy_proxies()
        if not healthy_proxies:
            raise ProxyPoolEmptyError("No healthy proxies available")

        # Determine target location from context
        target_country = context.target_country if context else None
        target_region = context.target_region if context else None

        # Filter by geography; country takes precedence over region.
        if target_country:
            # Case-insensitive exact match on the proxy's country code.
            filtered_proxies = [
                p
                for p in healthy_proxies
                if p.country_code and p.country_code.upper() == target_country.upper()
            ]
            target_location = target_country
        elif target_region:
            # Case-insensitive exact match on the proxy's region name.
            filtered_proxies = [
                p for p in healthy_proxies if p.region and p.region.upper() == target_region.upper()
            ]
            target_location = target_region
        else:
            # No geo targeting - use all healthy proxies
            filtered_proxies = healthy_proxies
            target_location = None

        # Filter out previously failed proxies from context
        if context and context.failed_proxy_ids:
            failed_ids = set(context.failed_proxy_ids)
            filtered_proxies = [p for p in filtered_proxies if str(p.id) not in failed_ids]

        # Handle empty filtered list
        if not filtered_proxies:
            if self._fallback_enabled:
                # Fallback to any healthy proxy (still excluding failed ones)
                if context and context.failed_proxy_ids:
                    failed_ids = set(context.failed_proxy_ids)
                    filtered_proxies = [p for p in healthy_proxies if str(p.id) not in failed_ids]
                else:
                    filtered_proxies = healthy_proxies

                if not filtered_proxies:
                    raise ProxyPoolEmptyError("No healthy proxies available after filtering")
            else:
                # No fallback - raise error naming the unmatched target
                location_str = (
                    f"country={target_location}" if target_country else f"region={target_location}"
                )
                raise ProxyPoolEmptyError(
                    f"No proxies available for target location: {location_str}"
                )

        # Create a temp pool and delegate the final pick to the secondary strategy
        temp_pool = ProxyPool(name="geo_filtered", proxies=filtered_proxies)
        selected_proxy = self._secondary_strategy.select(temp_pool)

        # Mark proxy as in-use
        selected_proxy.start_request()

        return selected_proxy
[docs] def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None: """ Record the result of a request through a proxy. Updates proxy completion statistics via Proxy.complete_request(). Args: proxy: The proxy that handled the request success: Whether the request succeeded response_time_ms: Response time in milliseconds """ # Pass strategy's alpha to avoid mutating proxy state alpha = self.config.ema_alpha if self.config is not None else None proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
[docs] class CostAwareStrategy: """ Cost-aware proxy selection strategy. Prioritizes free proxies over paid ones, with configurable cost thresholds. Uses weighted random selection based on inverse cost - lower cost proxies are more likely to be selected. Features: - Free proxies (cost_per_request = 0.0) are heavily favored - Paid proxies are selected based on inverse cost weighting - Configurable cost threshold to filter out expensive proxies - Supports fallback to any proxy when no low-cost options available Thread Safety: Uses Python's random.choices() which is thread-safe via GIL. Example: >>> from proxywhirl.strategies import CostAwareStrategy >>> strategy = CostAwareStrategy() >>> config = StrategyConfig(metadata={"max_cost_per_request": 0.5}) >>> strategy.configure(config) >>> proxy = strategy.select(pool) # Selects cheapest available proxy """ def __init__(self, max_cost_per_request: float | None = None) -> None: """Initialize cost-aware strategy. Args: max_cost_per_request: Maximum acceptable cost per request. Proxies exceeding this cost will be filtered out. None means no cost limit (default). """ self.config: StrategyConfig | None = None self.max_cost_per_request = max_cost_per_request self._free_proxy_boost: float = 10.0 # Weight multiplier for free proxies
[docs] def configure(self, config: StrategyConfig) -> None: """Configure cost-aware parameters. Args: config: Strategy configuration with optional metadata: - max_cost_per_request: Maximum cost threshold - free_proxy_boost: Weight multiplier for free proxies (default: 10.0) """ self.config = config # Extract max cost from metadata if present if config.metadata and "max_cost_per_request" in config.metadata: self.max_cost_per_request = float(config.metadata["max_cost_per_request"]) # Extract free proxy boost from metadata if present if config.metadata and "free_proxy_boost" in config.metadata: self._free_proxy_boost = float(config.metadata["free_proxy_boost"])
[docs] def validate_metadata(self, pool: ProxyPool) -> bool: """Validate that pool has cost metadata. Cost field is optional, so always returns True. Proxies without cost data are treated as free (cost = 0.0). Args: pool: The proxy pool to validate Returns: Always True - cost data is optional """ return True
    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """Select a proxy based on cost optimization.

        Selection logic:
        1. Get healthy proxies
        2. Filter by context.failed_proxy_ids if present
        3. Filter by max_cost_per_request threshold if configured
        4. Apply inverse cost weighting (lower cost = higher weight)
        5. Free proxies get boost multiplier (default 10x weight)
        6. Use weighted random selection

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Cost-optimized proxy selection

        Raises:
            ProxyPoolEmptyError: If no proxies meet criteria
        """
        healthy_proxies = pool.get_healthy_proxies()
        if not healthy_proxies:
            raise ProxyPoolEmptyError("No healthy proxies available")

        # Filter by context failed proxies
        if context and context.failed_proxy_ids:
            failed_ids = set(context.failed_proxy_ids)
            healthy_proxies = [p for p in healthy_proxies if str(p.id) not in failed_ids]
            if not healthy_proxies:
                raise ProxyPoolEmptyError(
                    "No healthy proxies available after filtering failed proxies"
                )

        # Filter by cost threshold if configured
        if self.max_cost_per_request is not None:
            # Missing cost data counts as free, so it always passes the ceiling.
            cost_filtered = [
                p
                for p in healthy_proxies
                if (p.cost_per_request or 0.0) <= self.max_cost_per_request
            ]
            # If filtering eliminates all proxies, raise error
            if not cost_filtered:
                raise ProxyPoolEmptyError(
                    f"No proxies available with cost <= {self.max_cost_per_request}"
                )
            healthy_proxies = cost_filtered

        # Calculate inverse cost weights: free proxies get a fixed boost,
        # paid proxies get 1/(cost + epsilon) so cheaper means likelier.
        weights = []
        for proxy in healthy_proxies:
            cost = proxy.cost_per_request or 0.0
            # Free proxy gets boost, paid proxy gets inverse cost weight
            weight = self._free_proxy_boost if cost == 0.0 else 1.0 / (cost + 0.001)
            weights.append(weight)

        # Normalize weights. NOTE(review): random.choices() accepts raw
        # (unnormalized) weights, so this normalization is redundant but
        # harmless - the selection distribution is unchanged.
        total_weight = sum(weights)
        if total_weight > 0:
            weights = [w / total_weight for w in weights]
        else:
            # Fallback to uniform weights (shouldn't happen with free proxy boost)
            weights = [1.0 / len(weights)] * len(weights)

        # Weighted random selection
        selected = random.choices(healthy_proxies, weights=weights, k=1)[0]

        # Mark proxy as in-use
        selected.start_request()

        return selected
[docs] def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None: """Record the result of using a proxy. Args: proxy: The proxy that was used success: Whether the request succeeded response_time_ms: Response time in milliseconds """ # Pass strategy's alpha to avoid mutating proxy state alpha = self.config.ema_alpha if self.config is not None else None proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
class CompositeStrategy:
    """Filter-then-select composite rotation strategy.

    Applies each filter strategy in turn to narrow the candidate pool, then
    lets a selector strategy make the final pick.

    Example:
        >>> # Filter by geography, then select by performance
        >>> from proxywhirl.strategies import CompositeStrategy, GeoTargetedStrategy, PerformanceBasedStrategy
        >>> strategy = CompositeStrategy(
        ...     filters=[GeoTargetedStrategy()],
        ...     selector=PerformanceBasedStrategy()
        ... )
        >>> proxy = strategy.select(pool, SelectionContext(target_country="US"))

    Thread Safety:
        Thread-safe if all component strategies are thread-safe.

    Performance:
        Selection time is the sum of filter and selector times.
        Target: <5ms total (SC-007).
    """

    def __init__(
        self,
        filters: list[RotationStrategy] | None = None,
        selector: RotationStrategy | None = None,
    ):
        """Build the composite.

        Args:
            filters: Filtering strategies applied in order (may be empty).
            selector: Final selection strategy; defaults to round-robin.

        Raises:
            ValueError: When neither filters nor a selector is provided.
        """
        self.filters = filters or []
        self.selector = selector or RoundRobinStrategy()
        if not self.filters and selector is None:
            raise ValueError("CompositeStrategy requires at least one filter or a selector")
    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select a proxy by applying filters then selector.

        Process:
        1. Start with full pool of healthy proxies
        2. Apply each filter strategy sequentially
        3. Apply selector strategy to filtered set
        4. Return selected proxy

        Args:
            pool: The proxy pool to select from
            context: Request context with filtering criteria

        Returns:
            Selected proxy from filtered pool

        Raises:
            ProxyPoolEmptyError: If filters eliminate all proxies

        Performance:
            Target: <5ms total including all filters and selector (SC-007)
        """
        # Local imports mirror the module-level ones; presumably kept local to
        # avoid circular imports at load time - TODO confirm.
        from proxywhirl.exceptions import ProxyPoolEmptyError

        if context is None:
            from proxywhirl.models import SelectionContext

            context = SelectionContext()

        # Start with healthy proxies only
        filtered_proxies = [p for p in pool.get_all_proxies() if p.is_healthy]

        if not filtered_proxies:
            raise ProxyPoolEmptyError("No healthy proxies available")

        # Apply filters sequentially
        for filter_strategy in self.filters:
            # Create temporary pool with the current filtered set
            temp_pool = pool.__class__(name=f"{pool.name}-filtered")
            for proxy in filtered_proxies:
                temp_pool.add_proxy(proxy)

            # Apply filter: the filter's single pick is used as a probe whose
            # attributes (e.g. country) define which proxies survive.
            # NOTE(review): filter strategies may call start_request() on
            # their pick even though it is only a probe here - confirm this
            # does not skew in-use accounting.
            try:
                selected = filter_strategy.select(temp_pool, context)
                # If filter returned one proxy, use it to filter the set
                # For geo-filtering, this means only proxies matching criteria remain
                filtered_proxies = [
                    p for p in filtered_proxies if self._matches_filter(p, selected)
                ]
            except ProxyPoolEmptyError:
                # Filter eliminated all proxies - re-raise with the filter's name
                raise ProxyPoolEmptyError(
                    f"Filter {filter_strategy.__class__.__name__} eliminated all proxies"
                )

        if not filtered_proxies:
            raise ProxyPoolEmptyError("All proxies filtered out")

        # Apply selector to the final filtered pool
        final_pool = pool.__class__(name=f"{pool.name}-final")
        for proxy in filtered_proxies:
            final_pool.add_proxy(proxy)

        return self.selector.select(final_pool, context)
def _matches_filter(self, proxy: Proxy, filter_result: Proxy) -> bool: """ Check if proxy matches filter criteria based on filter result. For now, we use a simple heuristic: if the filter strategy selected a proxy, we check if it shares key attributes (country, region) with other proxies. Args: proxy: Proxy to check filter_result: Proxy returned by filter strategy Returns: True if proxy matches filter criteria """ # If same proxy, definitely matches if proxy.id == filter_result.id: return True # Check geo-location match if hasattr(filter_result, "country_code") and filter_result.country_code: return getattr(proxy, "country_code", None) == filter_result.country_code # If no specific criteria, include all return True
[docs] def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None: """ Record result by delegating to selector strategy. Args: proxy: The proxy that handled the request success: Whether the request succeeded response_time_ms: Response time in milliseconds """ # Delegate to selector for result recording self.selector.record_result(proxy, success, response_time_ms) # Also update filters if they track results for filter_strategy in self.filters: if hasattr(filter_strategy, "record_result"): filter_strategy.record_result(proxy, success, response_time_ms)
[docs] def configure(self, config: StrategyConfig) -> None: """ Configure all component strategies. Args: config: Strategy configuration to apply """ for filter_strategy in self.filters: if hasattr(filter_strategy, "configure"): filter_strategy.configure(config) if hasattr(self.selector, "configure"): self.selector.configure(config)
@classmethod
[docs] def from_config(cls, config: dict[str, Any]) -> CompositeStrategy: """ Create CompositeStrategy from configuration dictionary. Args: config: Configuration dict with keys: - filters: List of filter strategy names or instances - selector: Selector strategy name or instance Returns: Configured CompositeStrategy instance Example: >>> config = { ... "filters": ["geo-targeted"], ... "selector": "performance-based" ... } >>> strategy = CompositeStrategy.from_config(config) Raises: ValueError: If config is invalid """ filters_config = config.get("filters", []) selector_config = config.get("selector", "round-robin") # Convert filter names to instances filters = [] for f in filters_config: if isinstance(f, str): # Will be implemented with StrategyRegistry in T070 filters.append(cls._strategy_from_name(f)) else: filters.append(f) # Convert selector name to instance if isinstance(selector_config, str): selector = cls._strategy_from_name(selector_config) else: selector = selector_config return cls(filters=filters, selector=selector)
@staticmethod def _strategy_from_name(name: str) -> RotationStrategy: """ Convert strategy name to strategy instance. Args: name: Strategy name (e.g., "round-robin", "performance-based") Returns: Strategy instance Raises: ValueError: If strategy name not recognized """ # Simple mapping for now - will use StrategyRegistry in T070 strategy_map: dict[str, type[RotationStrategy]] = { "round-robin": RoundRobinStrategy, "random": RandomStrategy, "least-used": LeastUsedStrategy, "performance-based": PerformanceBasedStrategy, "session": SessionPersistenceStrategy, "geo-targeted": GeoTargetedStrategy, } strategy_class = strategy_map.get(name) if strategy_class is None: raise ValueError(f"Unknown strategy name: {name}") return strategy_class()