# Source code for proxywhirl.mcp.server

"""MCP Server implementation for ProxyWhirl.

Production-ready MCP server with unified tool interface, auto-loading from database,
proper lifecycle management, FastMCP v2 Context support, and middleware-based auth.
"""

from __future__ import annotations

import asyncio
import sys
from contextlib import asynccontextmanager
from typing import Any, Literal

from loguru import logger

from proxywhirl.mcp.auth import MCPAuth
from proxywhirl.models import HealthStatus, Proxy, ProxySource
from proxywhirl.rotator import AsyncProxyWhirl


def _check_python_version() -> None:
    """Check that Python version is 3.10+ for MCP server functionality.

    Raises:
        RuntimeError: If Python version is below 3.10
    """
    if sys.version_info < (3, 10):
        raise RuntimeError(
            f"MCP server requires Python 3.10 or higher. "
            f"Current version: {sys.version_info.major}.{sys.version_info.minor}. "
            f"Please upgrade Python or use proxywhirl without MCP functionality."
        )


# Import FastMCP with graceful fallback and Python version check
try:
    _check_python_version()
    from fastmcp import Context, FastMCP
    from fastmcp.server.middleware import Middleware, MiddlewareContext

    _FASTMCP_AVAILABLE = True
except RuntimeError as e:
    # Raised by _check_python_version() on Python < 3.10: warn and fall back
    # to a degraded mode where the FastMCP symbols are stubbed with None.
    logger.warning(str(e))
    _FASTMCP_AVAILABLE = False
    Context = None  # type: ignore[assignment, misc]
    Middleware = None  # type: ignore[assignment, misc]
    MiddlewareContext = None  # type: ignore[assignment, misc]
except ImportError:
    # fastmcp is an optional dependency: keep this module importable and let
    # callers check _FASTMCP_AVAILABLE before using MCP features.
    logger.warning(
        "FastMCP is not installed. MCP server functionality will be limited. "
        "Install with: pip install fastmcp (Python 3.10+ required)"
    )
    _FASTMCP_AVAILABLE = False
    Context = None  # type: ignore[assignment, misc]
    Middleware = None  # type: ignore[assignment, misc]
    MiddlewareContext = None  # type: ignore[assignment, misc]

# Global AsyncProxyWhirl instance for MCP server.
# Lazily initialized on first use; _rotator_lock serializes creation and
# teardown across concurrent coroutines (see get_rotator / cleanup_rotator).
_rotator_lock = asyncio.Lock()
_rotator: AsyncProxyWhirl | None = None

# Global MCPAuth instance for authentication.
# Configured via set_auth(); get_auth() creates a default (no-auth) instance
# on demand if nothing has been configured.
_auth: MCPAuth | None = None


# ============================================================================
# Auth Middleware (FastMCP v2)
# ============================================================================


def _create_auth_middleware_class():
    """Create ProxyWhirlAuthMiddleware class inheriting from Middleware if available."""
    if not _FASTMCP_AVAILABLE or Middleware is None:
        return None

    class ProxyWhirlAuthMiddleware(Middleware):
        """FastMCP v2 middleware for API key authentication.

        This middleware intercepts all tool calls and validates the API key
        before allowing the request to proceed. This centralizes auth logic
        instead of checking in each tool implementation.
        """

        async def on_call_tool(self, context: Any, call_next: Any) -> Any:
            """Validate API key on tool calls.

            Extracts api_key from tool arguments and validates against configured auth.
            """
            auth = get_auth()

            # Walk context.message.params.arguments defensively; any missing
            # attribute (or falsy arguments) leaves api_key as None.
            api_key = None
            message = getattr(context, "message", None)
            params = getattr(message, "params", None)
            arguments = getattr(params, "arguments", None)
            if arguments:
                api_key = arguments.get("api_key")

            # Reject the call before it reaches the tool when auth fails.
            if not auth.authenticate({"api_key": api_key}):
                return {"error": "Authentication failed: Invalid API key", "code": 401}

            return await call_next(context)

    return ProxyWhirlAuthMiddleware


# Create middleware class and instance (will be added to mcp if FastMCP available).
# Both stay None when FastMCP is missing, so downstream code must guard on that.
ProxyWhirlAuthMiddleware = _create_auth_middleware_class()
_auth_middleware = ProxyWhirlAuthMiddleware() if ProxyWhirlAuthMiddleware else None


# ============================================================================
# Lifespan Management (FastMCP v2)
# ============================================================================


@asynccontextmanager
async def _mcp_lifespan(server: Any):
    """FastMCP v2 lifespan context manager.

    Handles proper startup and shutdown lifecycle:
    - Startup: Initialize rotator, load proxies from DB
    - Shutdown: Clean up resources, close HTTP clients

    Args:
        server: FastMCP server instance (passed by FastMCP)

    Yields:
        None
    """
    logger.info("MCP server starting (lifespan)")
    try:
        # Pre-warm the rotator so the first tool call doesn't pay init cost.
        await get_rotator()
        logger.info("MCP server ready")
        yield
    finally:
        # Always release clients/threads, even if startup or serving failed.
        await cleanup_rotator()
        logger.info("MCP server shutdown complete (lifespan)")


# Create FastMCP instance with lifespan (if available)
if _FASTMCP_AVAILABLE:
    mcp = FastMCP("ProxyWhirl", lifespan=_mcp_lifespan)
    # Add auth middleware so every tool call is authenticated centrally
    if _auth_middleware is not None:
        mcp.add_middleware(_auth_middleware)
else:
    # Stub so the module stays importable; every use of mcp is None-guarded.
    mcp = None  # type: ignore[assignment]


async def get_rotator() -> AsyncProxyWhirl:
    """Get or create the global AsyncProxyWhirl instance with auto-loading.

    On first initialization:
    1. If database exists (from PROXYWHIRL_MCP_DB env or proxywhirl.db), loads proxies
    2. If pool is still empty, auto-fetches proxies from public sources

    Returns:
        AsyncProxyWhirl instance
    """
    import os
    from pathlib import Path

    global _rotator

    async with _rotator_lock:
        if _rotator is None:
            _rotator = AsyncProxyWhirl()

            # Auto-load from database if it exists
            db_path = Path(os.environ.get("PROXYWHIRL_MCP_DB", "proxywhirl.db"))
            if db_path.exists():
                try:
                    from proxywhirl.storage import SQLiteStorage

                    storage = SQLiteStorage(str(db_path))
                    await storage.initialize()
                    rows = await storage.load()
                    # Normalized schema returns dicts; convert to Proxy models.
                    for row in rows:
                        await _rotator.add_proxy(
                            Proxy(
                                url=row["url"],
                                protocol=row.get("protocol", "http"),
                                health_status=HealthStatus(row.get("health_status", "unknown")),
                                country_code=row.get("country_code"),
                                source=ProxySource(row.get("source", "fetched")),
                            )
                        )
                    await storage.close()
                    logger.info(f"MCP: Auto-loaded {len(rows)} proxies from {db_path}")
                except Exception as e:
                    logger.warning(f"MCP: Failed to auto-load proxies from {db_path}: {e}")
            else:
                logger.debug(f"MCP: No database at {db_path}, starting with empty pool")

            # Auto-fetch if pool is still empty
            if _rotator.pool.size == 0:
                logger.info(
                    "MCP: Pool is empty, auto-fetching proxies from built-in public sources..."
                )
                await _auto_fetch_proxies(_rotator)

            logger.info(
                f"Initialized global AsyncProxyWhirl for MCP server with {_rotator.pool.size} proxies"
            )
    return _rotator
async def _auto_fetch_proxies(rotator: AsyncProxyWhirl, max_proxies: int = 100) -> None:
    """Auto-fetch proxies from built-in public sources for cold start.

    Args:
        rotator: AsyncProxyWhirl to populate
        max_proxies: Maximum number of proxies to fetch
    """
    from proxywhirl.models import BootstrapConfig
    from proxywhirl.rotator._bootstrap import _fetch_bootstrap_candidates

    try:
        candidates = await _fetch_bootstrap_candidates(
            config=BootstrapConfig(show_progress=False)
        )
    except Exception as e:
        logger.warning(f"MCP: Failed to auto-fetch from built-in public sources: {e}")
        return

    # Cap the candidate list, then add best-effort: one bad proxy must not
    # abort the whole load.
    added = 0
    for candidate in candidates[:max_proxies]:
        try:
            await rotator.add_proxy(candidate)
            added += 1
        except Exception as e:
            logger.debug("MCP: Skipping auto-fetched proxy during pool load", error=str(e))

    logger.info(f"MCP: Auto-fetched {added} proxies from built-in public sources")
async def set_rotator(rotator: AsyncProxyWhirl) -> None:
    """Set the global AsyncProxyWhirl instance (thread-safe).

    Args:
        rotator: AsyncProxyWhirl instance to use
    """
    global _rotator
    async with _rotator_lock:
        _rotator = rotator
        logger.info("Set custom AsyncProxyWhirl for MCP server")
async def cleanup_rotator() -> None:
    """Clean up global rotator resources.

    Closes HTTP clients and releases all resources held by the rotator.
    Safe to call multiple times.
    """
    global _rotator
    async with _rotator_lock:
        if _rotator is not None:
            # Close all pooled HTTP clients first, then stop the background
            # aggregation thread before dropping the reference.
            await _rotator._close_all_clients()
            _rotator._stop_event.set()
            thread = _rotator._aggregation_thread
            if thread and thread.is_alive():
                thread.join(timeout=5.0)
            _rotator = None
            logger.info("MCP: Cleaned up AsyncProxyWhirl")
@asynccontextmanager
async def mcp_lifespan():
    """Lifespan context manager for MCP server (legacy wrapper).

    Note: FastMCP v2 uses the lifespan passed to FastMCP() constructor.
    This is kept for backward compatibility with direct usage.

    Yields:
        None
    """
    # Delegate to the real lifespan; the server argument is unused there.
    async with _mcp_lifespan(None):
        yield
def get_auth() -> MCPAuth:
    """Get or create the global MCPAuth instance.

    Returns:
        MCPAuth instance
    """
    global _auth
    if _auth is None:
        # Default MCPAuth() performs no authentication.
        _auth = MCPAuth()
        logger.info("Initialized global MCPAuth for MCP server (no auth required)")
    return _auth
def set_auth(auth: MCPAuth | None) -> None:
    """Set the global MCPAuth instance.

    Args:
        auth: MCPAuth instance to use, or None to disable authentication
    """
    global _auth
    _auth = auth
    if auth is None:
        logger.info("Disabled authentication for MCP server")
    else:
        logger.info("Set custom MCPAuth for MCP server")
class ProxyWhirlMCPServer:
    """ProxyWhirl MCP Server using FastMCP.

    This server exposes proxy management functionality to AI assistants
    via the Model Context Protocol using FastMCP's decorator-based API.

    Example::

        # Basic usage
        server = ProxyWhirlMCPServer()
        server.run()

        # With custom rotator
        rotator = AsyncProxyWhirl()
        server = ProxyWhirlMCPServer(proxy_manager=rotator)
        await server.initialize()  # Must call for async setup
        server.run()
    """

    def __init__(self, proxy_manager: Any = None) -> None:
        """Initialize ProxyWhirl MCP server.

        Note: If providing a proxy_manager, you must call initialize() to set it up.

        Args:
            proxy_manager: ProxyWhirl proxy manager instance (AsyncProxyWhirl)
        """
        self.proxy_manager = proxy_manager
        logger.info("ProxyWhirl MCP Server initialized")

    async def initialize(self) -> None:
        """Initialize the server asynchronously.

        Call this method after construction if you provided a custom proxy_manager.
        This properly sets up the rotator in an async context.
        """
        if self.proxy_manager is not None:
            await set_rotator(self.proxy_manager)
        logger.info("ProxyWhirl MCP Server async initialization complete")

    def run(
        self, transport: Literal["stdio", "http", "sse", "streamable-http"] = "stdio"
    ) -> None:
        """Run the MCP server.

        Args:
            transport: Transport type ('stdio', 'http', 'sse', or 'streamable-http')
        """
        logger.info(f"Starting ProxyWhirl MCP Server with {transport} transport")
        if mcp is None:
            logger.warning(
                "FastMCP is not installed. MCP server cannot run. Install with: pip install fastmcp"
            )
            return
        mcp.run(transport=transport)
def main() -> None:
    """CLI entry point for running the MCP server.

    This enables `uvx proxywhirl[mcp]` or `proxywhirl-mcp` to start the server.

    Environment variables:
        PROXYWHIRL_MCP_API_KEY: API key for authentication
        PROXYWHIRL_MCP_DB: Path to proxy database file
        PROXYWHIRL_MCP_LOG_LEVEL: Log level (debug, info, warning, error)
    """
    import argparse
    import os

    parser = argparse.ArgumentParser(
        description="ProxyWhirl MCP Server",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Environment variables:
  PROXYWHIRL_MCP_API_KEY    API key for authentication
  PROXYWHIRL_MCP_DB         Path to proxy database file
  PROXYWHIRL_MCP_LOG_LEVEL  Log level (debug, info, warning, error)

Examples:
  proxywhirl-mcp                           # Start with stdio transport
  proxywhirl-mcp --transport http          # Start with HTTP transport
  proxywhirl-mcp --api-key secret123       # Require authentication
  proxywhirl-mcp --db /path/to/proxies.db  # Use custom database
""",
    )
    parser.add_argument(
        "--transport",
        choices=["stdio", "http", "sse", "streamable-http"],
        default="stdio",
        help="Transport type (default: stdio)",
    )
    parser.add_argument(
        "--api-key",
        default=os.environ.get("PROXYWHIRL_MCP_API_KEY"),
        help="API key for authentication (or set PROXYWHIRL_MCP_API_KEY)",
    )
    parser.add_argument(
        "--db",
        default=os.environ.get("PROXYWHIRL_MCP_DB", "proxywhirl.db"),
        help="Path to proxy database file (default: proxywhirl.db)",
    )
    parser.add_argument(
        "--log-level",
        choices=["debug", "info", "warning", "error"],
        default=os.environ.get("PROXYWHIRL_MCP_LOG_LEVEL", "info"),
        help="Log level (default: info)",
    )
    args = parser.parse_args()

    # Configure logging to stderr (stdout is reserved for stdio transport).
    # `sys` is imported at module level; no need for __import__("sys") here.
    logger.remove()
    logger.add(
        lambda msg: print(msg, end="", file=sys.stderr),
        level=args.log_level.upper(),
        format="<level>{level}</level> | {message}",
    )

    # Set up authentication if API key provided
    if args.api_key:
        auth = MCPAuth(api_key=args.api_key)
        set_auth(auth)
        logger.info("Authentication enabled")

    # Store DB path for get_rotator to use
    os.environ["PROXYWHIRL_MCP_DB"] = args.db

    server = ProxyWhirlMCPServer()
    server.run(transport=args.transport)
# ============================================================================ # Internal Implementation Functions # ============================================================================ async def _list_proxies_impl(ctx: Any = None) -> dict[str, Any]: """List all proxies in the pool. Args: ctx: FastMCP Context for logging (optional) Returns: dict[str, Any]: Proxy list and pool statistics. """ rotator = await get_rotator() # Get all proxies from the pool (thread-safe snapshot) proxies = [] for proxy in rotator.pool.get_all_proxies(): proxies.append( { "id": str(proxy.id), "url": str(proxy.url), "status": proxy.health_status.value, "success_rate": proxy.success_rate, "avg_latency_ms": proxy.average_response_time_ms or 0.0, "region": proxy.country_code or "UNKNOWN", "total_requests": proxy.total_requests, "total_successes": proxy.total_successes, "total_failures": proxy.total_failures, } ) # Count by status (use same snapshot for consistency) proxies_snapshot = rotator.pool.get_all_proxies() status_counts = { "healthy": sum(1 for p in proxies_snapshot if p.health_status == HealthStatus.HEALTHY), "degraded": sum(1 for p in proxies_snapshot if p.health_status == HealthStatus.DEGRADED), "unhealthy": sum(1 for p in proxies_snapshot if p.health_status == HealthStatus.UNHEALTHY), "dead": sum(1 for p in proxies_snapshot if p.health_status == HealthStatus.DEAD), "unknown": sum(1 for p in proxies_snapshot if p.health_status == HealthStatus.UNKNOWN), } return { "proxies": proxies, "total": rotator.pool.size, **status_counts, } async def _rotate_proxy_impl(ctx: Any = None) -> dict[str, Any]: """Rotate to the next proxy in the pool. Args: ctx: FastMCP Context for logging (optional) Returns: dict[str, Any]: Selected proxy information. 
""" rotator = await get_rotator() try: # Select a proxy using the current strategy proxy = rotator.strategy.select(rotator.pool) return { "proxy": { "id": str(proxy.id), "url": str(proxy.url), "status": proxy.health_status.value, "success_rate": proxy.success_rate, "avg_latency_ms": proxy.average_response_time_ms or 0.0, "region": proxy.country_code or "UNKNOWN", } } except Exception as e: logger.error(f"Failed to rotate proxy: {e}") return {"error": f"Failed to select proxy: {str(e)}", "code": 500} async def _proxy_status_impl(proxy_id: str, ctx: Any = None) -> dict[str, Any]: """Get detailed status for a specific proxy. Args: proxy_id: UUID of the proxy to check ctx: FastMCP Context for logging (optional) Returns: dict[str, Any]: Proxy status and metrics. """ if not proxy_id: return {"error": "proxy_id is required", "code": 400} rotator = await get_rotator() # Find the proxy by ID from uuid import UUID try: proxy_uuid = UUID(proxy_id) except ValueError: return {"error": f"Invalid proxy_id format: {proxy_id}", "code": 400} # Use thread-safe snapshot to find proxy proxy = None for p in rotator.pool.get_all_proxies(): if p.id == proxy_uuid: proxy = p break if proxy is None: return {"error": f"Proxy not found: {proxy_id}", "code": 404} # Get circuit breaker state circuit_breaker = rotator.circuit_breakers.get(proxy_id) cb_state = "unknown" if circuit_breaker: cb_state = circuit_breaker.state.value return { "proxy_id": proxy_id, "url": str(proxy.url), "status": proxy.health_status.value, "metrics": { "success_rate": proxy.success_rate, "avg_latency_ms": proxy.average_response_time_ms or 0.0, "total_requests": proxy.total_requests, "successful_requests": proxy.total_successes, "failed_requests": proxy.total_failures, "ema_response_time_ms": proxy.ema_response_time_ms or 0.0, }, "health": { "last_check": proxy.last_health_check.isoformat() if proxy.last_health_check else None, "last_success": proxy.last_success_at.isoformat() if proxy.last_success_at else None, 
"last_failure": proxy.last_failure_at.isoformat() if proxy.last_failure_at else None, "circuit_breaker": cb_state, "consecutive_failures": proxy.consecutive_failures, "consecutive_successes": proxy.consecutive_successes, }, "region": proxy.country_code or "UNKNOWN", "protocol": proxy.protocol, } async def _recommend_proxy_impl( region: str | None = None, performance: str | None = "medium", ctx: Any = None, ) -> dict[str, Any]: """Recommend the best proxy based on criteria. Args: region: Optional region filter (country code) performance: Performance tier (high, medium, low) ctx: FastMCP Context for logging (optional) Returns: dict[str, Any]: Recommendation and alternatives. """ if performance and performance not in ["high", "medium", "low"]: return { "error": "Invalid performance level. Must be one of: high, medium, low", "code": 400, } rotator = await get_rotator() # Filter proxies by region if specified (thread-safe snapshot) candidates = rotator.pool.get_all_proxies() if region: region_upper = region.upper() candidates = [ p for p in candidates if p.country_code and p.country_code.upper() == region_upper ] if not candidates: return {"error": f"No proxies found for region: {region}", "code": 404} # Sort by performance criteria if performance == "high": # High performance: prioritize low latency and high success rate candidates.sort( key=lambda p: (-(p.success_rate or 0.0), p.average_response_time_ms or float("inf")) ) elif performance == "medium": # Medium performance: balance between success rate and latency candidates.sort( key=lambda p: ( -(p.success_rate or 0.0) * 0.7 - (1.0 / max(p.average_response_time_ms or 1000, 1)) * 0.3, ) ) else: # low # Low performance: just ensure it's working candidates.sort(key=lambda p: -(p.success_rate or 0.0)) if not candidates: return {"error": "No proxies available", "code": 404} best_proxy = candidates[0] # Calculate a score (0-1) based on success rate and response time score = best_proxy.success_rate or 0.0 if 
best_proxy.average_response_time_ms and best_proxy.average_response_time_ms > 0: # Penalize high latency (normalize to 0-1, where 100ms = 1.0, 1000ms = 0.1) latency_score = max(0.0, 1.0 - (best_proxy.average_response_time_ms / 1000.0)) score = (score + latency_score) / 2.0 reason_parts = [f"Best {performance} performance proxy"] if region: reason_parts.append(f"in {region.upper()} region") if best_proxy.success_rate and best_proxy.success_rate > 0.9: reason_parts.append("with high reliability") # Get alternatives (top 3 excluding the best) alternatives = [] for alt_proxy in candidates[1:4]: alt_score = alt_proxy.success_rate or 0.0 if alt_proxy.average_response_time_ms and alt_proxy.average_response_time_ms > 0: alt_latency_score = max(0.0, 1.0 - (alt_proxy.average_response_time_ms / 1000.0)) alt_score = (alt_score + alt_latency_score) / 2.0 alternatives.append( { "id": str(alt_proxy.id), "url": str(alt_proxy.url), "score": round(alt_score, 2), "reason": f"Alternative with {alt_proxy.success_rate:.1%} success rate", } ) return { "recommendation": { "id": str(best_proxy.id), "url": str(best_proxy.url), "score": round(score, 2), "reason": " ".join(reason_parts), "metrics": { "success_rate": best_proxy.success_rate, "avg_latency_ms": best_proxy.average_response_time_ms or 0.0, "region": best_proxy.country_code or "UNKNOWN", "performance_tier": performance, "total_requests": best_proxy.total_requests, }, "alternatives": alternatives, } } async def _get_health_impl(ctx: Any = None) -> dict[str, Any]: """Get health status of the proxy pool. Args: ctx: FastMCP Context for logging (optional) Returns: dict[str, Any]: Health metrics and pool status. 
""" from datetime import datetime, timezone rotator = await get_rotator() # Get pool statistics stats = rotator.get_pool_stats() # Calculate average latency (thread-safe snapshot) proxies_snapshot = rotator.pool.get_all_proxies() latencies = [ p.average_response_time_ms for p in proxies_snapshot if p.average_response_time_ms is not None and p.average_response_time_ms > 0 ] avg_latency = sum(latencies) / len(latencies) if latencies else 0.0 # Determine overall pool status if stats["total_proxies"] == 0: pool_status = "empty" elif stats["healthy_proxies"] >= stats["total_proxies"] * 0.7: pool_status = "healthy" elif stats["healthy_proxies"] >= stats["total_proxies"] * 0.3: pool_status = "degraded" else: pool_status = "critical" # Check rate limiter if configured rate_limit_info = {"enabled": False, "message": "Rate limiting not configured"} if hasattr(rotator, "rate_limiter") and rotator.rate_limiter is not None: try: rate_limit_info = { "enabled": True, "max_requests": getattr(rotator.rate_limiter, "max_requests", None), "time_window_seconds": getattr(rotator.rate_limiter, "time_window", None), } except Exception: pass return { "pool_status": pool_status, "total_proxies": stats["total_proxies"], "healthy_proxies": stats["healthy_proxies"], "degraded_proxies": stats.get("unhealthy_proxies", 0), "failed_proxies": stats.get("dead_proxies", 0), "average_success_rate": stats["average_success_rate"], "average_latency_ms": round(avg_latency, 2), "last_update": datetime.now(timezone.utc).isoformat(), "total_requests": stats["total_requests"], "total_successes": stats["total_successes"], "total_failures": stats["total_failures"], "rate_limit": rate_limit_info, } async def _reset_circuit_breaker_impl(proxy_id: str, ctx: Any = None) -> dict[str, Any]: """Reset the circuit breaker for a specific proxy. Args: proxy_id: UUID of the proxy whose circuit breaker to reset ctx: FastMCP Context for logging (optional) Returns: dict[str, Any]: Reset confirmation or error. 
""" if not proxy_id: return {"error": "proxy_id is required", "code": 400} rotator = await get_rotator() # Check if circuit breaker exists for this proxy circuit_breaker = rotator.circuit_breakers.get(proxy_id) if circuit_breaker is None: return {"error": f"No circuit breaker found for proxy: {proxy_id}", "code": 404} # Get state before reset for logging old_state = circuit_breaker.state.value # Reset the circuit breaker try: circuit_breaker.reset() logger.info( f"Circuit breaker reset for proxy {proxy_id}: {old_state} -> closed", proxy_id=proxy_id, old_state=old_state, ) return { "success": True, "proxy_id": proxy_id, "previous_state": old_state, "new_state": "closed", "message": f"Circuit breaker reset successfully for proxy {proxy_id}", } except Exception as e: logger.error(f"Failed to reset circuit breaker for {proxy_id}: {e}") return {"error": f"Failed to reset circuit breaker: {str(e)}", "code": 500} async def _add_proxy_impl(proxy_url: str, ctx: Any = None) -> dict[str, Any]: """Add a new proxy to the pool. Args: proxy_url: URL of the proxy to add (e.g., http://host:port) ctx: FastMCP Context for logging (optional) Returns: dict[str, Any]: Added proxy info or error. 
""" from uuid import uuid4 from proxywhirl.models import Proxy, ProxySource if not proxy_url: return {"error": "proxy_url is required", "code": 400} # Parse and validate URL try: # Determine protocol from URL if proxy_url.startswith("socks5://"): protocol = "socks5" elif proxy_url.startswith("socks4://"): protocol = "socks4" elif proxy_url.startswith("https://"): protocol = "https" else: protocol = "http" if not proxy_url.startswith("http://"): proxy_url = f"http://{proxy_url}" proxy = Proxy( id=uuid4(), url=proxy_url, protocol=protocol, source=ProxySource.USER, health_status=HealthStatus.UNKNOWN, ) except Exception as e: return {"error": f"Invalid proxy URL: {str(e)}", "code": 400} rotator = await get_rotator() try: await rotator.add_proxy(proxy) return { "success": True, "proxy": { "id": str(proxy.id), "url": str(proxy.url), "protocol": proxy.protocol, "status": proxy.health_status.value, }, "pool_size": rotator.pool.size, } except Exception as e: logger.error(f"Failed to add proxy: {e}") return {"error": f"Failed to add proxy: {str(e)}", "code": 500} async def _remove_proxy_impl(proxy_id: str, ctx: Any = None) -> dict[str, Any]: """Remove a proxy from the pool. Args: proxy_id: UUID of the proxy to remove ctx: FastMCP Context for logging (optional) Returns: dict[str, Any]: Removal confirmation or error. 
""" from uuid import UUID if not proxy_id: return {"error": "proxy_id is required", "code": 400} try: proxy_uuid = UUID(proxy_id) except ValueError: return {"error": f"Invalid proxy_id format: {proxy_id}", "code": 400} rotator = await get_rotator() # Find and remove the proxy proxy = None for p in rotator.pool.get_all_proxies(): if p.id == proxy_uuid: proxy = p break if proxy is None: return {"error": f"Proxy not found: {proxy_id}", "code": 404} try: rotator.pool.remove_proxy(proxy_uuid) # Also remove circuit breaker if exists if proxy_id in rotator.circuit_breakers: del rotator.circuit_breakers[proxy_id] return { "success": True, "proxy_id": proxy_id, "message": f"Proxy {proxy_id} removed successfully", "pool_size": rotator.pool.size, } except Exception as e: logger.error(f"Failed to remove proxy: {e}") return {"error": f"Failed to remove proxy: {str(e)}", "code": 500} async def _fetch_proxies_impl( sources: list[str] | None = None, max_proxies: int = 100, ctx: Any = None, ) -> dict[str, Any]: """Fetch proxies from public sources. Args: sources: List of source names or 'recommended' for default sources max_proxies: Maximum number of proxies to fetch ctx: FastMCP Context for logging/progress (optional) Returns: dict[str, Any]: Fetch results. 
""" rotator = await get_rotator() initial_size = rotator.pool.size try: # Report progress if context available if ctx is not None: try: await ctx.report_progress(0, max_proxies) except Exception: pass await _auto_fetch_proxies(rotator, max_proxies=max_proxies) fetched_count = rotator.pool.size - initial_size # Report completion if ctx is not None: try: await ctx.report_progress(fetched_count, max_proxies) except Exception: pass return { "success": True, "fetched": fetched_count, "pool_size": rotator.pool.size, "message": f"Fetched {fetched_count} proxies from public sources", } except Exception as e: logger.error(f"Failed to fetch proxies: {e}") return {"error": f"Failed to fetch proxies: {str(e)}", "code": 500} async def _validate_proxy_impl( proxy_id: str | None = None, timeout: float = 10.0, ctx: Any = None, ) -> dict[str, Any]: """Validate a specific proxy or all proxies. Args: proxy_id: UUID of proxy to validate, or None to validate all timeout: Timeout for validation requests ctx: FastMCP Context for logging/progress (optional) Returns: dict[str, Any]: Validation results. 
""" from uuid import UUID import httpx rotator = await get_rotator() # Determine which proxies to validate if proxy_id: try: proxy_uuid = UUID(proxy_id) except ValueError: return {"error": f"Invalid proxy_id format: {proxy_id}", "code": 400} proxy = None for p in rotator.pool.get_all_proxies(): if p.id == proxy_uuid: proxy = p break if proxy is None: return {"error": f"Proxy not found: {proxy_id}", "code": 404} proxies_to_validate = [proxy] else: proxies_to_validate = list(rotator.pool.get_all_proxies()) if not proxies_to_validate: return {"error": "No proxies to validate", "code": 404} # Validate proxies with progress reporting results = [] test_url = "https://httpbin.org/ip" total = len(proxies_to_validate) for i, proxy in enumerate(proxies_to_validate): # Report progress if context available if ctx is not None: try: await ctx.report_progress(i, total) except Exception: pass try: async with httpx.AsyncClient( proxy=str(proxy.url), timeout=timeout, verify=False, # nosec B501 - Intentional for proxy validation ) as client: start = asyncio.get_event_loop().time() resp = await client.get(test_url) latency = (asyncio.get_event_loop().time() - start) * 1000 is_valid = resp.status_code == 200 proxy.health_status = HealthStatus.HEALTHY if is_valid else HealthStatus.UNHEALTHY proxy.record_success(latency) if is_valid else proxy.record_failure() results.append( { "proxy_id": str(proxy.id), "url": str(proxy.url), "valid": is_valid, "latency_ms": round(latency, 2) if is_valid else None, "status_code": resp.status_code, } ) except Exception as e: proxy.health_status = HealthStatus.UNHEALTHY proxy.record_failure() results.append( { "proxy_id": str(proxy.id), "url": str(proxy.url), "valid": False, "error": str(e), } ) # Report completion if ctx is not None: try: await ctx.report_progress(total, total) except Exception: pass valid_count = sum(1 for r in results if r.get("valid")) return { "validated": len(results), "valid": valid_count, "invalid": len(results) - valid_count, 
"results": results if len(results) <= 10 else results[:10], # Limit response size "message": f"Validated {len(results)} proxies: {valid_count} valid, {len(results) - valid_count} invalid", } async def _set_strategy_impl(strategy: str, ctx: Any = None) -> dict[str, Any]: """Change the rotation strategy. Args: strategy: Strategy name (round-robin, random, weighted, least-used, performance-based, session-persistence, geo-targeted) ctx: FastMCP Context for logging (optional) Returns: dict[str, Any]: Strategy change confirmation. """ if not strategy: return {"error": "strategy is required", "code": 400} valid_strategies = [ "round-robin", "random", "weighted", "least-used", "performance-based", "session-persistence", "geo-targeted", "cost-aware", ] if strategy not in valid_strategies: return { "error": f"Invalid strategy: {strategy}. Valid: {', '.join(valid_strategies)}", "code": 400, } rotator = await get_rotator() old_strategy = rotator.strategy.__class__.__name__ try: rotator.set_strategy(strategy) new_strategy = rotator.strategy.__class__.__name__ return { "success": True, "previous_strategy": old_strategy, "new_strategy": new_strategy, "message": f"Strategy changed from {old_strategy} to {new_strategy}", } except Exception as e: logger.error(f"Failed to set strategy: {e}") return {"error": f"Failed to set strategy: {str(e)}", "code": 500} # ============================================================================ # Unified MCP Tool # ============================================================================ # Type alias for action parameter ProxywhirlAction = Literal[ "list", "rotate", "status", "recommend", "health", "reset_cb", "add", "remove", "fetch", "validate", "set_strategy", ] async def _proxywhirl_tool( action: ProxywhirlAction, proxy_id: str | None = None, proxy_url: str | None = None, strategy: str | None = None, criteria: dict[str, Any] | None = None, api_key: str | None = None, ctx: Any = None, ) -> dict[str, Any]: """Unified proxywhirl management 
tool for proxy pool operations. Args: action: Operation: list|rotate|status|recommend|health|reset_cb|add|remove|fetch|validate|set_strategy proxy_id: Proxy UUID (required for: status, reset_cb, remove; optional for: validate) proxy_url: Proxy URL e.g. http://host:port, socks5://host:port (required for: add) strategy: Strategy name: round-robin|random|weighted|least-used|performance-based (required for: set_strategy) criteria: Filter criteria dict. For recommend: {"region": "US", "performance": "high"}. For fetch: {"max_proxies": 100} api_key: API key for authentication (if server requires it) ctx: FastMCP v2 Context for logging/progress (injected by FastMCP) Returns: dict[str, Any]: Operation result. """ # Auth check for direct calls (middleware handles MCP calls) # Only check auth if ctx is None (direct call, not via MCP) if ctx is None: auth = get_auth() if not auth.authenticate({"api_key": api_key}): return {"error": "Authentication failed: Invalid API key", "code": 401} # Use context for logging if available, otherwise fall back to loguru def log_info(msg: str) -> None: if ctx is not None: asyncio.create_task(_async_log(ctx, "info", msg)) else: logger.info(msg) log_info(f"proxywhirl action={action}") if action == "list": return await _list_proxies_impl(ctx=ctx) elif action == "rotate": return await _rotate_proxy_impl(ctx=ctx) elif action == "status": if not proxy_id: return {"error": "proxy_id required for status action", "code": 400} return await _proxy_status_impl(proxy_id, ctx=ctx) elif action == "recommend": region = criteria.get("region") if criteria else None performance = criteria.get("performance", "medium") if criteria else "medium" return await _recommend_proxy_impl(region=region, performance=performance, ctx=ctx) elif action == "health": return await _get_health_impl(ctx=ctx) elif action == "reset_cb": if not proxy_id: return {"error": "proxy_id required for reset_cb action", "code": 400} return await _reset_circuit_breaker_impl(proxy_id, ctx=ctx) 
elif action == "add": if not proxy_url: return {"error": "proxy_url required for add action", "code": 400} return await _add_proxy_impl(proxy_url, ctx=ctx) elif action == "remove": if not proxy_id: return {"error": "proxy_id required for remove action", "code": 400} return await _remove_proxy_impl(proxy_id, ctx=ctx) elif action == "fetch": max_proxies = criteria.get("max_proxies", 100) if criteria else 100 return await _fetch_proxies_impl(max_proxies=max_proxies, ctx=ctx) elif action == "validate": timeout = criteria.get("timeout", 10.0) if criteria else 10.0 return await _validate_proxy_impl(proxy_id=proxy_id, timeout=timeout, ctx=ctx) elif action == "set_strategy": if not strategy: return {"error": "strategy required for set_strategy action", "code": 400} return await _set_strategy_impl(strategy, ctx=ctx) else: return {"error": f"Unknown action: {action}", "code": 400} async def _async_log(ctx: Any, level: str, msg: str) -> None: """Async helper for context logging.""" try: if level == "info": await ctx.info(msg) elif level == "debug": await ctx.debug(msg) elif level == "warning": await ctx.warning(msg) elif level == "error": await ctx.error(msg) except Exception: # Fall back to loguru if context logging fails getattr(logger, level)(msg) # Register unified tool with FastMCP when available # Use conditional to wrap with FastMCP decorator or provide standalone function for testing proxywhirl = mcp.tool()(_proxywhirl_tool) if mcp is not None else _proxywhirl_tool # ============================================================================ # MCP Resources # ============================================================================ async def _get_proxy_health_impl() -> str: """Get proxy pool health as JSON resource.""" import json health_data = await _get_health_impl() return json.dumps(health_data, indent=2) async def _get_proxy_config_impl() -> str: """Get proxy configuration as JSON resource.""" import json logger.info("Resource accessed: proxy://config") 
rotator = await get_rotator() # Get circuit breaker config from the first circuit breaker (they all share the same config) cb_config = {} if rotator.circuit_breakers: first_cb = next(iter(rotator.circuit_breakers.values())) cb_config = { "failure_threshold": first_cb.failure_threshold, "timeout_duration": first_cb.timeout_duration, "window_duration": first_cb.window_duration, } config_data = { "rotation_strategy": rotator.strategy.__class__.__name__, "timeout": rotator.config.timeout, "verify_ssl": rotator.config.verify_ssl, "follow_redirects": rotator.config.follow_redirects, "pool_connections": rotator.config.pool_connections, "pool_max_keepalive": rotator.config.pool_max_keepalive, "circuit_breaker": cb_config, "retry_policy": { "max_attempts": rotator.retry_policy.max_attempts, "backoff_strategy": rotator.retry_policy.backoff_strategy.value, "base_delay": rotator.retry_policy.base_delay, "max_backoff_delay": rotator.retry_policy.max_backoff_delay, "multiplier": rotator.retry_policy.multiplier, }, "logging": { "level": rotator.config.log_level, "format": rotator.config.log_format, "redact_credentials": rotator.config.log_redact_credentials, }, } return json.dumps(config_data, indent=2) # Register FastMCP resources when available if mcp is not None: proxy_health_resource = mcp.resource("resource://proxywhirl/health")(_get_proxy_health_impl) proxy_config_resource = mcp.resource("resource://proxywhirl/config")(_get_proxy_config_impl) # ============================================================================ # MCP Prompts # ============================================================================ async def _proxy_selection_workflow_prompt() -> str: """Workflow prompt for guiding proxy selection decisions.""" return """# Proxy Selection Workflow Use this workflow to select the optimal proxy for your request: ## Step 1: Assess Requirements - What is the target geography? (Use 'recommend' with region criteria) - What performance tier do you need? 
(high/medium/low) - Is this a one-time request or part of a session? ## Step 2: Check Pool Health - Call proxywhirl(action="health") to assess overall pool status - If pool_status is "critical" or "empty", consider fetching new proxies ## Step 3: Get Recommendation - Call proxywhirl(action="recommend", criteria={"region": "US", "performance": "high"}) - Review the recommendation and alternatives ## Step 4: Verify Selected Proxy - Call proxywhirl(action="status", proxy_id="<selected_id>") - Check circuit_breaker state (should be "closed") - Review success_rate and avg_latency_ms ## Step 5: Use or Rotate - If proxy looks good, use it for your request - If issues arise, call proxywhirl(action="rotate") for next proxy - If circuit breaker is open, call proxywhirl(action="reset_cb", proxy_id="<id>") ## Common Issues - All proxies failing: Check if target site is blocking proxies - High latency: Try "high" performance tier recommendation - Authentication errors: Verify proxy credentials """ async def _troubleshooting_workflow_prompt() -> str: """Workflow prompt for debugging proxy issues.""" return """# Proxy Troubleshooting Workflow Use this workflow when experiencing proxy issues: ## Step 1: Diagnose Pool Health ``` proxywhirl(action="health") ``` - Check pool_status: "healthy", "degraded", "critical", or "empty" - Review healthy_proxies vs total_proxies ratio - Check average_success_rate (should be > 0.8) ## Step 2: Check Specific Proxy ``` proxywhirl(action="status", proxy_id="<proxy_id>") ``` - Is circuit_breaker "open"? -> Proxy is failing repeatedly - Is consecutive_failures high? -> Proxy may be blocked - Is success_rate low? -> Proxy may be unreliable ## Step 3: Circuit Breaker Issues If a proxy's circuit breaker is open but you believe it should work: ``` proxywhirl(action="reset_cb", proxy_id="<proxy_id>") ``` This resets the circuit breaker to "closed" state for retry. 
## Step 4: Find Alternative ``` proxywhirl(action="recommend", criteria={"performance": "high"}) ``` Get a fresh recommendation for a reliable proxy. ## Step 5: Rotate Away from Problem Proxy ``` proxywhirl(action="rotate") ``` Get the next available proxy in rotation. ## Common Patterns ### All Circuit Breakers Open - Indicates widespread blocking or network issues - Wait for timeout or reset individual breakers - Consider if target site is blocking all your proxies ### Low Success Rates Across Pool - Proxies may be stale (re-fetch from sources) - Target site may have changed blocking rules - Consider using different proxy sources ### Single Proxy Issues - Reset circuit breaker and retry - If persists, proxy may be permanently blocked - Rotate to different proxy """ # Register FastMCP prompts when available if mcp is not None: @mcp.prompt()
[docs] async def proxy_selection_workflow() -> str: """Workflow prompt for guiding proxy selection decisions.""" return await _proxy_selection_workflow_prompt()
@mcp.prompt() async def troubleshooting_workflow() -> str: """Workflow prompt for debugging proxy issues.""" return await _troubleshooting_workflow_prompt() # ============================================================================ # Public API Wrappers (for backward compatibility and testing) # ============================================================================
async def list_proxies(api_key: str | None = None) -> dict[str, Any]:
    """Public wrapper around the shared list-proxies implementation.

    Args:
        api_key: Optional API key for authentication

    Returns:
        dict[str, Any]: Proxy list and pool statistics, or a 401 error dict.
    """
    # Direct API calls authenticate here; MCP-routed calls are covered by middleware.
    if get_auth().authenticate({"api_key": api_key}):
        return await _list_proxies_impl()
    return {"error": "Authentication failed: Invalid API key", "code": 401}
async def rotate_proxy(api_key: str | None = None) -> dict[str, Any]:
    """Public wrapper around the shared rotate-proxy implementation.

    Args:
        api_key: Optional API key for authentication

    Returns:
        dict[str, Any]: Selected proxy information, or a 401 error dict.
    """
    # Authenticate the direct call, then delegate to the shared implementation.
    if get_auth().authenticate({"api_key": api_key}):
        return await _rotate_proxy_impl()
    return {"error": "Authentication failed: Invalid API key", "code": 401}
async def proxy_status(proxy_id: str, api_key: str | None = None) -> dict[str, Any]:
    """Public wrapper around the shared proxy-status implementation.

    Args:
        proxy_id: UUID of the proxy to check
        api_key: Optional API key for authentication

    Returns:
        dict[str, Any]: Proxy status and metrics, or a 401 error dict.
    """
    # Authenticate the direct call, then delegate to the shared implementation.
    if get_auth().authenticate({"api_key": api_key}):
        return await _proxy_status_impl(proxy_id)
    return {"error": "Authentication failed: Invalid API key", "code": 401}
async def recommend_proxy(
    region: str | None = None,
    performance: str | None = "medium",
    api_key: str | None = None,
) -> dict[str, Any]:
    """Public wrapper around the shared recommend-proxy implementation.

    Args:
        region: Optional region filter (country code)
        performance: Performance tier (high, medium, low)
        api_key: Optional API key for authentication

    Returns:
        dict[str, Any]: Recommendation and alternatives, or a 401 error dict.
    """
    # Authenticate the direct call, then delegate to the shared implementation.
    if get_auth().authenticate({"api_key": api_key}):
        return await _recommend_proxy_impl(region=region, performance=performance)
    return {"error": "Authentication failed: Invalid API key", "code": 401}
async def get_proxy_health() -> str:
    """Return the proxy pool health resource as a JSON string."""
    # Delegate to the shared implementation used by the MCP resource.
    health_json = await _get_proxy_health_impl()
    return health_json
async def get_proxy_config() -> str:
    """Return the proxy configuration resource as a JSON string."""
    # Delegate to the shared implementation used by the MCP resource.
    config_json = await _get_proxy_config_impl()
    return config_json
async def get_rate_limit_status(proxy_id: str, api_key: str | None = None) -> dict[str, Any]:
    """Get rate limiting status for a specific proxy (backward compatibility wrapper).

    Note: Rate limiting info is now included in the health action response.
    This wrapper is kept for backward compatibility.

    Args:
        proxy_id: UUID of the proxy to check
        api_key: Optional API key for authentication

    Returns:
        dict[str, Any]: Rate limit information, or an error dict with ``code``
            401 (auth failure), 400 (malformed UUID), or 404 (unknown proxy).
    """
    from uuid import UUID

    logger.info(f"Tool called: get_rate_limit_status for {proxy_id}")

    # Authenticate request (direct call path; MCP calls are handled by middleware)
    auth = get_auth()
    if not auth.authenticate({"api_key": api_key}):
        logger.warning("Authentication failed for get_rate_limit_status")
        return {"error": "Authentication failed: Invalid API key", "code": 401}

    rotator = await get_rotator()

    # Validate the UUID before touching the pool so malformed ids fail fast.
    try:
        proxy_uuid = UUID(proxy_id)
    except ValueError:
        return {"error": f"Invalid proxy_id format: {proxy_id}", "code": 400}

    # Use a thread-safe snapshot to find the proxy by id.
    proxy = next((p for p in rotator.pool.get_all_proxies() if p.id == proxy_uuid), None)
    if proxy is None:
        return {"error": f"Proxy not found: {proxy_id}", "code": 404}

    # Report configured limiter settings when a rate limiter is attached.
    # (The original wrapped this in `try: ... except Exception: pass`, which
    # silently fell through to the "not configured" response on any error; the
    # getattr-based reads below cannot raise, so the dead guard was removed.)
    if getattr(rotator, "rate_limiter", None) is not None:
        return {
            "proxy_id": proxy_id,
            "rate_limit": {
                "enabled": True,
                "message": "Rate limiting is configured",
                "max_requests": getattr(rotator.rate_limiter, "max_requests", None),
                "time_window_seconds": getattr(rotator.rate_limiter, "time_window", None),
                "current_usage": 0,  # Would need actual tracking
                "remaining": None,
            },
        }

    # Rate limiting not configured (placeholder implementation)
    return {
        "proxy_id": proxy_id,
        "rate_limit": {
            "enabled": False,
            "message": "Rate limiting is not configured for this proxy",
            "max_requests": None,
            "time_window_seconds": None,
            "current_usage": 0,
            "remaining": None,
        },
        "note": "Rate limiting can be configured via the RateLimiter class",
    }