"""
Async circuit breaker implementation with RWLock for reduced contention.
This is the async-first implementation that uses AsyncRWLock for
better performance in high-concurrency scenarios.
"""
from __future__ import annotations
from typing import Any
from pydantic import PrivateAttr
from proxywhirl.circuit_breaker.base import CircuitBreakerBase, CircuitBreakerState
from proxywhirl.models import CircuitBreakerConfig
from proxywhirl.rwlock import AsyncRWLock
# Re-export for backward compatibility
__all__ = ["AsyncCircuitBreaker", "CircuitBreakerState"]
[docs]
class AsyncCircuitBreaker(CircuitBreakerBase):
"""Async circuit breaker for a single proxy with RWLock for high concurrency.
This is the ASYNCHRONOUS implementation designed for async/await contexts.
Uses AsyncRWLock for event-loop-safe operations with reduced lock contention.
Key Features:
- Event-loop safe: All methods are async and use asyncio-compatible locks
- High concurrency: RWLock allows multiple readers or single writer
- Zero blocking: Never blocks the event loop with synchronous operations
For synchronous contexts, use CircuitBreaker from circuit_breaker module instead.
Attributes:
proxy_id: Unique identifier for the proxy
state: Current circuit breaker state (CLOSED, OPEN, HALF_OPEN)
failure_count: Number of failures in current window
failure_threshold: Number of failures before opening circuit
window_duration: Rolling window duration in seconds
[docs]
timeout_duration: How long circuit stays open before testing recovery
>>> from proxywhirl.circuit_breaker_async import AsyncCircuitBreaker
[docs]
>>> cb = AsyncCircuitBreaker(proxy_id="proxy-1")
[docs]
>>> await cb.record_failure() # Event-loop safe
[docs]
>>> if await cb.should_attempt_request():
... # make async request
... await cb.record_success()
"""
_lock: AsyncRWLock = PrivateAttr(default_factory=AsyncRWLock)
[docs]
async def record_failure(self) -> None:
"""Record a failure and update state if threshold reached."""
async with self._lock.write_lock():
self._do_record_failure()
[docs]
async def record_success(self) -> None:
"""Record a success and potentially close circuit."""
async with self._lock.write_lock():
self._do_record_success()
[docs]
async def should_attempt_request(self) -> bool:
"""Check if proxy is available for requests.
Uses write lock to prevent TOCTOU race conditions.
Returns:
True if proxy should be attempted, False if circuit is open.
"""
async with self._lock.write_lock():
return self._do_should_attempt_request()
[docs]
async def reset(self) -> None:
"""Manually reset circuit breaker to CLOSED state."""
async with self._lock.write_lock():
self._do_reset()
@classmethod
[docs]
def create(
cls,
proxy_id: str,
config: CircuitBreakerConfig | None = None,
**kwargs: Any,
) -> AsyncCircuitBreaker:
"""Factory method to create a circuit breaker with optional config.
Args:
proxy_id: Unique identifier for the proxy
config: CircuitBreakerConfig with settings
**kwargs: Additional AsyncCircuitBreaker field overrides
Returns:
AsyncCircuitBreaker instance
"""
if config:
failure_threshold = kwargs.pop("failure_threshold", config.failure_threshold)
window_duration = kwargs.pop("window_duration", config.window_duration)
timeout_duration = kwargs.pop("timeout_duration", config.timeout_duration)
return cls(
proxy_id=proxy_id,
failure_threshold=failure_threshold,
window_duration=window_duration,
timeout_duration=timeout_duration,
**kwargs,
)
return cls(proxy_id=proxy_id, **kwargs)
@classmethod
[docs]
def from_config(
cls,
proxy_id: str,
config: CircuitBreakerConfig | None = None,
**kwargs: Any,
) -> AsyncCircuitBreaker:
"""Backward-compatible alias for create()."""
return cls.create(proxy_id=proxy_id, config=config, **kwargs)