"""
Circuit breaker pattern implementation for proxy failure management.
IMPORTANT: This module provides SYNCHRONOUS circuit breaker implementation.
Usage Guidelines:
-----------------
- CircuitBreaker: Use in synchronous contexts only (threading.Lock)
- AsyncCircuitBreaker: Use in async contexts (from circuit_breaker_async module)
Example:
Synchronous usage:
>>> cb = CircuitBreaker(proxy_id="proxy-1")
>>> cb.record_failure() # Thread-safe
>>> if cb.should_attempt_request():
... # make request
... cb.record_success()
Async usage (use AsyncCircuitBreaker instead):
>>> from proxywhirl.circuit_breaker_async import AsyncCircuitBreaker
>>> cb = AsyncCircuitBreaker(proxy_id="proxy-1")
>>> await cb.record_failure() # Event loop safe
>>> if await cb.should_attempt_request():
... # make async request
... await cb.record_success()
"""
from __future__ import annotations
from threading import Lock
from typing import Any
from pydantic import PrivateAttr
from proxywhirl.circuit_breaker.base import CircuitBreakerBase, CircuitBreakerState
from proxywhirl.models import CircuitBreakerConfig
# Re-export for backward compatibility
__all__ = ["CircuitBreaker", "CircuitBreakerState"]
[docs]
[docs]
class CircuitBreaker(CircuitBreakerBase):
[docs]
"""Circuit breaker for a single proxy (SYNCHRONOUS implementation).
[docs]
WARNING: This class uses threading.Lock and is designed for SYNCHRONOUS contexts only.
[docs]
For async applications, use AsyncCircuitBreaker from circuit_breaker_async module.
Attributes:
proxy_id: Unique identifier for the proxy
state: Current circuit breaker state (CLOSED, OPEN, HALF_OPEN)
failure_count: Number of failures in current window
failure_threshold: Number of failures before opening circuit
window_duration: Rolling window duration in seconds
timeout_duration: How long circuit stays open before testing recovery
"""
_lock: Lock = PrivateAttr(default_factory=Lock)
[docs]
def record_failure(self) -> None:
"""Record a failure and update state if threshold reached.
Thread-safe - acquires threading.Lock for the entire operation.
"""
with self._lock:
self._do_record_failure()
[docs]
def record_success(self) -> None:
"""Record a success and potentially close circuit.
Thread-safe - acquires threading.Lock for the entire operation.
"""
with self._lock:
self._do_record_success()
[docs]
def should_attempt_request(self) -> bool:
"""Check if proxy is available for requests.
Thread-safe - acquires threading.Lock for the entire operation.
Returns:
True if proxy should be attempted, False if circuit is open.
"""
with self._lock:
return self._do_should_attempt_request()
[docs]
def reset(self) -> None:
"""Manually reset circuit breaker to CLOSED state.
Thread-safe - acquires threading.Lock before resetting state.
"""
with self._lock:
self._do_reset()
@classmethod
[docs]
def create(
cls,
proxy_id: str,
config: CircuitBreakerConfig | None = None,
**kwargs: Any,
) -> CircuitBreaker:
"""Factory method to create a circuit breaker with optional config.
Args:
proxy_id: Unique identifier for the proxy
config: CircuitBreakerConfig with settings
**kwargs: Additional CircuitBreaker field overrides
Returns:
CircuitBreaker instance
"""
if config:
failure_threshold = kwargs.pop("failure_threshold", config.failure_threshold)
window_duration = kwargs.pop("window_duration", config.window_duration)
timeout_duration = kwargs.pop("timeout_duration", config.timeout_duration)
return cls(
proxy_id=proxy_id,
failure_threshold=failure_threshold,
window_duration=window_duration,
timeout_duration=timeout_duration,
**kwargs,
)
return cls(proxy_id=proxy_id, **kwargs)
@classmethod
[docs]
def from_config(
cls,
proxy_id: str,
config: CircuitBreakerConfig | None = None,
**kwargs: Any,
) -> CircuitBreaker:
"""Backward-compatible alias for create()."""
return cls.create(proxy_id=proxy_id, config=config, **kwargs)