Source code for proxywhirl.metrics_collector

"""Prometheus metrics collector for ProxyWhirl.

This module provides centralized Prometheus metrics collection for monitoring
proxy pool health, request performance, and circuit breaker states.

Metrics exposed:
- proxywhirl_requests_total: Counter for total requests
- proxywhirl_request_duration_seconds: Histogram for request latency
- proxywhirl_proxy_health_status: Gauge for proxy health (1=healthy, 0=unhealthy)
- proxywhirl_active_proxies: Gauge for number of active proxies
- proxywhirl_circuit_breaker_state: Gauge for circuit breaker state (0=closed, 1=open, 2=half-open)

Usage:
    from proxywhirl.metrics_collector import MetricsCollector

    collector = MetricsCollector()
    collector.record_request("success", duration=1.5, proxy_id="proxy-1")
    collector.update_proxy_health("proxy-1", is_healthy=True)
"""

from __future__ import annotations

from typing import Literal

from prometheus_client import REGISTRY, CollectorRegistry, Counter, Gauge, Histogram


class MetricsCollector:
    """Centralized Prometheus metrics collector for ProxyWhirl.

    Owns one counter, one histogram and three gauges, all registered on a
    single registry, and exposes small methods for updating them so callers
    never deal with label plumbing directly.
    """

    # Numeric encoding written to the circuit breaker gauge for each state.
    _CIRCUIT_BREAKER_STATE_VALUES = {"closed": 0, "open": 1, "half-open": 2}

    # Values of the ``status`` label accepted by ``record_request``. Needed to
    # enumerate a proxy's counter series when its metrics are cleared.
    _REQUEST_STATUSES = ("success", "error", "timeout")

    def __init__(self, registry: CollectorRegistry | None = None) -> None:
        """Create and register all metrics.

        Args:
            registry: Registry to register the metrics on. When ``None`` the
                default ``REGISTRY`` is used. Passing a dedicated registry
                lets tests create collectors without metric-name collisions.
        """
        self._registry = REGISTRY if registry is None else registry

        # Counter: total requests made through proxies.
        self.requests_total = Counter(
            "proxywhirl_requests_total",
            "Total number of requests made through proxies",
            ["status", "proxy_id"],
            registry=self._registry,
        )

        # Histogram: request latency distribution, in seconds.
        self.request_duration_seconds = Histogram(
            "proxywhirl_request_duration_seconds",
            "Request duration in seconds",
            ["proxy_id"],
            buckets=(0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0),
            registry=self._registry,
        )

        # Gauge: proxy health status (1=healthy, 0=unhealthy).
        self.proxy_health_status = Gauge(
            "proxywhirl_proxy_health_status",
            "Current proxy health status (1=healthy, 0=unhealthy)",
            ["proxy_id"],
            registry=self._registry,
        )

        # Gauge: number of active proxies in the pool (no labels).
        self.active_proxies = Gauge(
            "proxywhirl_active_proxies",
            "Number of active proxies in the pool",
            registry=self._registry,
        )

        # Gauge: circuit breaker state (0=closed, 1=open, 2=half-open).
        self.circuit_breaker_state = Gauge(
            "proxywhirl_circuit_breaker_state",
            "Circuit breaker state (0=closed, 1=open, 2=half-open)",
            ["proxy_id"],
            registry=self._registry,
        )

    def record_request(
        self,
        status: Literal["success", "error", "timeout"],
        duration: float,
        proxy_id: str,
    ) -> None:
        """Record a request made through a proxy.

        Increments the request counter for ``(status, proxy_id)`` and adds
        ``duration`` to the latency histogram for ``proxy_id``.

        Args:
            status: Request status (success, error, timeout)
            duration: Request duration in seconds
            proxy_id: Identifier of the proxy used
        """
        self.requests_total.labels(status=status, proxy_id=proxy_id).inc()
        self.request_duration_seconds.labels(proxy_id=proxy_id).observe(duration)

    def update_proxy_health(self, proxy_id: str, is_healthy: bool) -> None:
        """Update health status for a specific proxy.

        Args:
            proxy_id: Identifier of the proxy
            is_healthy: True if proxy is healthy, False otherwise
        """
        self.proxy_health_status.labels(proxy_id=proxy_id).set(1 if is_healthy else 0)

    def update_active_proxies(self, count: int) -> None:
        """Update the count of active proxies in the pool.

        Args:
            count: Number of active proxies
        """
        self.active_proxies.set(count)

    def update_circuit_breaker_state(
        self,
        proxy_id: str,
        state: Literal["closed", "open", "half-open"],
    ) -> None:
        """Update circuit breaker state for a specific proxy.

        Args:
            proxy_id: Identifier of the proxy
            state: Circuit breaker state (closed=0, open=1, half-open=2)

        Raises:
            KeyError: If ``state`` is not one of the three known states.
        """
        state_value = self._CIRCUIT_BREAKER_STATE_VALUES[state]
        self.circuit_breaker_state.labels(proxy_id=proxy_id).set(state_value)

    def clear_proxy_metrics(self, proxy_id: str) -> None:
        """Remove every labelled series recorded for a specific proxy.

        Call this when a proxy is removed from the pool. The proxy's series
        are dropped from the health gauge, the circuit breaker gauge, the
        latency histogram and the request counter, so a retired proxy is no
        longer exported and per-proxy series do not accumulate as the pool
        churns. Clearing a proxy that has no series is a no-op.

        Args:
            proxy_id: Identifier of the proxy to clear
        """
        self._remove_series(self.proxy_health_status, proxy_id)
        self._remove_series(self.circuit_breaker_state, proxy_id)
        self._remove_series(self.request_duration_seconds, proxy_id)
        # The counter is labelled by (status, proxy_id), so each status has
        # its own series that must be removed individually.
        for status in self._REQUEST_STATUSES:
            self._remove_series(self.requests_total, status, proxy_id)

    @staticmethod
    def _remove_series(metric: Counter | Gauge | Histogram, *label_values: str) -> None:
        """Drop one labelled series from ``metric``, ignoring unknown label sets."""
        try:
            metric.remove(*label_values)
        except KeyError:
            # Deliberate best-effort: the series was never created (or was
            # already removed), so there is nothing to clean up.
            pass
# Registry backing the global collector; stays None until the collector is
# first requested.
_singleton_registry: CollectorRegistry | None = None

# Global singleton collector instance for convenient access.
_collector: MetricsCollector | None = None
def get_metrics_collector() -> MetricsCollector:
    """Return the global MetricsCollector, creating it on first use.

    The singleton is registered on its own ``CollectorRegistry`` rather than
    the default ``REGISTRY``, which avoids metric-name conflicts with the
    default registry during testing.

    Returns:
        MetricsCollector: The global metrics collector instance
    """
    global _collector, _singleton_registry

    # Fast path: the singleton already exists.
    if _collector is not None:
        return _collector

    if _singleton_registry is None:
        _singleton_registry = CollectorRegistry()
    _collector = MetricsCollector(registry=_singleton_registry)
    return _collector
def reset_metrics_collector() -> None:
    """Forget the global MetricsCollector and its dedicated registry.

    Intended mainly for tests that need isolation: once both module-level
    references are cleared, the next call to ``get_metrics_collector`` builds
    a fresh collector on a fresh registry.
    """
    global _collector, _singleton_registry
    _collector = _singleton_registry = None
# Convenience functions for direct metric access
def record_request(
    status: Literal["success", "error", "timeout"],
    duration: float,
    proxy_id: str,
) -> None:
    """Record a proxied request on the global collector.

    Module-level shortcut for ``get_metrics_collector().record_request(...)``.

    Args:
        status: Request status (success, error, timeout)
        duration: Request duration in seconds
        proxy_id: Identifier of the proxy used
    """
    collector = get_metrics_collector()
    collector.record_request(status, duration, proxy_id)
def update_proxy_health(proxy_id: str, is_healthy: bool) -> None:
    """Set a proxy's health status on the global collector.

    Module-level shortcut for
    ``get_metrics_collector().update_proxy_health(...)``.

    Args:
        proxy_id: Identifier of the proxy
        is_healthy: True if proxy is healthy, False otherwise
    """
    collector = get_metrics_collector()
    collector.update_proxy_health(proxy_id, is_healthy)
def update_active_proxies(count: int) -> None:
    """Set the active-proxy count on the global collector.

    Module-level shortcut for
    ``get_metrics_collector().update_active_proxies(...)``.

    Args:
        count: Number of active proxies
    """
    collector = get_metrics_collector()
    collector.update_active_proxies(count)
def update_circuit_breaker_state(
    proxy_id: str,
    state: Literal["closed", "open", "half-open"],
) -> None:
    """Set a proxy's circuit breaker state on the global collector.

    Module-level shortcut for
    ``get_metrics_collector().update_circuit_breaker_state(...)``.

    Args:
        proxy_id: Identifier of the proxy
        state: Circuit breaker state (closed=0, open=1, half-open=2)
    """
    collector = get_metrics_collector()
    collector.update_circuit_breaker_state(proxy_id, state)
def clear_proxy_metrics(proxy_id: str) -> None:
    """Clear a proxy's metrics on the global collector.

    Module-level shortcut for
    ``get_metrics_collector().clear_proxy_metrics(...)``.

    Args:
        proxy_id: Identifier of the proxy to clear
    """
    collector = get_metrics_collector()
    collector.clear_proxy_metrics(proxy_id)