# Source code for proxywhirl.retry.metrics

"""
Retry metrics collection and aggregation.

Thread Safety:
    RetryMetrics uses threading.Lock for thread-safe operation in synchronous contexts.
    When used in async contexts (e.g., FastAPI endpoints), methods that acquire locks
    should be called using asyncio.to_thread() to prevent blocking the event loop.

Usage:
    Sync context (RetryExecutor, aggregation thread):
        metrics.record_attempt(attempt)
        metrics.aggregate_hourly()

    Async context (FastAPI endpoints):
        summary = await asyncio.to_thread(metrics.get_summary)
        timeseries = await asyncio.to_thread(metrics.get_timeseries, hours=24)
"""

from __future__ import annotations

from collections import defaultdict, deque
from datetime import datetime, timedelta, timezone
from enum import Enum
from threading import Lock
from typing import Any

from pydantic import BaseModel, Field, PrivateAttr

from proxywhirl.circuit_breaker import CircuitBreakerState


class RetryOutcome(str, Enum):
    """Outcome of a retry attempt.

    Because the enum mixes in ``str``, each member compares equal to its
    plain string value (e.g. ``RetryOutcome.SUCCESS == "success"``).
    """

    # Member order is part of the enum's iteration order; keep it stable.
    SUCCESS = "success"
    FAILURE = "failure"
    TIMEOUT = "timeout"
    CIRCUIT_OPEN = "circuit_open"
class RetryAttempt(BaseModel):
    """Record of a single retry attempt.

    Numeric fields ``attempt_number``, ``delay_before`` and ``latency`` are
    validated to be non-negative.
    """

    # Identifier of the request this attempt belongs to; attempts sharing a
    # request_id are grouped into one request when metrics are aggregated.
    request_id: str
    attempt_number: int = Field(..., ge=0)
    proxy_id: str
    # NOTE(review): metrics code compares this against timezone-aware UTC
    # cutoffs, so callers presumably pass tz-aware datetimes — confirm.
    timestamp: datetime
    outcome: RetryOutcome
    # Optional; None when no status code was recorded.
    status_code: int | None = None
    # Delay before the attempt (units presumably seconds — TODO confirm).
    delay_before: float = Field(..., ge=0)
    latency: float = Field(..., ge=0)
    # Optional free-text error description; None when absent.
    error_message: str | None = None
class CircuitBreakerEvent(BaseModel):
    """Circuit breaker state change event for one proxy."""

    proxy_id: str
    # Transition recorded by this event: from_state -> to_state.
    from_state: CircuitBreakerState
    to_state: CircuitBreakerState
    timestamp: datetime
    # Failure count reported with the transition.
    failure_count: int
class HourlyAggregate(BaseModel):
    """Hourly aggregated metrics."""

    # Start of the hour bucket (timestamp truncated to the hour).
    hour: datetime
    # Distinct request ids seen for this hour.
    total_requests: int = 0
    # Number of individual attempts folded into this hour.
    total_retries: int = 0
    # attempt_number -> count of successful attempts with that number.
    success_by_attempt: dict[int, int] = Field(default_factory=dict)
    # failure reason (error message, or outcome value if none) -> count.
    failure_by_reason: dict[str, int] = Field(default_factory=dict)
    # Average attempt latency for this hour; defaults to 0.0.
    avg_latency: float = 0.0
class RetryMetrics(BaseModel):
    """Aggregated retry metrics collection.

    Raw attempts are kept in the bounded ``current_attempts`` deque. Each call
    to :meth:`aggregate_hourly` folds only the attempts recorded since the
    previous call into ``hourly_aggregates``, so periodic aggregation never
    double-counts. Every public method acquires an internal ``threading.Lock``.
    """

    # Current period (raw data), bounded to ``max_current_attempts`` entries;
    # the oldest entry is evicted first.
    current_attempts: deque[RetryAttempt] = Field(default_factory=deque)
    # Historical data, aggregated per hour and pruned to ``retention_hours``.
    hourly_aggregates: dict[datetime, HourlyAggregate] = Field(default_factory=dict)
    # Circuit breaker events (only the most recent 1000 are kept).
    circuit_breaker_events: list[CircuitBreakerEvent] = Field(default_factory=list)
    # Configuration
    retention_hours: int = Field(default=24)
    max_current_attempts: int = Field(default=10000)

    # Runtime (not serialized)
    _lock: Lock = PrivateAttr(default_factory=Lock)
    # How many of the NEWEST entries in ``current_attempts`` have not yet been
    # folded into ``hourly_aggregates``. Kept <= len(current_attempts).
    _pending_aggregation: int = PrivateAttr(default=0)

    class Config:
        arbitrary_types_allowed = True

    def __init__(self, **data: Any) -> None:
        """Initialize and re-wrap ``current_attempts`` in a maxlen-bounded deque."""
        super().__init__(**data)
        # Use the validated field value rather than the raw keyword argument.
        self.current_attempts = deque(
            self.current_attempts, maxlen=self.max_current_attempts
        )
        # Attempts supplied at construction have not been aggregated yet.
        self._pending_aggregation = len(self.current_attempts)

    def _pending_attempts(self) -> list[RetryAttempt]:
        """Return the not-yet-aggregated attempts (the newest entries).

        Caller must hold ``self._lock``.
        """
        total = len(self.current_attempts)
        pending = min(self._pending_aggregation, total)
        return list(self.current_attempts)[total - pending :]

    @staticmethod
    def _fold_into(agg: HourlyAggregate, attempts: list[RetryAttempt]) -> None:
        """Add a non-empty batch of attempts from ``agg``'s hour into ``agg``."""
        previous_count = agg.total_retries
        # NOTE: a request whose attempts land in two separate aggregation
        # batches is counted once per batch.
        agg.total_requests += len({a.request_id for a in attempts})
        agg.total_retries += len(attempts)
        # Running mean over every attempt folded into this hour so far;
        # total_retries > 0 here because the batch is non-empty.
        batch_latency = sum(a.latency for a in attempts)
        agg.avg_latency = (
            agg.avg_latency * previous_count + batch_latency
        ) / agg.total_retries
        for attempt in attempts:
            if attempt.outcome == RetryOutcome.SUCCESS:
                agg.success_by_attempt[attempt.attempt_number] = (
                    agg.success_by_attempt.get(attempt.attempt_number, 0) + 1
                )
            else:
                reason = attempt.error_message or attempt.outcome.value
                agg.failure_by_reason[reason] = agg.failure_by_reason.get(reason, 0) + 1

    def record_attempt(self, attempt: RetryAttempt) -> None:
        """Record a retry attempt.

        Appending auto-evicts the oldest item once ``max_current_attempts``
        is reached.
        """
        with self._lock:
            self.current_attempts.append(attempt)
            # Evicted entries can no longer be aggregated, so cap at the length.
            self._pending_aggregation = min(
                self._pending_aggregation + 1, len(self.current_attempts)
            )

    def record_circuit_breaker_event(self, event: CircuitBreakerEvent) -> None:
        """Record circuit breaker state change, keeping the last 1000 events."""
        with self._lock:
            self.circuit_breaker_events.append(event)
            if len(self.circuit_breaker_events) > 1000:
                self.circuit_breaker_events = self.circuit_breaker_events[-1000:]

    def aggregate_hourly(self) -> None:
        """Fold attempts recorded since the last call into hourly summaries.

        Safe to call repeatedly (e.g. from a periodic aggregation thread):
        each attempt is aggregated at most once. Pending attempts older than
        ``retention_hours`` are discarded, and aggregates whose hour is before
        the retention cutoff are removed.
        """
        with self._lock:
            now = datetime.now(timezone.utc)
            cutoff = now - timedelta(hours=self.retention_hours)

            # Group not-yet-aggregated attempts by their truncated hour.
            # NOTE(review): comparison with an aware cutoff assumes
            # tz-aware timestamps; naive ones would raise TypeError.
            attempts_by_hour: dict[datetime, list[RetryAttempt]] = defaultdict(list)
            for attempt in self._pending_attempts():
                if attempt.timestamp >= cutoff:
                    hour = attempt.timestamp.replace(minute=0, second=0, microsecond=0)
                    attempts_by_hour[hour].append(attempt)
            # All pending attempts are now consumed (aggregated or expired).
            self._pending_aggregation = 0

            for hour, attempts in attempts_by_hour.items():
                agg = self.hourly_aggregates.get(hour)
                if agg is None:
                    agg = HourlyAggregate(hour=hour)
                    self.hourly_aggregates[hour] = agg
                self._fold_into(agg, attempts)

            # Remove old aggregates
            self.hourly_aggregates = {
                h: agg for h, agg in self.hourly_aggregates.items() if h >= cutoff
            }

    def get_summary(self) -> dict[str, Any]:
        """Get metrics summary for API response.

        ``total_retries`` and ``success_by_attempt`` combine the hourly
        aggregates with attempts not yet aggregated, counting each attempt once.
        """
        with self._lock:
            pending = self._pending_attempts()
            total_retries = len(pending)
            success_by_attempt: dict[int, int] = defaultdict(int)
            for agg in self.hourly_aggregates.values():
                total_retries += agg.total_retries
                for attempt_num, count in agg.success_by_attempt.items():
                    success_by_attempt[attempt_num] += count
            for attempt in pending:
                if attempt.outcome == RetryOutcome.SUCCESS:
                    success_by_attempt[attempt.attempt_number] += 1
            return {
                "total_retries": total_retries,
                "success_by_attempt": dict(success_by_attempt),
                "circuit_breaker_events_count": len(self.circuit_breaker_events),
                "retention_hours": self.retention_hours,
            }

    def get_timeseries(self, hours: int = 24) -> list[dict[str, Any]]:
        """Get hourly data points (oldest first) for the last ``hours`` hours.

        ``success_rate`` is successful attempts divided by total attempts for
        the hour (0.0 when the hour has no attempts).
        """
        with self._lock:
            cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
            data_points = []
            for hour, agg in sorted(self.hourly_aggregates.items()):
                if hour < cutoff:
                    continue
                success_count = sum(agg.success_by_attempt.values())
                total_attempts = agg.total_retries
                success_rate = success_count / total_attempts if total_attempts > 0 else 0.0
                data_points.append(
                    {
                        "timestamp": hour.isoformat(),
                        "total_requests": agg.total_requests,
                        "total_retries": agg.total_retries,
                        "success_rate": success_rate,
                        "avg_latency": agg.avg_latency,
                    }
                )
            return data_points

    def get_by_proxy(self, hours: int = 24) -> dict[str, dict[str, Any]]:
        """Get per-proxy retry statistics for the last ``hours`` hours.

        Attempt counts and latency come from the raw ``current_attempts``
        (bounded by ``max_current_attempts``). Circuit breaker opens come from
        the recorded events. A proxy with only circuit breaker events appears
        with zero attempts.
        """
        with self._lock:
            cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
            proxy_stats: dict[str, dict[str, Any]] = defaultdict(
                lambda: {
                    "total_attempts": 0,
                    "success_count": 0,
                    "failure_count": 0,
                    "total_latency": 0.0,
                    "circuit_breaker_opens": 0,
                }
            )

            # Aggregate from current attempts
            for attempt in self.current_attempts:
                if attempt.timestamp >= cutoff:
                    stats = proxy_stats[attempt.proxy_id]
                    stats["total_attempts"] += 1
                    stats["total_latency"] += attempt.latency
                    if attempt.outcome == RetryOutcome.SUCCESS:
                        stats["success_count"] += 1
                    else:
                        stats["failure_count"] += 1

            # Count circuit breaker opens
            for event in self.circuit_breaker_events:
                if event.timestamp >= cutoff and event.to_state == CircuitBreakerState.OPEN:
                    proxy_stats[event.proxy_id]["circuit_breaker_opens"] += 1

            # Calculate average latency
            result = {}
            for proxy_id, stats in proxy_stats.items():
                avg_latency = (
                    stats["total_latency"] / stats["total_attempts"]
                    if stats["total_attempts"] > 0
                    else 0.0
                )
                result[proxy_id] = {
                    "proxy_id": proxy_id,
                    "total_attempts": stats["total_attempts"],
                    "success_count": stats["success_count"],
                    "failure_count": stats["failure_count"],
                    "avg_latency": avg_latency,
                    "circuit_breaker_opens": stats["circuit_breaker_opens"],
                }
            return result