"""
Rotation strategies for proxy selection.
"""
from __future__ import annotations
import random
import threading
from collections import OrderedDict
from dataclasses import dataclass, field
from datetime import datetime, timedelta, timezone
from typing import Any, Protocol, runtime_checkable
from uuid import UUID
from proxywhirl.exceptions import ProxyPoolEmptyError
from proxywhirl.models import (
HealthStatus,
Proxy,
ProxyPool,
SelectionContext,
Session,
StrategyConfig,
)
# ============================================================================
# STRATEGY STATE MANAGEMENT
# ============================================================================
@dataclass
class ProxyMetrics:
    """Mutable performance counters a strategy keeps for a single proxy.

    Holding these numbers outside the Proxy model lets each strategy keep
    its own view of a proxy's performance, including its own EMA smoothing
    factor.

    Attributes:
        ema_response_time_ms: Exponential moving average of response times,
            or None until the first sample has been recorded.
        total_requests: Number of requests counted for this proxy.
        total_successes: Number of requests counted as successful.
        total_failures: Number of requests counted as failed.
        last_response_time_ms: Latest response time passed to update_ema(),
            or None if there has been none.
        window_start: Start time of the current sliding window. No method
            of this class reads or writes it.
    """

    ema_response_time_ms: float | None = None
    total_requests: int = 0
    total_successes: int = 0
    total_failures: int = 0
    last_response_time_ms: float | None = None
    window_start: datetime | None = None

    @property
    def success_rate(self) -> float:
        """Return successes / requests clamped to [0.0, 1.0].

        Returns:
            0.0 when no request has been counted yet, otherwise the clamped
            ratio of successful requests to total requests.
        """
        if self.total_requests != 0:
            ratio = self.total_successes / self.total_requests
            return max(0.0, min(1.0, ratio))
        return 0.0

    def update_ema(self, response_time_ms: float, alpha: float) -> None:
        """Fold a new response time into the exponential moving average.

        The first sample seeds the average directly; later samples are
        blended as ``alpha * sample + (1 - alpha) * previous``.

        Args:
            response_time_ms: Response time in milliseconds
            alpha: EMA smoothing factor (0-1)
        """
        previous = self.ema_response_time_ms
        if previous is None:
            smoothed = response_time_ms
        else:
            smoothed = alpha * response_time_ms + (1 - alpha) * previous
        self.ema_response_time_ms = smoothed
        self.last_response_time_ms = response_time_ms
@dataclass
class StrategyState:
    """Per-strategy mutable state for managing proxy metrics.

    Each strategy instance can own a StrategyState that tracks one
    ProxyMetrics record per proxy, keyed by the proxy's UUID so the record
    survives the proxy object being recreated. Keeping the state per
    strategy lets different strategies use different EMA alpha values and
    track performance independently.

    Example:
        >>> state = StrategyState(ema_alpha=0.3)
        >>> state.record_success(proxy.id, response_time_ms=150.0)
        >>> metrics = state.get_metrics(proxy.id)
        >>> print(metrics.ema_response_time_ms)  # 150.0

    Thread Safety:
        Every method takes an internal threading.Lock while it touches the
        metrics mapping. get_metrics() hands back the live ProxyMetrics
        object, so changes a caller makes to it afterwards are not guarded.

    Attributes:
        window_duration_seconds: Duration of sliding window for counter
            resets. No method of this class consults it.
        ema_alpha: EMA smoothing factor applied by record_success().
    """

    window_duration_seconds: int = 3600
    _metrics: dict[UUID, ProxyMetrics] = field(default_factory=dict)
    _lock: threading.Lock = field(default_factory=threading.Lock)
    # record_success() reads this and the documented constructor call passes
    # it, so it has to be a real field. It is declared last so that existing
    # positional construction keeps its meaning.
    ema_alpha: float = 0.2

    def _get_or_create(self, proxy_id: UUID) -> ProxyMetrics:
        """Return the metrics record for proxy_id, creating it if missing.

        The caller must already hold ``self._lock``.
        """
        metrics = self._metrics.get(proxy_id)
        if metrics is None:
            metrics = ProxyMetrics()
            self._metrics[proxy_id] = metrics
        return metrics

    def get_metrics(self, proxy_id: UUID) -> ProxyMetrics:
        """Get or create metrics for a proxy.

        Args:
            proxy_id: UUID of the proxy

        Returns:
            The live ProxyMetrics instance stored for this proxy
        """
        with self._lock:
            return self._get_or_create(proxy_id)

    def record_success(self, proxy_id: UUID, response_time_ms: float) -> None:
        """Record a successful request.

        Increments the request and success counters and folds the response
        time into the EMA using this state's ``ema_alpha``.

        Args:
            proxy_id: UUID of the proxy
            response_time_ms: Response time in milliseconds
        """
        with self._lock:
            metrics = self._get_or_create(proxy_id)
            metrics.total_requests += 1
            metrics.total_successes += 1
            metrics.update_ema(response_time_ms, self.ema_alpha)

    def record_failure(self, proxy_id: UUID) -> None:
        """Record a failed request.

        Increments the request and failure counters; the EMA is untouched.

        Args:
            proxy_id: UUID of the proxy
        """
        with self._lock:
            metrics = self._get_or_create(proxy_id)
            metrics.total_requests += 1
            metrics.total_failures += 1

    def get_ema_response_time(self, proxy_id: UUID) -> float | None:
        """Get EMA response time for a proxy.

        Args:
            proxy_id: UUID of the proxy

        Returns:
            EMA response time in ms, or None if no data
        """
        with self._lock:
            metrics = self._metrics.get(proxy_id)
            return None if metrics is None else metrics.ema_response_time_ms

    def get_success_rate(self, proxy_id: UUID) -> float:
        """Get success rate for a proxy.

        Args:
            proxy_id: UUID of the proxy

        Returns:
            Success rate (0.0-1.0), or 0.0 if no data
        """
        with self._lock:
            metrics = self._metrics.get(proxy_id)
            return 0.0 if metrics is None else metrics.success_rate

    def get_request_count(self, proxy_id: UUID) -> int:
        """Get total request count for a proxy.

        Args:
            proxy_id: UUID of the proxy

        Returns:
            Total number of requests, or 0 if no data
        """
        with self._lock:
            metrics = self._metrics.get(proxy_id)
            return 0 if metrics is None else metrics.total_requests

    def reset_metrics(self, proxy_id: UUID) -> None:
        """Reset metrics for a proxy.

        A proxy that is not tracked yet stays untracked; no record is
        created for it.

        Args:
            proxy_id: UUID of the proxy
        """
        with self._lock:
            if proxy_id in self._metrics:
                self._metrics[proxy_id] = ProxyMetrics()

    def clear_all(self) -> None:
        """Clear all tracked metrics."""
        with self._lock:
            self._metrics.clear()
class StrategyRegistry:
    """
    Singleton registry for custom rotation strategies.

    Maps strategy names to strategy classes so that custom implementations
    can be registered and looked up by name, enabling a plugin architecture
    for ProxyWhirl.

    Example:
        >>> from proxywhirl.strategies import StrategyRegistry
        >>>
        >>> # Create custom strategy
        >>> class MyStrategy:
        ...     def select(self, pool):
        ...         return pool.get_all_proxies()[0]
        ...     def record_result(self, proxy, success, response_time_ms):
        ...         pass
        >>>
        >>> # Register it
        >>> registry = StrategyRegistry()
        >>> registry.register_strategy("my-strategy", MyStrategy)
        >>>
        >>> # Retrieve and use
        >>> strategy_class = registry.get_strategy("my-strategy")
        >>> strategy = strategy_class()

    Thread Safety:
        The single instance is created under a class-level RLock, with the
        instance check repeated inside the lock. Registry contents are
        guarded by a separate per-instance RLock.

    Performance:
        Registration and retrieval are single dict operations.
        register_strategy() logs a warning when a registration takes
        1000 ms or longer.
    """

    _instance: StrategyRegistry | None = None
    _lock = threading.RLock()

    def __new__(cls) -> StrategyRegistry:
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            with cls._lock:
                # Re-check under the lock: another thread may have created
                # the instance while this one was waiting.
                if cls._instance is None:
                    created = super().__new__(cls)
                    created._strategies = {}
                    created._registry_lock = threading.RLock()
                    cls._instance = created
        return cls._instance

    def __init__(self) -> None:
        """Fill in instance state only if __new__ has not already done so.

        __init__ runs on every ``StrategyRegistry()`` call, so it must not
        overwrite the registry held by the existing singleton.
        """
        if not hasattr(self, "_strategies"):
            self._strategies: dict[str, type] = {}
        if not hasattr(self, "_registry_lock"):
            self._registry_lock = threading.RLock()

    def register_strategy(self, name: str, strategy_class: type, *, validate: bool = True) -> None:
        """
        Register a custom strategy.

        Registering a name that is already present replaces the previous
        class and logs a warning; no exception is raised for duplicates.

        Args:
            name: Unique name for the strategy (e.g., "my-custom-strategy")
            strategy_class: Strategy class implementing RotationStrategy protocol
            validate: If True (default), validates strategy implements required methods

        Raises:
            TypeError: If validation is enabled and the class lacks a
                required protocol method

        Example:
            >>> class FastStrategy:
            ...     def select(self, pool):
            ...         return pool.get_all_proxies()[0]
            ...     def record_result(self, proxy, success, response_time_ms):
            ...         pass
            >>>
            >>> registry = StrategyRegistry()
            >>> registry.register_strategy("fast", FastStrategy)
        """
        import time

        started_at = time.perf_counter()

        if validate:
            self._validate_strategy(strategy_class)

        with self._registry_lock:
            already_registered = name in self._strategies
            if already_registered:
                # Re-registration is allowed; it replaces the old class.
                from loguru import logger

                logger.warning(
                    f"Replacing existing strategy registration: {name}",
                    old_class=self._strategies[name].__name__,
                    new_class=strategy_class.__name__,
                )
            self._strategies[name] = strategy_class

        load_time = (time.perf_counter() - started_at) * 1000

        from loguru import logger

        logger.info(
            f"Registered strategy: {name} ({strategy_class.__name__}) in {load_time:.2f}ms",
            strategy_name=name,
            strategy_class=strategy_class.__name__,
            load_time_ms=load_time,
        )

        # Flag registrations that took one second or more.
        too_slow = load_time >= 1000.0
        if too_slow:
            logger.warning(
                f"Strategy registration exceeded target time: {load_time:.2f}ms (target: <1000ms)",
                strategy_name=name,
                load_time_ms=load_time,
            )

    def get_strategy(self, name: str) -> type:
        """
        Retrieve a registered strategy class.

        Args:
            name: Strategy name used during registration

        Returns:
            Strategy class (not instance - caller must instantiate)

        Raises:
            KeyError: If strategy name not found in registry; the message
                lists the names that are registered

        Example:
            >>> registry = StrategyRegistry()
            >>> strategy_class = registry.get_strategy("my-strategy")
            >>> strategy = strategy_class()  # Instantiate
        """
        with self._registry_lock:
            if name in self._strategies:
                return self._strategies[name]

            if self._strategies:
                available = ", ".join(self._strategies.keys())
            else:
                available = "none"
            raise KeyError(
                f"Strategy '{name}' not found in registry. Available strategies: {available}"
            )

    def list_strategies(self) -> list[str]:
        """
        List all registered strategy names.

        Returns:
            A new list of the registered strategy names
        """
        with self._registry_lock:
            return [registered_name for registered_name in self._strategies]

    def unregister_strategy(self, name: str) -> None:
        """
        Remove a strategy from the registry.

        Args:
            name: Strategy name to unregister

        Raises:
            KeyError: If strategy name not found
        """
        with self._registry_lock:
            if name not in self._strategies:
                raise KeyError(f"Strategy '{name}' not found in registry")
            del self._strategies[name]

        from loguru import logger

        logger.info(f"Unregistered strategy: {name}", strategy_name=name)

    def _validate_strategy(self, strategy_class: type) -> None:
        """
        Validate that strategy class implements required protocol.

        Only the presence of the ``select`` and ``record_result`` attributes
        is checked; their signatures are not inspected.

        Args:
            strategy_class: Strategy class to validate

        Raises:
            TypeError: If required methods are missing
        """
        missing_methods = [
            required
            for required in ("select", "record_result")
            if not hasattr(strategy_class, required)
        ]
        if not missing_methods:
            return
        raise TypeError(
            f"Strategy {strategy_class.__name__} missing required methods: "
            f"{', '.join(missing_methods)}. "
            f"Must implement RotationStrategy protocol."
        )

    @classmethod
    def reset(cls) -> None:
        """
        Reset the singleton instance (useful for testing).

        Warning:
            This should only be used in tests. The next ``StrategyRegistry()``
            call creates a fresh, empty registry, so strategies registered on
            the previous instance are no longer reachable through the class.
        """
        with cls._lock:
            cls._instance = None
@runtime_checkable
class RotationStrategy(Protocol):
    """Structural interface every proxy rotation strategy provides.

    The protocol is runtime-checkable, so ``isinstance(obj, RotationStrategy)``
    succeeds for any object exposing both methods below.
    """

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Pick a proxy from the pool according to the strategy's rules.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Selected proxy

        Raises:
            ProxyPoolEmptyError: If no suitable proxy is available
        """
        ...

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """
        Report the outcome of a request made through a proxy.

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        ...
class RoundRobinStrategy:
    """
    Round-robin proxy selection strategy with SelectionContext support.

    Walks the list of healthy proxies in order and wraps back to the start
    after the last one. Proxies whose IDs are listed in the context's
    failed_proxy_ids are skipped.

    Thread Safety:
        Reading and advancing the rotation index happens under a
        threading.Lock, so two threads cannot observe the same index value
        for one step of the rotation.
    """

    def __init__(self) -> None:
        """Initialize round-robin strategy."""
        self.config: StrategyConfig | None = None
        self._lock = threading.Lock()
        self._current_index: int = 0

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select next proxy in round-robin order.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Next healthy proxy in rotation, after its start_request() has
            been called

        Raises:
            ProxyPoolEmptyError: If no healthy proxies are available, either
                at all or once failed proxies have been filtered out
        """
        candidates = pool.get_healthy_proxies()
        if not candidates:
            raise ProxyPoolEmptyError("No healthy proxies available in pool")

        if context and context.failed_proxy_ids:
            # failed_proxy_ids is matched against the string form of each id.
            excluded = set(context.failed_proxy_ids)
            candidates = [
                candidate for candidate in candidates if str(candidate.id) not in excluded
            ]
            if not candidates:
                raise ProxyPoolEmptyError(
                    "No healthy proxies available after filtering failed proxies"
                )

        count = len(candidates)
        with self._lock:
            # Both the read and the advance wrap on the current candidate
            # count, which may differ between calls.
            position = self._current_index % count
            self._current_index = (self._current_index + 1) % count

        chosen = candidates[position]
        # Let the proxy track that a request is starting.
        chosen.start_request()
        return chosen

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """
        Record the result of using a proxy.

        Forwards the outcome to ``proxy.complete_request`` together with the
        configured EMA alpha, or None when no config is set.

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        alpha = None if self.config is None else self.config.ema_alpha
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
class RandomStrategy:
    """
    Random proxy selection strategy with SelectionContext support.

    Picks uniformly at random among the healthy proxies, which avoids the
    predictable ordering of sequential rotation. Proxies whose IDs are listed
    in the context's failed_proxy_ids are skipped.

    Thread Safety:
        The strategy keeps no selection state of its own and takes no lock;
        it relies on the module-level ``random.choice``.
    """

    def __init__(self) -> None:
        """Initialize random strategy."""
        self.config: StrategyConfig | None = None

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select a random healthy proxy.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Randomly selected healthy proxy, after its start_request() has
            been called

        Raises:
            ProxyPoolEmptyError: If no healthy proxies are available, either
                at all or once failed proxies have been filtered out
        """
        candidates = pool.get_healthy_proxies()
        if not candidates:
            raise ProxyPoolEmptyError("No healthy proxies available in pool")

        if context and context.failed_proxy_ids:
            # failed_proxy_ids is matched against the string form of each id.
            excluded = set(context.failed_proxy_ids)
            candidates = [
                candidate for candidate in candidates if str(candidate.id) not in excluded
            ]
            if not candidates:
                raise ProxyPoolEmptyError(
                    "No healthy proxies available after filtering failed proxies"
                )

        chosen = random.choice(candidates)
        # Let the proxy track that a request is starting.
        chosen.start_request()
        return chosen

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """Record the result of using a proxy.

        Forwards the outcome to ``proxy.complete_request`` together with the
        configured EMA alpha, or None when no config is set.

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        alpha = None if self.config is None else self.config.ema_alpha
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
class WeightedStrategy:
    """
    Weighted proxy selection strategy with SelectionContext support.

    Chooses among healthy proxies with ``random.choices``, weighting each
    proxy either by a custom weight from the config or by its success rate.

    Supports:
        - Custom weights via StrategyConfig.weights (proxy URL -> weight mapping)
        - Fallback to success_rate-based weights for proxies without a
          positive custom weight
        - Minimum weight (0.1) for success-rate weights so every proxy keeps
          a chance of being selected
        - SelectionContext for filtering (e.g., failed_proxy_ids)
        - Weight caching keyed by the ordered list of proxy IDs

    Thread Safety:
        The weight cache is read, filled and invalidated under a
        threading.Lock. record_result() invalidates the cache after every
        stats update, so weights computed from older stats are never kept.
    """

    def __init__(self) -> None:
        """Initialize weighted strategy."""
        self.config: StrategyConfig | None = None
        self._cached_weights: list[float] | None = None
        self._cached_proxy_ids: list[str] | None = None
        self._cache_valid: bool = False
        self._cache_lock = threading.Lock()

    def _invalidate_cache(self) -> None:
        """
        Invalidate the weight cache, forcing recalculation on next selection.

        Acquires the cache lock, so it must not be called while that lock is
        already held (the lock is not reentrant).
        """
        with self._cache_lock:
            self._cache_valid = False
            self._cached_weights = None
            self._cached_proxy_ids = None

    def _calculate_weights(self, proxies: list[Proxy]) -> list[float]:
        """
        Calculate normalized weights for the given proxies.

        A proxy uses its custom weight when the config maps its URL to a
        positive number; otherwise it uses ``max(success_rate, 0.1)``.

        Args:
            proxies: List of proxies to calculate weights for

        Returns:
            List of weights in the same order as ``proxies``, summing to 1.0.
            An empty input yields an empty list.
        """
        # Nothing to weigh; without this guard the uniform fallback below
        # would divide by zero.
        if not proxies:
            return []

        custom_weights = self.config.weights if self.config and self.config.weights else None

        raw_weights = []
        for proxy in proxies:
            custom_weight = custom_weights.get(proxy.url) if custom_weights else None
            if custom_weight is not None and custom_weight > 0:
                raw_weights.append(custom_weight)
            else:
                # No usable custom weight: fall back to the success rate,
                # floored at 0.1 so the proxy is still selectable.
                raw_weights.append(max(proxy.success_rate, 0.1))

        total_weight = sum(raw_weights)
        if total_weight > 0:
            return [weight / total_weight for weight in raw_weights]

        # Defensive fallback: a non-positive (or NaN) total cannot be
        # normalized, so every proxy gets an equal share.
        return [1.0 / len(raw_weights)] * len(raw_weights)

    def _get_weights(self, proxies: list[Proxy]) -> list[float]:
        """
        Get weights for proxies, using the cache when it is still valid.

        The cache is a hit only when it is marked valid and was built for
        exactly this ordered list of proxy IDs. The check, recalculation and
        cache update all happen under the cache lock.

        Args:
            proxies: List of proxies to get weights for

        Returns:
            List of weights corresponding to each proxy
        """
        current_proxy_ids = [str(proxy.id) for proxy in proxies]

        with self._cache_lock:
            if self._cache_valid and self._cached_proxy_ids == current_proxy_ids:
                return self._cached_weights  # type: ignore[return-value]

            weights = self._calculate_weights(proxies)
            self._cached_weights = weights
            self._cached_proxy_ids = current_proxy_ids
            self._cache_valid = True
            return weights

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select a proxy weighted by custom weights or success rate.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Weighted-random selected healthy proxy, after its
            start_request() has been called

        Raises:
            ProxyPoolEmptyError: If no healthy proxies are available, either
                at all or once failed proxies have been filtered out
        """
        healthy_proxies = pool.get_healthy_proxies()
        if not healthy_proxies:
            raise ProxyPoolEmptyError("No healthy proxies available in pool")

        if context and context.failed_proxy_ids:
            # failed_proxy_ids is matched against the string form of each id.
            failed_ids = set(context.failed_proxy_ids)
            healthy_proxies = [p for p in healthy_proxies if str(p.id) not in failed_ids]
            if not healthy_proxies:
                raise ProxyPoolEmptyError(
                    "No healthy proxies available after filtering failed proxies"
                )

        weights = self._get_weights(healthy_proxies)
        selected = random.choices(healthy_proxies, weights=weights, k=1)[0]

        # Let the proxy track that a request is starting.
        selected.start_request()
        return selected

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """
        Record the result of using a proxy and invalidate cached weights.

        The proxy's stats are updated first and the cache is invalidated
        afterwards, unconditionally. Weights that a concurrent select()
        cached from the pre-update stats are therefore always discarded,
        and the proxy's own code never runs while the cache lock is held.

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        alpha = self.config.ema_alpha if self.config is not None else None
        try:
            proxy.complete_request(
                success=success, response_time_ms=response_time_ms, alpha=alpha
            )
        finally:
            # Invalidate even if the update raised: the stats may have been
            # partially changed.
            self._invalidate_cache()
class LeastUsedStrategy:
    """
    Least-used proxy selection strategy with SelectionContext support.

    Returns the healthy proxy with the fewest started requests, which
    spreads load across the available proxies. Ties are broken by the
    string form of the proxy ID.

    Implementation:
        Candidates are kept in a min-heap of
        ``(requests_started, proxy_id, proxy)`` tuples. The heap is rebuilt
        whenever the set of candidate IDs differs from the stored ID set or
        the heap is empty. Every successful selection clears the stored ID
        set, so the following call rebuilds the heap from the proxies'
        current counters.

    Thread Safety:
        The whole of select() - fetching candidates, choosing one and
        calling its start_request() - runs under a threading.Lock, so two
        threads cannot pick the same "least used" proxy from one snapshot.
    """

    def __init__(self) -> None:
        """Initialize least-used strategy."""
        self.config: StrategyConfig | None = None
        self._lock = threading.Lock()
        self._heap: list[tuple[int, str, Proxy]] = []
        self._proxy_id_set: set[str] = set()  # IDs the current heap was built from

    def _rebuild_heap(self, proxies: list[Proxy]) -> None:
        """
        Rebuild the min-heap with current proxy states.

        Args:
            proxies: List of proxies to build heap from
        """
        import heapq

        entries = []
        for proxy in proxies:
            entries.append((proxy.requests_started, str(proxy.id), proxy))
        self._heap = entries
        heapq.heapify(self._heap)
        self._proxy_id_set = {str(proxy.id) for proxy in proxies}

    def _needs_rebuild(self, current_proxy_ids: set[str]) -> bool:
        """
        Check if heap needs rebuilding due to pool composition change.

        Args:
            current_proxy_ids: Set of current proxy IDs in filtered pool

        Returns:
            True if the stored ID set differs from current_proxy_ids or the
            heap is empty
        """
        if not self._heap:
            return True
        return self._proxy_id_set != current_proxy_ids

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select the least-used healthy proxy using min-heap.

        Selection and the start_request() call happen under the strategy's
        lock, so they are atomic with respect to other select() calls on
        this strategy.

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            Healthy proxy with fewest started requests, after its
            start_request() has been called

        Raises:
            ProxyPoolEmptyError: If no healthy proxies are available, either
                at all or once failed proxies have been filtered out
        """
        import heapq

        with self._lock:
            candidates = pool.get_healthy_proxies()
            if not candidates:
                raise ProxyPoolEmptyError("No healthy proxies available in pool")

            if context and context.failed_proxy_ids:
                # failed_proxy_ids is matched against the string form of each id.
                excluded = set(context.failed_proxy_ids)
                candidates = [
                    candidate for candidate in candidates if str(candidate.id) not in excluded
                ]
                if not candidates:
                    raise ProxyPoolEmptyError(
                        "No healthy proxies available after filtering failed proxies"
                    )

            eligible_ids = {str(candidate.id) for candidate in candidates}
            if self._needs_rebuild(eligible_ids):
                self._rebuild_heap(candidates)

            # Pop until an entry belonging to the eligible set turns up;
            # entries for other proxies are discarded.
            while self._heap:
                _, popped_id, popped = heapq.heappop(self._heap)
                if popped_id not in eligible_ids:
                    continue
                # Mark the request as started while still holding the lock.
                popped.start_request()
                # The counter just changed, so force a rebuild next time.
                self._proxy_id_set = set()
                return popped

            # The heap ran dry without a match: rebuild once and take the
            # minimum of the fresh heap.
            self._rebuild_heap(candidates)
            if not self._heap:
                raise ProxyPoolEmptyError("No healthy proxies available")

            _, _, popped = heapq.heappop(self._heap)
            popped.start_request()
            self._proxy_id_set = set()
            return popped

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """Record the result of using a proxy.

        Forwards the outcome to ``proxy.complete_request`` together with the
        configured EMA alpha, or None when no config is set.

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        alpha = None if self.config is None else self.config.ema_alpha
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
# ============================================================================
# SESSION MANAGEMENT
# ============================================================================
[docs]
class SessionManager:
"""Thread-safe session manager for sticky proxy assignments.
Manages the mapping between session IDs and their assigned proxies,
with automatic expiration and cleanup. All operations are thread-safe.
Features:
- Automatic TTL-based expiration
- LRU eviction when max_sessions limit is reached
- Periodic cleanup of expired sessions
"""
def __init__(self, max_sessions: int = 10000, auto_cleanup_threshold: int = 100) -> None:
"""Initialize the session manager.
Args:
max_sessions: Maximum number of active sessions (default: 10000)
auto_cleanup_threshold: Trigger cleanup after this many operations (default: 100)
"""
self._sessions: OrderedDict[str, Session] = OrderedDict()
self._lock = threading.RLock()
self._max_sessions = max_sessions
self._auto_cleanup_threshold = auto_cleanup_threshold
self._operation_counter = 0
[docs]
def create_session(
self,
session_id: str,
proxy: Proxy,
timeout_seconds: int = 300,
) -> Session:
"""Create or update a session assignment.
Args:
session_id: Unique identifier for the session
proxy: Proxy to assign to this session
timeout_seconds: Session TTL in seconds (default 5 minutes)
Returns:
The created/updated Session object
"""
with self._lock:
# Auto-cleanup check
self._maybe_auto_cleanup()
# Check if we need to evict old sessions (LRU)
if len(self._sessions) >= self._max_sessions:
self._evict_lru_session()
now = datetime.now(timezone.utc)
expires_at = now + timedelta(seconds=timeout_seconds)
session = Session(
session_id=session_id,
proxy_id=str(proxy.id),
created_at=now,
expires_at=expires_at,
last_used_at=now,
request_count=0,
)
self._sessions[session_id] = session
return session
[docs]
def get_session(self, session_id: str) -> Session | None:
"""Get an active session by ID.
Args:
session_id: The session ID to look up
Returns:
Session object if found and not expired, None otherwise
"""
with self._lock:
session = self._sessions.get(session_id)
if session is None:
return None
# Check if expired
if session.is_expired():
del self._sessions[session_id]
return None
# Mark as recently used (move to end of OrderedDict)
self._sessions.move_to_end(session_id)
return session
[docs]
def touch_session(self, session_id: str) -> bool:
"""Update session last_used_at and increment request_count.
Args:
session_id: The session ID to touch
Returns:
True if session was touched, False if not found or expired
"""
with self._lock:
session = self.get_session(session_id)
if session is None:
return False
session.touch()
return True
[docs]
def remove_session(self, session_id: str) -> bool:
"""Remove a session from the manager.
Args:
session_id: The session ID to remove
Returns:
True if session was removed, False if not found
"""
with self._lock:
if session_id in self._sessions:
del self._sessions[session_id]
return True
return False
[docs]
def cleanup_expired(self) -> int:
"""Remove all expired sessions.
Returns:
Number of expired sessions removed
"""
with self._lock:
expired_ids = [sid for sid, session in self._sessions.items() if session.is_expired()]
for sid in expired_ids:
del self._sessions[sid]
return len(expired_ids)
[docs]
def get_all_sessions(self) -> list[Session]:
"""Get all active (non-expired) sessions.
Returns:
List of active Session objects
"""
with self._lock:
# Filter out expired sessions
active = []
expired_ids = []
for sid, session in self._sessions.items():
if session.is_expired():
expired_ids.append(sid)
else:
active.append(session)
# Clean up expired
for sid in expired_ids:
del self._sessions[sid]
return active
[docs]
def clear_all(self) -> None:
"""Remove all sessions."""
with self._lock:
self._sessions.clear()
def _maybe_auto_cleanup(self) -> None:
"""Perform automatic cleanup if threshold is reached.
This is called periodically to clean up expired sessions without
requiring manual intervention. Thread-safe (assumes caller holds lock).
"""
self._operation_counter += 1
if self._operation_counter >= self._auto_cleanup_threshold:
self._operation_counter = 0
# Clean up expired sessions
expired_count = self._cleanup_expired_unsafe()
if expired_count > 0:
from loguru import logger
logger.debug(
f"Auto-cleanup removed {expired_count} expired sessions",
expired_count=expired_count,
total_sessions=len(self._sessions),
)
def _cleanup_expired_unsafe(self) -> int:
"""Internal cleanup method (assumes caller holds lock).
Returns:
Number of expired sessions removed
"""
expired_ids = [sid for sid, session in self._sessions.items() if session.is_expired()]
for sid in expired_ids:
del self._sessions[sid]
return len(expired_ids)
def _evict_lru_session(self) -> None:
    """Evict the session at the front of the session mapping.

    Used when the session limit is reached: the first entry of
    ``self._sessions`` is popped and a debug message is logged. When the
    mapping is empty the call does nothing.

    No locking is done here; callers are expected to hold the lock already.
    """
    if self._sessions:
        # popitem(last=False) removes the first (front) entry of the mapping.
        evicted_id = self._sessions.popitem(last=False)[0]

        # Imported lazily, only on the path that actually logs.
        from loguru import logger

        logger.debug(
            f"LRU eviction removed session {evicted_id}",
            evicted_session_id=evicted_id,
            total_sessions=len(self._sessions),
        )
[docs]
class SessionPersistenceStrategy:
    """
    Session persistence strategy (sticky sessions).

    Keeps a session ID bound to one proxy so that repeated requests in the
    same session are routed through the same proxy. The binding is replaced
    when the bound proxy can no longer be used.

    Features:
        - Session-to-proxy bindings stored in a SessionManager
        - Failover to a new proxy when the bound proxy is missing from the
          pool, is neither HEALTHY nor UNKNOWN, or is listed in
          ``context.failed_proxy_ids``
        - Explicit session closing and expired-session cleanup

    Thread Safety:
        Session bookkeeping is delegated to SessionManager, whose methods
        guard the session table with a lock.

    Success Criteria:
        SC-005: 99.9% same-proxy guarantee for session requests
    """

    def __init__(self, max_sessions: int = 10000, auto_cleanup_threshold: int = 100) -> None:
        """Initialize session persistence strategy.

        Args:
            max_sessions: Maximum number of active sessions before LRU eviction (default: 10000)
            auto_cleanup_threshold: Number of operations between auto-cleanups (default: 100)
        """
        self._session_manager = SessionManager(
            max_sessions=max_sessions, auto_cleanup_threshold=auto_cleanup_threshold
        )
        # Strategy used to pick a proxy when a session needs a (new) binding.
        self._fallback_strategy: RotationStrategy = RoundRobinStrategy()
        self._session_timeout_seconds: int = 3600  # 1 hour default
        self.config: StrategyConfig | None = None

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select a proxy with session persistence.

        If the session already has a usable proxy, that proxy is returned.
        Otherwise a new proxy is chosen by the fallback strategy from the
        healthy proxies and bound to the session.

        A bound proxy is considered usable when it is still in the pool, its
        health status is HEALTHY or UNKNOWN, and it is not listed in
        ``context.failed_proxy_ids``.

        Args:
            pool: The proxy pool to select from
            context: Selection context with session_id (required)

        Returns:
            The proxy assigned to the session

        Raises:
            ValueError: If context is None or session_id is missing
            ProxyPoolEmptyError: If no healthy proxies available
        """
        if context is None or context.session_id is None:
            raise ValueError("SessionPersistenceStrategy requires SelectionContext with session_id")

        session_id = context.session_id
        failed_ids = self._failed_ids(context)

        # Single session lookup; the returned object is reused for touch().
        session = self._session_manager.get_session(session_id)
        if session is not None:
            assigned_proxy = self._lookup_assigned_proxy(pool, session)
            if (
                assigned_proxy is not None
                # A proxy the caller already reported as failed must not be
                # handed back, exactly as in the new-selection path below.
                and str(assigned_proxy.id) not in failed_ids
                # UNKNOWN (untested) proxies are acceptable for reuse.
                and assigned_proxy.health_status
                in (HealthStatus.HEALTHY, HealthStatus.UNKNOWN)
            ):
                session.touch()
                # Mark proxy as in-use
                assigned_proxy.start_request()
                return assigned_proxy
            # Otherwise fall through and rebind the session.

        # No usable binding - choose among healthy proxies not reported as failed.
        candidates = [p for p in pool.get_healthy_proxies() if str(p.id) not in failed_ids]
        if not candidates:
            raise ProxyPoolEmptyError("No healthy proxies available for session")

        # The fallback strategy works on pools, so wrap the candidates in one.
        temp_pool = ProxyPool(name="temp", proxies=candidates)
        new_proxy = self._fallback_strategy.select(temp_pool)

        # Create or update session with new proxy
        self._session_manager.create_session(
            session_id=session_id, proxy=new_proxy, timeout_seconds=self._session_timeout_seconds
        )
        # Mark proxy as in-use
        new_proxy.start_request()
        return new_proxy

    @staticmethod
    def _failed_ids(context: SelectionContext) -> set[str]:
        """Return the IDs in ``context.failed_proxy_ids`` as a set (empty if none)."""
        failed = context.failed_proxy_ids
        return set(failed) if failed else set()

    @staticmethod
    def _lookup_assigned_proxy(pool: ProxyPool, session: Session) -> Proxy | None:
        """Resolve the proxy bound to ``session``; None when it cannot be retrieved."""
        try:
            # session.proxy_id is stored as a string; the pool is queried by UUID.
            return pool.get_proxy_by_id(UUID(session.proxy_id))
        except Exception:
            # Deliberate best effort: a malformed ID or a failing lookup only
            # means the session has to be rebound to another proxy.
            return None

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """
        Record the result of a request through a proxy.

        Forwards the outcome to ``Proxy.complete_request()``, passing the
        configured ``ema_alpha`` when a config is set and None otherwise.

        Args:
            proxy: The proxy that handled the request
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        config = getattr(self, "config", None)
        alpha = config.ema_alpha if config is not None else None
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)

    def close_session(self, session_id: str) -> None:
        """
        Explicitly close a session.

        Args:
            session_id: The session ID to close
        """
        self._session_manager.remove_session(session_id)

    def cleanup_expired_sessions(self) -> int:
        """
        Remove expired sessions.

        Returns:
            Number of sessions removed
        """
        return self._session_manager.cleanup_expired()

    def get_session_stats(self) -> dict[str, int]:
        """
        Get session statistics.

        Returns:
            dict[str, int]: Session statistics including total_sessions, max_sessions,
                and auto_cleanup_threshold.
        """
        manager = self._session_manager
        return {
            # Counts every stored session; expired entries are not filtered out here.
            "total_sessions": len(manager._sessions),
            "max_sessions": manager._max_sessions,
            "auto_cleanup_threshold": manager._auto_cleanup_threshold,
        }
[docs]
class GeoTargetedStrategy:
    """
    Geo-targeted proxy selection strategy.

    Filters proxies based on geographical location (country or region) specified
    in the SelectionContext. Supports fallback to any proxy when no matches found.

    Features:
        - Country-based filtering on ``Proxy.country_code`` (case-insensitive)
        - Region-based filtering on ``Proxy.region`` (case-insensitive)
        - Country takes precedence over region when both specified
        - Fallback to any healthy proxy, controlled by ``_fallback_enabled``
        - Secondary strategy for selection from filtered proxies

    Success Criteria:
        SC-006: 100% correct region selection when available
    """

    def __init__(self) -> None:
        """Initialize geo-targeted strategy."""
        self._fallback_enabled: bool = True
        # Round-robin is the default secondary strategy.
        self._secondary_strategy: RotationStrategy = RoundRobinStrategy()
        self.config: StrategyConfig | None = None

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select a proxy based on geographical targeting.

        Selection logic:
            1. If context has target_country: filter by country code
            2. Else if context has target_region: filter by region
            3. If no target specified: use all healthy proxies
            4. Drop proxies listed in context.failed_proxy_ids
            5. If nothing is left and a target was given with fallback disabled:
               raise an error naming the target
            6. Otherwise, if nothing is left: retry with all healthy proxies
               (minus failed ones) and raise if that is empty too
            7. Apply secondary strategy to the remaining proxies

        Args:
            pool: The proxy pool to select from
            context: Selection context with target_country or target_region

        Returns:
            Proxy matching geo criteria (or any proxy if fallback enabled)

        Raises:
            ProxyPoolEmptyError: If the pool has no healthy proxies, or none
                remain after filtering
        """
        healthy_proxies = pool.get_healthy_proxies()
        if not healthy_proxies:
            raise ProxyPoolEmptyError("No healthy proxies available")

        target_country = context.target_country if context else None
        target_region = context.target_region if context else None
        failed_ids = (
            set(context.failed_proxy_ids) if context and context.failed_proxy_ids else set()
        )

        candidates = self._exclude_failed(
            self._filter_by_location(healthy_proxies, target_country, target_region),
            failed_ids,
        )

        if not candidates:
            has_target = bool(target_country or target_region)
            if has_target and not self._fallback_enabled:
                # No fallback - name the location that could not be served.
                location_str = (
                    f"country={target_country}" if target_country else f"region={target_region}"
                )
                raise ProxyPoolEmptyError(
                    f"No proxies available for target location: {location_str}"
                )
            # Either fallback is enabled or there was no geo target at all (in
            # which case a "target location" message would be misleading):
            # consider every healthy proxy that has not already failed.
            candidates = self._exclude_failed(healthy_proxies, failed_ids)
            if not candidates:
                raise ProxyPoolEmptyError("No healthy proxies available after filtering")

        # The secondary strategy works on pools, so wrap the candidates in one.
        temp_pool = ProxyPool(name="geo_filtered", proxies=candidates)
        selected_proxy = self._secondary_strategy.select(temp_pool)
        # Mark proxy as in-use
        selected_proxy.start_request()
        return selected_proxy

    @staticmethod
    def _filter_by_location(
        proxies: list[Proxy], target_country: str | None, target_region: str | None
    ) -> list[Proxy]:
        """Keep proxies matching the target country, else the target region.

        Comparison is case-insensitive. Proxies without the relevant attribute
        never match. With no target, ``proxies`` is returned unchanged.
        """
        if target_country:
            wanted = target_country.upper()
            return [p for p in proxies if p.country_code and p.country_code.upper() == wanted]
        if target_region:
            wanted = target_region.upper()
            return [p for p in proxies if p.region and p.region.upper() == wanted]
        return proxies

    @staticmethod
    def _exclude_failed(proxies: list[Proxy], failed_ids: set[str]) -> list[Proxy]:
        """Drop proxies whose stringified ID is in ``failed_ids``."""
        if not failed_ids:
            return proxies
        return [p for p in proxies if str(p.id) not in failed_ids]

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """
        Record the result of a request through a proxy.

        Forwards the outcome to ``Proxy.complete_request()``, passing the
        configured ``ema_alpha`` when a config is set and None otherwise.

        Args:
            proxy: The proxy that handled the request
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        alpha = self.config.ema_alpha if self.config is not None else None
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
[docs]
class CostAwareStrategy:
    """
    Cost-aware proxy selection strategy.

    Prioritizes free proxies over paid ones, with an optional cost threshold.
    Uses weighted random selection based on inverse cost - lower cost proxies
    are more likely to be selected.

    Features:
        - Paid proxies are weighted by ``1 / (cost + 0.001)``
        - Free proxies (cost_per_request of 0.0 or None) always outweigh the
          cheapest paid candidate by the boost factor (default 10x)
        - Optional cost threshold to filter out expensive proxies

    Example:
        >>> from proxywhirl.strategies import CostAwareStrategy
        >>> strategy = CostAwareStrategy(max_cost_per_request=0.5)
        >>> proxy = strategy.select(pool)  # Weighted towards cheaper proxies
    """

    # Added to the cost before inverting so the weight stays finite for tiny costs.
    _COST_EPSILON: float = 0.001

    def __init__(self, max_cost_per_request: float | None = None) -> None:
        """Initialize cost-aware strategy.

        Args:
            max_cost_per_request: Maximum acceptable cost per request.
                Proxies exceeding this cost will be filtered out.
                None means no cost limit (default).
        """
        self.config: StrategyConfig | None = None
        self.max_cost_per_request = max_cost_per_request
        self._free_proxy_boost: float = 10.0  # Weight multiplier for free proxies

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """Select a proxy based on cost optimization.

        Selection logic:
            1. Get healthy proxies
            2. Filter by context.failed_proxy_ids if present
            3. Filter by max_cost_per_request threshold if configured
            4. Weight the remaining proxies (see ``_compute_weights``)
            5. Use weighted random selection

        Args:
            pool: The proxy pool to select from
            context: Optional selection context for filtering

        Returns:
            The selected proxy, already marked as in use via ``start_request()``

        Raises:
            ProxyPoolEmptyError: If no proxies meet criteria
        """
        healthy_proxies = pool.get_healthy_proxies()
        if not healthy_proxies:
            raise ProxyPoolEmptyError("No healthy proxies available")

        # Filter by context failed proxies
        if context and context.failed_proxy_ids:
            failed_ids = set(context.failed_proxy_ids)
            healthy_proxies = [p for p in healthy_proxies if str(p.id) not in failed_ids]
            if not healthy_proxies:
                raise ProxyPoolEmptyError(
                    "No healthy proxies available after filtering failed proxies"
                )

        # Filter by cost threshold if configured (a missing cost counts as free)
        if self.max_cost_per_request is not None:
            cost_filtered = [
                p
                for p in healthy_proxies
                if (p.cost_per_request or 0.0) <= self.max_cost_per_request
            ]
            if not cost_filtered:
                raise ProxyPoolEmptyError(
                    f"No proxies available with cost <= {self.max_cost_per_request}"
                )
            healthy_proxies = cost_filtered

        weights = self._compute_weights(healthy_proxies)
        selected = random.choices(healthy_proxies, weights=weights, k=1)[0]
        # Mark proxy as in-use
        selected.start_request()
        return selected

    def _compute_weights(self, proxies: list[Proxy]) -> list[float]:
        """Return one normalized selection weight per proxy, in input order.

        Paid proxies get ``1 / (cost + epsilon)``. Free proxies get the boost
        factor times the larger of 1.0 and the best paid weight. Using only a
        constant boost would let any paid proxy cheaper than roughly
        ``1 / boost`` outrank free ones, inverting the "lower cost = higher
        weight" rule. When every paid weight is at most 1.0 the free weight
        equals the plain boost factor.
        """
        costs = [proxy.cost_per_request or 0.0 for proxy in proxies]
        epsilon = self._COST_EPSILON

        paid_weights = [1.0 / (cost + epsilon) for cost in costs if cost != 0.0]
        best_paid = max(paid_weights, default=1.0)
        free_weight = self._free_proxy_boost * max(1.0, best_paid)

        weights = [
            free_weight if cost == 0.0 else 1.0 / (cost + epsilon) for cost in costs
        ]

        total_weight = sum(weights)
        if total_weight > 0:
            return [w / total_weight for w in weights]
        # All weights are zero (e.g. a boost of 0 with only free proxies):
        # fall back to a uniform distribution.
        return [1.0 / len(weights)] * len(weights)

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """Record the result of using a proxy.

        Forwards the outcome to ``Proxy.complete_request()``, passing the
        configured ``ema_alpha`` when a config is set and None otherwise.

        Args:
            proxy: The proxy that was used
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        alpha = self.config.ema_alpha if self.config is not None else None
        proxy.complete_request(success=success, response_time_ms=response_time_ms, alpha=alpha)
[docs]
class CompositeStrategy:
    """
    Composite strategy that applies filtering and selection strategies in sequence.

    This strategy implements the filter + select pattern:
        1. Filter strategies narrow down the proxy pool based on criteria (e.g., geography)
        2. Selector strategy chooses the best proxy from the filtered set

    Example:
        >>> # Filter by geography, then select by performance
        >>> from proxywhirl.strategies import CompositeStrategy, GeoTargetedStrategy, PerformanceBasedStrategy
        >>> strategy = CompositeStrategy(
        ...     filters=[GeoTargetedStrategy()],
        ...     selector=PerformanceBasedStrategy()
        ... )
        >>> proxy = strategy.select(pool, SelectionContext(target_country="US"))

    Thread Safety:
        Thread-safe if all component strategies are thread-safe.

    Performance:
        Selection time is sum of filter and selector times.
        Target: <5ms total (SC-007).
    """

    def __init__(
        self,
        filters: list[RotationStrategy] | None = None,
        selector: RotationStrategy | None = None,
    ):
        """
        Initialize composite strategy.

        Args:
            filters: List of filtering strategies to apply sequentially
            selector: Final selection strategy to choose from filtered pool;
                defaults to round-robin when filters are given without one

        Raises:
            ValueError: If there are no filters and selector is None
        """
        self.filters = filters or []
        # Validate before building the default selector so an invalid
        # configuration fails without constructing anything.
        if not self.filters and selector is None:
            raise ValueError("CompositeStrategy requires at least one filter or a selector")
        self.selector = selector if selector is not None else RoundRobinStrategy()

    def select(self, pool: ProxyPool, context: SelectionContext | None = None) -> Proxy:
        """
        Select a proxy by applying filters then selector.

        Process:
            1. Start with the pool's proxies whose ``is_healthy`` is true
            2. For each filter: run it on a temporary pool of the current
               candidates, then keep the candidates that match its result
               (see ``_matches_filter``)
            3. Run the selector on a pool of the remaining candidates
            4. Return the selected proxy

        NOTE(review): each filter's ``select()`` may mark the proxy it returns
        as in use (the strategies in this file call ``start_request()``);
        confirm that side effect is intended for filters.

        Args:
            pool: The proxy pool to select from
            context: Request context with filtering criteria; an empty
                SelectionContext is used when omitted

        Returns:
            Selected proxy from filtered pool

        Raises:
            ProxyPoolEmptyError: If filters eliminate all proxies

        Performance:
            Target: <5ms total including all filters and selector (SC-007)
        """
        if context is None:
            context = SelectionContext()

        # Start with healthy proxies only
        filtered_proxies = [p for p in pool.get_all_proxies() if p.is_healthy]
        if not filtered_proxies:
            raise ProxyPoolEmptyError("No healthy proxies available")

        for filter_strategy in self.filters:
            temp_pool = self._build_pool(pool, f"{pool.name}-filtered", filtered_proxies)
            try:
                selected = filter_strategy.select(temp_pool, context)
            except ProxyPoolEmptyError as err:
                # Re-raise with the filter's name, keeping the original cause.
                raise ProxyPoolEmptyError(
                    f"Filter {filter_strategy.__class__.__name__} eliminated all proxies"
                ) from err

            # The filter returns a single proxy; use it as the exemplar that
            # decides which of the current candidates survive.
            filtered_proxies = [
                p for p in filtered_proxies if self._matches_filter(p, selected)
            ]
            if not filtered_proxies:
                raise ProxyPoolEmptyError("All proxies filtered out")

        final_pool = self._build_pool(pool, f"{pool.name}-final", filtered_proxies)
        return self.selector.select(final_pool, context)

    @staticmethod
    def _build_pool(template: ProxyPool, name: str, proxies: list[Proxy]) -> ProxyPool:
        """Create a pool of the same class as ``template`` holding ``proxies``."""
        new_pool = template.__class__(name=name)
        for proxy in proxies:
            new_pool.add_proxy(proxy)
        return new_pool

    def _matches_filter(self, proxy: Proxy, filter_result: Proxy) -> bool:
        """
        Check if proxy matches filter criteria based on filter result.

        Simple heuristic: a proxy matches when it is the filter result itself,
        or when it shares the result's country code. If the result has no
        country code, its region is compared instead. If the result has
        neither, every proxy matches.

        Args:
            proxy: Proxy to check
            filter_result: Proxy returned by filter strategy

        Returns:
            True if proxy matches filter criteria
        """
        # If same proxy, definitely matches
        if proxy.id == filter_result.id:
            return True

        # Country takes precedence over region.
        country = getattr(filter_result, "country_code", None)
        if country:
            return getattr(proxy, "country_code", None) == country

        # Without a country, fall back to region so region-only results
        # still narrow the candidate set.
        region = getattr(filter_result, "region", None)
        if region:
            return getattr(proxy, "region", None) == region

        # If no specific criteria, include all
        return True

    def record_result(self, proxy: Proxy, success: bool, response_time_ms: float) -> None:
        """
        Record result by delegating to the selector and then to each filter.

        Filters without a ``record_result`` attribute are skipped.

        Args:
            proxy: The proxy that handled the request
            success: Whether the request succeeded
            response_time_ms: Response time in milliseconds
        """
        self.selector.record_result(proxy, success, response_time_ms)
        for filter_strategy in self.filters:
            if hasattr(filter_strategy, "record_result"):
                filter_strategy.record_result(proxy, success, response_time_ms)

    @classmethod
    def from_config(cls, config: dict[str, Any]) -> CompositeStrategy:
        """
        Create CompositeStrategy from configuration dictionary.

        Args:
            config: Configuration dict with keys:
                - filters: List of filter strategy names or instances
                  (default: empty list)
                - selector: Selector strategy name or instance
                  (default: "round-robin")

        Returns:
            Configured CompositeStrategy instance

        Example:
            >>> config = {
            ...     "filters": ["geo-targeted"],
            ...     "selector": "performance-based"
            ... }
            >>> strategy = CompositeStrategy.from_config(config)

        Raises:
            ValueError: If a strategy name is not recognized
        """
        filters_config = config.get("filters", [])
        selector_config = config.get("selector", "round-robin")

        # Names are resolved to fresh instances; anything else is used as given.
        filters = [
            cls._strategy_from_name(entry) if isinstance(entry, str) else entry
            for entry in filters_config
        ]
        if isinstance(selector_config, str):
            selector = cls._strategy_from_name(selector_config)
        else:
            selector = selector_config

        return cls(filters=filters, selector=selector)

    @staticmethod
    def _strategy_from_name(name: str) -> RotationStrategy:
        """
        Convert strategy name to strategy instance.

        Args:
            name: Strategy name (e.g., "round-robin", "performance-based")

        Returns:
            New strategy instance built with its default constructor arguments

        Raises:
            ValueError: If strategy name not recognized
        """
        # Simple mapping for now - will use StrategyRegistry in T070
        strategy_map: dict[str, type[RotationStrategy]] = {
            "round-robin": RoundRobinStrategy,
            "random": RandomStrategy,
            "least-used": LeastUsedStrategy,
            "performance-based": PerformanceBasedStrategy,
            "session": SessionPersistenceStrategy,
            "geo-targeted": GeoTargetedStrategy,
            "cost-aware": CostAwareStrategy,
        }
        strategy_class = strategy_map.get(name)
        if strategy_class is None:
            raise ValueError(f"Unknown strategy name: {name}")
        return strategy_class()