"""Command-line interface for ProxyWhirl.
This module provides a Typer-based CLI for proxy rotation operations.
Supports multiple output formats (text, JSON, CSV) with TTY-aware rendering.
"""
from __future__ import annotations
import sys
from dataclasses import dataclass
from enum import Enum
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
import httpx
import typer
from rich.console import Console
from rich.table import Table
from proxywhirl.config import CLIConfig, discover_config, load_config
from proxywhirl.models import RequestResult
from proxywhirl.utils import CLILock, mask_proxy_url
# Typer app instance
app = typer.Typer(
name="proxywhirl",
help="Advanced proxy rotation library with CLI interface",
add_completion=True,
no_args_is_help=True,
)
@dataclass
[docs]
class CommandContext:
"""Shared context for all CLI commands.
Attributes:
config: CLI configuration loaded from discovered config file
config_path: Path to the active configuration file (may be fallback if none found)
format: Output format (text, json, csv)
verbose: Enable verbose logging
console: Rich console for formatted output
lock: File lock for concurrent operation safety
"""
[docs]
lock: CLILock | None = None
# Global context storage (set by callback)
_context: CommandContext | None = None
[docs]
def validate_target_url(url: str, allow_private: bool = False) -> None:
"""Validate a target URL to prevent SSRF attacks.
Args:
url: The URL to validate
allow_private: If True, allow private/internal IP addresses (default: False)
Raises:
typer.Exit: If the URL is invalid or potentially dangerous
"""
# Parse the URL
try:
parsed = urlparse(url)
except Exception as e:
typer.secho(f"Error: Invalid URL format: {e}", err=True, fg="red")
raise typer.Exit(code=1) from e
# Validate scheme (only http/https allowed for target URLs)
if parsed.scheme not in ("http", "https"):
typer.secho(
f"Error: Invalid URL scheme '{parsed.scheme}'. Only http:// and https:// are allowed.",
err=True,
fg="red",
)
typer.secho(
"Rejected schemes: file://, data://, gopher://, ftp://, etc.",
err=True,
fg="yellow",
)
raise typer.Exit(code=1)
# Validate hostname exists
if not parsed.hostname:
typer.secho("Error: URL must include a valid hostname", err=True, fg="red")
raise typer.Exit(code=1)
# Check for localhost/private addresses (SSRF protection)
if not allow_private:
hostname_lower = parsed.hostname.lower()
# Block localhost variations
localhost_patterns = [
"localhost",
"127.", # 127.0.0.1, etc.
"0.0.0.0", # nosec B104 - Pattern check, not a bind
"[::", # IPv6 localhost variations
"::1",
]
if any(hostname_lower.startswith(pattern) for pattern in localhost_patterns):
typer.secho(
f"Error: Access to localhost/loopback addresses is not allowed: {parsed.hostname}",
err=True,
fg="red",
)
typer.secho(
"Use --allow-private flag if you need to test against local services",
err=True,
fg="yellow",
)
raise typer.Exit(code=1)
# Block private IP ranges (RFC 1918)
private_ip_patterns = [
"10.", # 10.0.0.0/8
"172.16.",
"172.17.",
"172.18.",
"172.19.", # 172.16.0.0/12
"172.20.",
"172.21.",
"172.22.",
"172.23.",
"172.24.",
"172.25.",
"172.26.",
"172.27.",
"172.28.",
"172.29.",
"172.30.",
"172.31.",
"192.168.", # 192.168.0.0/16
"169.254.", # Link-local
]
if any(hostname_lower.startswith(pattern) for pattern in private_ip_patterns):
typer.secho(
f"Error: Access to private IP addresses is not allowed: {parsed.hostname}",
err=True,
fg="red",
)
typer.secho(
"Use --allow-private flag if you need to test against internal services",
err=True,
fg="yellow",
)
raise typer.Exit(code=1)
# Block internal domain names
internal_domains = [".local", ".internal", ".lan", ".corp"]
if any(hostname_lower.endswith(domain) for domain in internal_domains):
typer.secho(
f"Error: Access to internal domain names is not allowed: {parsed.hostname}",
err=True,
fg="red",
)
typer.secho(
"Use --allow-private flag if you need to test against internal services",
err=True,
fg="yellow",
)
raise typer.Exit(code=1)
[docs]
def get_context() -> CommandContext:
"""Get the current command context.
Returns:
CommandContext: The active command context
Raises:
typer.Exit: If context is not initialized
"""
if _context is None:
typer.secho("Error: Command context not initialized", err=True, fg="red")
raise typer.Exit(code=1)
return _context
@app.callback()
[docs]
def main(
ctx: typer.Context,
config_file: Path | None = typer.Option(
None,
"--config",
"-c",
help="Path to configuration file (TOML). Auto-discovered if not provided.",
exists=True,
dir_okay=False,
resolve_path=True,
),
format: OutputFormat = typer.Option(
OutputFormat.TEXT,
"--format",
"-f",
help="Output format (text/json/csv)",
case_sensitive=False,
),
verbose: bool = typer.Option(
False,
"--verbose",
"-v",
help="Enable verbose logging",
),
no_lock: bool = typer.Option(
False,
"--no-lock",
help="Disable file locking (use with caution)",
),
) -> None:
"""ProxyWhirl CLI - Advanced proxy rotation library.
Global options apply to all subcommands.
Configuration is auto-discovered from:
1. Project directory: ./.proxywhirl.toml
2. User directory: ~/.config/proxywhirl/config.toml (Linux/Mac)
3. Defaults: In-memory configuration
"""
global _context
# Discover config file if not provided
if config_file is None:
config_file = discover_config()
# Load configuration (defaults if no file found)
try:
config = load_config(config_file)
except Exception as e:
typer.secho(f"Error loading config: {e}", err=True, fg="red")
raise typer.Exit(code=1) from e
# Initialize console with TTY detection
console = Console(
force_terminal=format == OutputFormat.TEXT,
force_interactive=False,
force_jupyter=False,
)
# Initialize lock if enabled
lock = None
if not no_lock:
# Determine lock directory
if config_file:
lock_dir = config_file.parent
else:
from platformdirs import user_data_dir
lock_dir = Path(user_data_dir("proxywhirl", "proxywhirl"))
lock_dir.mkdir(parents=True, exist_ok=True)
# Acquire lock and ensure cleanup is atomic
lock = CLILock(config_dir=lock_dir)
try:
# Use context manager protocol for safe lock acquisition
lock.__enter__()
# Immediately register cleanup callback to ensure lock is ALWAYS released
# even if the command crashes or raises an exception
ctx.call_on_close(lambda: lock.__exit__(None, None, None))
except typer.Exit:
# Lock acquisition failed (already handled by CLILock)
raise
# Store context globally
_context = CommandContext(
config=config,
config_path=config_file or Path.cwd() / ".proxywhirl.toml", # Fallback path
format=format,
verbose=verbose,
console=console,
lock=lock,
)
[docs]
def render_text(console: Console, data: dict[str, Any]) -> None:
"""Render data as formatted text using Rich.
Args:
console: Rich console instance
data: Data to render (must have 'message' or table structure)
"""
if "message" in data:
console.print(data["message"])
elif "table" in data:
table = Table(title=data.get("title"))
for header in data["table"]["headers"]:
table.add_column(str(header))
for row in data["table"]["rows"]:
table.add_row(*[str(cell) for cell in row])
console.print(table)
else:
console.print(data)
[docs]
def render_json(data: dict[str, Any]) -> None:
"""Render data as JSON to stdout.
Args:
data: Data to render as JSON
"""
import json
print(json.dumps(data, indent=2))
[docs]
def render_csv(data: dict[str, Any]) -> None:
"""Render data as CSV to stdout.
Args:
data: Data to render (must have 'table' structure with headers/rows)
"""
import csv
if "table" not in data:
typer.secho("Error: CSV format requires table data", err=True, fg="red")
raise typer.Exit(code=1)
writer = csv.writer(sys.stdout)
writer.writerow(data["table"]["headers"])
writer.writerows(data["table"]["rows"])
[docs]
def render_output(context: CommandContext, data: dict[str, Any]) -> None:
"""Render output in the configured format.
Args:
context: Command context with format settings
data: Data to render
"""
if context.format == OutputFormat.TEXT:
render_text(context.console, data)
elif context.format == OutputFormat.JSON:
render_json(data)
elif context.format == OutputFormat.CSV:
render_csv(data)
# ============================================================================
# Commands
# ============================================================================
@app.command()
[docs]
def request(
url: str = typer.Argument(..., help="Target URL to request"),
method: str = typer.Option("GET", "--method", "-X", help="HTTP method (GET/POST/etc)"),
headers: list[str] = typer.Option(
None, "--header", "-H", help="Custom headers (format: 'Key: Value')"
),
data: str | None = typer.Option(None, "--data", "-d", help="Request body data"),
proxy: str | None = typer.Option(
None, "--proxy", "-p", help="Specific proxy URL (overrides rotation)"
),
max_retries: int | None = typer.Option(
None, "--retries", help="Max retry attempts (overrides config)"
),
allow_private: bool = typer.Option(
False,
"--allow-private",
help="Allow requests to localhost/private IPs (use with caution)",
),
) -> None:
"""Make an HTTP request through a rotating proxy.
Examples:
proxywhirl request https://api.example.com
proxywhirl request -X POST -d '{"key":"value"}' https://api.example.com
proxywhirl request -H "Authorization: Bearer token" https://api.example.com
proxywhirl request http://localhost:8080 --allow-private
"""
import time
ctx = get_context()
# Validate target URL to prevent SSRF attacks
validate_target_url(url, allow_private=allow_private)
# Parse headers
header_dict: dict[str, str] = {}
if headers:
for header in headers:
if ":" not in header:
typer.secho(f"Invalid header format: {header}", err=True, fg="red")
typer.secho("Use format: 'Key: Value'", err=True, fg="yellow")
raise typer.Exit(code=1)
key, value = header.split(":", 1)
header_dict[key.strip()] = value.strip()
# Get proxy to use
proxy_url: str | None = None
if proxy:
# Explicit proxy override - use it directly
proxy_url = proxy
elif ctx.config.proxies:
# Use rotator to select proxy based on configured strategy
from proxywhirl.models import Proxy
from proxywhirl.rotator import ProxyWhirl
# Convert config proxies to Proxy objects
proxies_list = [
Proxy(
url=p.url,
username=p.username,
password=p.password,
)
for p in ctx.config.proxies
]
# Create rotator with configured strategy
rotator = ProxyWhirl(proxies=proxies_list, strategy=ctx.config.rotation_strategy)
# Select proxy using rotation strategy
try:
selected_proxy = rotator.strategy.select(rotator.pool)
proxy_url = selected_proxy.url
except Exception as e:
if ctx.verbose:
typer.secho(f"Proxy selection failed: {e}", err=True, fg="yellow")
# Fallback to first proxy if selection fails
proxy_url = ctx.config.proxies[0].url
# Determine retries
retries = max_retries if max_retries is not None else ctx.config.max_retries
# Make request with retries
last_error: Exception | None = None
for attempt in range(retries + 1):
try:
start_time = time.time()
# Create httpx client with proxy
with httpx.Client(
proxy=proxy_url,
timeout=ctx.config.timeout,
follow_redirects=ctx.config.follow_redirects,
verify=ctx.config.verify_ssl,
) as client:
# Make request
response = client.request(method=method, url=url, headers=header_dict, content=data)
elapsed_ms = (time.time() - start_time) * 1000
# Create result
result = RequestResult(
url=url,
method=method,
status_code=response.status_code,
elapsed_ms=elapsed_ms,
proxy_used=proxy_url or "direct",
attempts=attempt + 1,
headers=dict(response.headers),
body=response.text[:1000], # Truncate to 1000 chars
)
# Render output based on format
if ctx.format == OutputFormat.TEXT:
# Human-readable output
status_color = "green" if result.is_success() else "red"
ctx.console.print(
f"[bold]Status:[/bold] [{status_color}]{result.status_code}[/{status_color}]"
)
ctx.console.print(f"[bold]URL:[/bold] {result.url}")
ctx.console.print(f"[bold]Time:[/bold] {result.elapsed_ms:.0f}ms")
if result.proxy_used and result.proxy_used != "direct":
# Mask credentials in proxy URL for safe display
masked_proxy = mask_proxy_url(result.proxy_used)
ctx.console.print(f"[bold]Proxy:[/bold] {masked_proxy}")
ctx.console.print(f"[bold]Attempts:[/bold] {result.attempts}")
if result.body:
ctx.console.print("\n[bold]Response:[/bold]")
ctx.console.print(result.body)
elif ctx.format == OutputFormat.JSON:
output: dict[str, Any] = {
"status_code": result.status_code,
"url": result.url,
"method": result.method,
"elapsed_ms": result.elapsed_ms,
"proxy_used": mask_proxy_url(result.proxy_used) if result.proxy_used else None,
"attempts": result.attempts,
"body": result.body,
}
render_json(output)
elif ctx.format == OutputFormat.CSV:
output_csv: dict[str, Any] = {
"table": {
"headers": ["Status", "URL", "Time(ms)", "Proxy", "Attempts"],
"rows": [
[
str(result.status_code),
result.url,
f"{result.elapsed_ms:.0f}",
mask_proxy_url(result.proxy_used) if result.proxy_used else "N/A",
str(result.attempts),
]
],
}
}
render_csv(output_csv)
# Exit successfully
return
except Exception as e:
last_error = e
if ctx.verbose:
typer.secho(
f"Attempt {attempt + 1}/{retries + 1} failed: {e}",
err=True,
fg="yellow",
)
if attempt < retries:
time.sleep(1) # Wait before retry
# All retries exhausted
typer.secho(f"Request failed after {retries + 1} attempts", err=True, fg="red")
if last_error:
typer.secho(f"Last error: {last_error}", err=True, fg="red")
raise typer.Exit(code=1)
@app.command()
[docs]
def pool(
action: str = typer.Argument(..., help="Action: list, add, remove, test"),
proxy: str | None = typer.Argument(None, help="Proxy URL (for add/remove/test actions)"),
username: str | None = typer.Option(None, "--username", "-u", help="Proxy username"),
password: str | None = typer.Option(None, "--password", "-p", help="Proxy password"),
target_url: str = typer.Option(
"https://httpbin.org/ip",
"--target-url",
help="Target URL for proxy testing (http/https only)",
),
allow_private: bool = typer.Option(
False,
"--allow-private",
help="Allow testing against localhost/private IPs (use with caution)",
),
) -> None:
"""Manage the proxy pool (list/add/remove/test).
Examples:
proxywhirl pool list
proxywhirl pool add http://proxy1.com:8080
proxywhirl pool remove http://proxy1.com:8080
proxywhirl pool test http://proxy1.com:8080
proxywhirl pool test http://proxy1.com:8080 --target-url https://api.example.com
"""
import time
import httpx
from pydantic import SecretStr
from proxywhirl.config import ProxyConfig, save_config
from proxywhirl.models import HealthStatus, PoolSummary, Proxy, ProxyStatus
from proxywhirl.rotator import ProxyWhirl
command_ctx = get_context()
# Validate action
valid_actions = ["list", "add", "remove", "test"]
if action not in valid_actions:
command_ctx.console.print(
f"[red]Invalid action '{action}'. Valid actions: {', '.join(valid_actions)}[/red]"
)
raise typer.Exit(code=1)
# Create rotator from config
proxies = []
for proxy_config in command_ctx.config.proxies:
proxies.append(
Proxy(
url=proxy_config.url,
username=proxy_config.username,
password=proxy_config.password,
)
)
rotator = ProxyWhirl(proxies=proxies, strategy=command_ctx.config.rotation_strategy)
if action == "list":
# List all proxies in pool (thread-safe snapshot)
pool_proxies = rotator.pool.get_all_proxies()
if not pool_proxies:
command_ctx.console.print("[yellow]No proxies in pool[/yellow]")
return
proxy_statuses = [
ProxyStatus(
url=p.url,
health=p.health_status,
response_time_ms=p.average_response_time_ms or 0,
success_rate=p.success_rate,
)
for p in pool_proxies
]
summary = PoolSummary(
total_proxies=len(pool_proxies),
healthy=sum(1 for p in pool_proxies if p.health_status == HealthStatus.HEALTHY),
degraded=sum(1 for p in pool_proxies if p.health_status == HealthStatus.DEGRADED),
failed=sum(
1
for p in pool_proxies
if p.health_status in (HealthStatus.UNHEALTHY, HealthStatus.DEAD)
),
rotation_strategy=command_ctx.config.rotation_strategy,
current_index=0, # Not tracked in rotator yet
proxies=proxy_statuses,
)
if command_ctx.format == OutputFormat.JSON:
render_json(summary.model_dump())
elif command_ctx.format == OutputFormat.CSV:
# For CSV, output each proxy as a row
import csv
import sys
writer = csv.DictWriter(
sys.stdout, fieldnames=["url", "health", "response_time_ms", "success_rate"]
)
writer.writeheader()
writer.writerows([ps.model_dump() for ps in proxy_statuses])
else: # TEXT
command_ctx.console.print("\n[bold]Proxy Pool Summary[/bold]")
command_ctx.console.print(f"Total Proxies: {summary.total_proxies}")
command_ctx.console.print(
f"Healthy: [green]{summary.healthy}[/green] | "
f"Degraded: [yellow]{summary.degraded}[/yellow] | "
f"Failed: [red]{summary.failed}[/red]"
)
command_ctx.console.print(f"Strategy: {summary.rotation_strategy}\n")
for ps in proxy_statuses:
health_color = (
"green"
if ps.health == HealthStatus.HEALTHY
else "yellow"
if ps.health == HealthStatus.DEGRADED
else "red"
)
# Mask proxy URL for safe display
masked_url = mask_proxy_url(ps.url)
command_ctx.console.print(
f" [{health_color}]●[/{health_color}] {masked_url} "
f"({ps.response_time_ms:.0f}ms, {ps.success_rate * 100:.0f}% success)"
)
elif action == "add":
if not proxy:
command_ctx.console.print("[red]Proxy URL required for 'add' action[/red]")
raise typer.Exit(code=1)
# Add proxy to pool
new_proxy = Proxy(
url=proxy,
username=SecretStr(username) if username else None,
password=SecretStr(password) if password else None,
)
rotator.pool.add_proxy(new_proxy)
# Save updated config
command_ctx.config.proxies.append(
ProxyConfig(
url=proxy,
username=SecretStr(username) if username else None,
password=SecretStr(password) if password else None,
)
)
save_config(command_ctx.config, command_ctx.config_path)
command_ctx.console.print(f"[green]✓[/green] Added proxy: {proxy}")
elif action == "remove":
if not proxy:
command_ctx.console.print("[red]Proxy URL required for 'remove' action[/red]")
raise typer.Exit(code=1)
# Find proxy by URL (thread-safe snapshot)
proxy_obj = next((p for p in rotator.pool.get_all_proxies() if p.url == proxy), None)
if not proxy_obj:
command_ctx.console.print(f"[red]Error:[/red] Proxy not found: {proxy}")
raise typer.Exit(code=1)
# Remove proxy from pool
rotator.pool.remove_proxy(proxy_obj.id)
# Save updated config
command_ctx.config.proxies = [p for p in command_ctx.config.proxies if p.url != proxy]
save_config(command_ctx.config, command_ctx.config_path)
command_ctx.console.print(f"[green]✓[/green] Removed proxy: {proxy}")
elif action == "test":
if not proxy:
command_ctx.console.print("[red]Proxy URL required for 'test' action[/red]")
raise typer.Exit(code=1)
# Validate target URL to prevent SSRF
validate_target_url(target_url, allow_private=allow_private)
# Test proxy with HTTP request
command_ctx.console.print(f"Testing proxy: {proxy}...")
command_ctx.console.print(f"Target URL: {target_url}")
try:
import time
start_time = time.time()
with httpx.Client(
proxy=proxy,
timeout=command_ctx.config.timeout,
verify=command_ctx.config.verify_ssl,
) as client:
response = client.get(target_url)
elapsed_ms = (time.time() - start_time) * 1000
if response.status_code == 200:
command_ctx.console.print(f"[green]✓[/green] Proxy is working ({elapsed_ms:.0f}ms)")
if command_ctx.verbose:
command_ctx.console.print(f"Response: {response.text}")
else:
command_ctx.console.print(
f"[yellow]![/yellow] Proxy returned status {response.status_code}"
)
except Exception as e:
command_ctx.console.print(f"[red]✗[/red] Proxy test failed: {e}")
raise typer.Exit(code=1) from e
@app.command()
[docs]
def config(
action: str = typer.Argument(..., help="Action: show, set, get, init"),
key: str | None = typer.Argument(None, help="Config key (for get/set)"),
value: str | None = typer.Argument(None, help="Config value (for set)"),
) -> None:
"""Manage CLI configuration (show/get/set/init).
Examples:
proxywhirl config show
proxywhirl config get rotation_strategy
proxywhirl config set rotation_strategy random
proxywhirl config init
"""
from pathlib import Path
from proxywhirl.config import CLIConfig, save_config
command_ctx = get_context()
# Validate action
valid_actions = ["show", "get", "set", "init"]
if action not in valid_actions:
command_ctx.console.print(
f"[red]Invalid action '{action}'. Valid actions: {', '.join(valid_actions)}[/red]"
)
raise typer.Exit(code=1)
if action == "init":
# Initialize a new config file
config_path = Path.cwd() / ".proxywhirl.toml"
if config_path.exists():
command_ctx.console.print(f"[yellow]Config file already exists: {config_path}[/yellow]")
if not typer.confirm("Overwrite?"):
raise typer.Exit(code=0)
# Create default config (uses CLIConfig field defaults;
# only override encrypt_credentials so users don't need
# PROXYWHIRL_KEY set up before first use)
default_config = CLIConfig(encrypt_credentials=False)
save_config(default_config, config_path)
command_ctx.console.print(f"[green]✓[/green] Created config file: {config_path}")
elif action == "show":
# Show entire config
if command_ctx.format == OutputFormat.JSON:
# Exclude sensitive fields
config_dict = command_ctx.config.model_dump(mode="json", exclude={"proxies"})
render_json(config_dict)
elif command_ctx.format == OutputFormat.CSV:
# CSV format - show as key-value pairs
import csv
import sys
config_dict = command_ctx.config.model_dump(mode="json", exclude={"proxies"})
writer = csv.writer(sys.stdout)
writer.writerow(["key", "value"])
for k, v in config_dict.items():
writer.writerow([k, v])
else: # TEXT
command_ctx.console.print("\n[bold]Configuration[/bold]")
command_ctx.console.print(f"Config file: {command_ctx.config_path}\n")
# Show non-sensitive settings
command_ctx.console.print(f"rotation_strategy: {command_ctx.config.rotation_strategy}")
command_ctx.console.print(
f"health_check_interval: {command_ctx.config.health_check_interval}s"
)
command_ctx.console.print(f"timeout: {command_ctx.config.timeout}s")
command_ctx.console.print(f"max_retries: {command_ctx.config.max_retries}")
command_ctx.console.print(f"follow_redirects: {command_ctx.config.follow_redirects}")
command_ctx.console.print(f"verify_ssl: {command_ctx.config.verify_ssl}")
command_ctx.console.print(f"default_format: {command_ctx.config.default_format}")
command_ctx.console.print(f"color: {command_ctx.config.color}")
command_ctx.console.print(f"verbose: {command_ctx.config.verbose}")
command_ctx.console.print(f"storage_backend: {command_ctx.config.storage_backend}")
command_ctx.console.print(f"storage_path: {command_ctx.config.storage_path}")
elif action == "get":
if not key:
command_ctx.console.print("[red]Key required for 'get' action[/red]")
raise typer.Exit(code=1)
# Get specific config value
if not hasattr(command_ctx.config, key):
command_ctx.console.print(f"[red]Unknown config key: {key}[/red]")
raise typer.Exit(code=1)
value_obj = getattr(command_ctx.config, key)
if command_ctx.format == OutputFormat.JSON:
render_json({key: value_obj})
else:
command_ctx.console.print(str(value_obj))
elif action == "set":
if not key or not value:
command_ctx.console.print("[red]Key and value required for 'set' action[/red]")
raise typer.Exit(code=1)
# Set config value
if not hasattr(command_ctx.config, key):
command_ctx.console.print(f"[red]Unknown config key: {key}[/red]")
raise typer.Exit(code=1)
# Convert value to appropriate type
old_value = getattr(command_ctx.config, key)
try:
new_value: bool | int | float | str
if isinstance(old_value, bool):
new_value = value.lower() in ("true", "1", "yes", "on")
elif isinstance(old_value, int):
new_value = int(value)
elif isinstance(old_value, float):
new_value = float(value)
else:
new_value = value
setattr(command_ctx.config, key, new_value)
save_config(command_ctx.config, command_ctx.config_path)
command_ctx.console.print(f"[green]✓[/green] Set {key} = {new_value}")
except ValueError as e:
command_ctx.console.print(f"[red]Invalid value for {key}: {e}[/red]")
raise typer.Exit(code=1) from e
@app.command()
[docs]
def export(
output: Path = typer.Option(
Path("docs/proxy-lists"),
"--output",
"-o",
help="Output directory for exported files",
),
db: Path = typer.Option(
Path("proxywhirl.db"),
"--db",
help="Path to SQLite database",
),
stats_only: bool = typer.Option(
False,
"--stats-only",
help="Only export statistics",
),
proxies_only: bool = typer.Option(
False,
"--proxies-only",
help="Only export proxy list",
),
) -> None:
"""Export proxy data and statistics for web dashboard.
Examples:
proxywhirl export
proxywhirl export --output ./exports
proxywhirl export --stats-only
proxywhirl export --proxies-only --db custom.db
"""
import asyncio
from proxywhirl.exports import export_for_web
command_ctx = get_context()
# Determine what to export
include_stats = not proxies_only
include_proxies = not stats_only
# Run async export
try:
outputs = asyncio.run(
export_for_web(
db_path=db,
output_dir=output,
include_stats=include_stats,
include_rich_proxies=include_proxies,
)
)
# Report results
if command_ctx.format == OutputFormat.JSON:
render_json({"exports": {k: str(v) for k, v in outputs.items()}})
else:
command_ctx.console.print(f"[green]✓[/green] Export completed to {output}")
for export_type, path in outputs.items():
command_ctx.console.print(f" {export_type}: {path}")
except Exception as e:
command_ctx.console.print(f"[red]Export failed:[/red] {e}")
raise typer.Exit(code=1) from e
def _parse_fetch_config(
no_validate: bool,
timeout: int,
concurrency: int,
) -> dict[str, Any]:
"""Parse and validate fetch configuration parameters.
Args:
no_validate: Skip proxy validation
timeout: Validation timeout in seconds
concurrency: Concurrent validation requests
Returns:
dict[str, Any]: Validated configuration parameters.
"""
return {
"validate": not no_validate,
"timeout": timeout,
"max_concurrent": concurrency,
}
async def _fetch_from_sources(
validate: bool,
timeout: int,
max_concurrent: int,
console: Console | None = None,
) -> list[Any]:
"""Fetch proxies from all configured sources with progress display.
Args:
validate: Whether to validate proxies
timeout: Validation timeout in seconds
max_concurrent: Maximum concurrent validation requests
console: Rich console for progress display
Returns:
List of fetched Proxy objects
Raises:
Exception: If fetching fails
"""
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
)
from proxywhirl.sources import ALL_SOURCES, fetch_all_sources
total_sources = len(ALL_SOURCES)
proxies_found = 0
valid_count = 0
# Create progress display
progress = Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(),
TaskProgressColumn(),
MofNCompleteColumn(),
TextColumn("•"),
TimeElapsedColumn(),
TextColumn("{task.fields[status]}"),
console=console,
transient=False,
)
fetch_task = None
validate_task = None
def fetch_progress(completed: int, total: int, found: int) -> None:
"""Update fetch progress."""
nonlocal proxies_found, fetch_task
proxies_found = found
if fetch_task is not None:
progress.update(
fetch_task,
completed=completed,
status=f"[cyan]{proxies_found:,} proxies found[/cyan]",
)
def validate_progress(completed: int, total: int, valid: int) -> None:
"""Update validation progress."""
nonlocal valid_count, validate_task
valid_count = valid
if validate_task is not None:
pct = (valid / completed * 100) if completed > 0 else 0
progress.update(
validate_task,
completed=completed,
total=total, # Update total dynamically
status=f"[green]{valid:,} valid[/green] ({pct:.1f}%)",
)
with progress:
# Add fetch task
fetch_task = progress.add_task(
"Fetching sources",
total=total_sources,
status="[cyan]starting...[/cyan]",
)
# Add validation task (will be updated once fetching completes)
if validate:
validate_task = progress.add_task(
"Validating proxies",
total=0, # Will be set after fetch
status="[dim]waiting...[/dim]",
visible=True,
)
# Run fetch with callbacks
result = await fetch_all_sources(
validate=validate,
timeout=timeout,
max_concurrent=max_concurrent,
fetch_progress_callback=fetch_progress,
validate_progress_callback=validate_progress if validate else None,
)
# Mark tasks complete
progress.update(fetch_task, completed=total_sources)
if validate_task is not None:
progress.update(validate_task, completed=progress.tasks[validate_task].total)
return result
async def _save_results(proxies: list[Any], db_path: Path, validated: bool = True) -> None:
"""Save fetched proxies to database.
Args:
proxies: List of proxy dicts or Proxy objects to save
db_path: Path to SQLite database file
validated: If True, mark proxies as already validated (healthy status).
Default is True since fetch validates before saving.
"""
from proxywhirl.models import Proxy
from proxywhirl.storage import SQLiteStorage
storage = SQLiteStorage(db_path)
await storage.initialize()
try:
# Convert dicts to Proxy objects if needed
proxy_objects = []
for p in proxies:
if isinstance(p, dict):
proxy_objects.append(Proxy.model_validate(p))
else:
proxy_objects.append(p)
await storage.save(proxy_objects, validated=validated)
finally:
await storage.close()
async def _export_results(db_path: Path, output_dir: Path) -> None:
"""Export proxy data for web dashboard.
Args:
db_path: Path to SQLite database file
output_dir: Output directory for exported files
"""
from proxywhirl.exports import export_for_web
await export_for_web(
db_path=db_path,
output_dir=output_dir,
)
async def _revalidate_existing_proxies(
db_path: Path,
timeout: int,
max_concurrent: int,
prune_failed: bool,
console: Any,
) -> tuple[list[Any], int]:
"""Re-validate existing proxies in the database.
Loads all proxies from the database, validates them, and updates
their status using the normalized schema. Valid proxies get updated
with successful validation records. Failed proxies are marked as DEAD.
Args:
db_path: Path to SQLite database file
timeout: Validation timeout in seconds
max_concurrent: Maximum concurrent validation requests
prune_failed: If True, delete failed proxies. If False, mark them as DEAD.
console: Rich console for progress display
Returns:
Tuple of (list of valid proxy dicts, count of failed proxies)
"""
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
)
from proxywhirl.fetchers import ProxyValidator
from proxywhirl.storage import SQLiteStorage
# Load existing proxies (returns dicts from normalized schema)
storage = SQLiteStorage(db_path)
await storage.initialize()
try:
existing_proxies = await storage.load()
total_proxies = len(existing_proxies)
if total_proxies == 0:
console.print("[yellow]No proxies in database to re-validate[/yellow]")
return [], 0
console.print(f"[cyan]Loaded {total_proxies:,} proxies from database[/cyan]")
# Build lookup map for original proxies by URL
original_by_url: dict[str, Any] = {p["url"]: p for p in existing_proxies}
# Convert to dicts for validator (already dicts, but ensure format)
proxy_dicts = []
for proxy in existing_proxies:
proxy_dict = {
"url": proxy["url"],
"protocol": proxy.get("protocol", "http"),
}
proxy_dicts.append(proxy_dict)
# Validate with progress display
valid_count = 0
completed = 0
progress = Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(),
TaskProgressColumn(),
MofNCompleteColumn(),
TextColumn("•"),
TimeElapsedColumn(),
TextColumn("{task.fields[status]}"),
console=console,
transient=False,
)
def progress_callback(done: int, total: int, valid: int) -> None:
nonlocal completed, valid_count
completed = done
valid_count = valid
pct = (valid / done * 100) if done > 0 else 0
progress.update(
task_id,
completed=done,
status=f"[green]{valid:,} valid[/green] ({pct:.1f}%)",
)
# Create validator with same settings as fetch
validator = ProxyValidator(
timeout=timeout,
concurrency=max_concurrent,
)
with progress:
task_id = progress.add_task(
"Re-validating proxies",
total=total_proxies,
status="[cyan]starting...[/cyan]",
)
try:
validated_dicts = await validator.validate_batch(
proxy_dicts,
progress_callback=progress_callback,
)
finally:
await validator.close()
# Build validation results for batch recording
valid_urls = {p["url"] for p in validated_dicts}
validation_results: list[tuple[str, bool, float | None, str | None]] = []
# Record successful validations
for p_dict in validated_dicts:
# Validator stores response time as "average_response_time_ms"
response_time = p_dict.get("average_response_time_ms")
validation_results.append((p_dict["url"], True, response_time, None))
# Record failed validations
failed_count = 0
for url in original_by_url:
if url not in valid_urls:
validation_results.append((url, False, None, "validation_failed"))
failed_count += 1
# Batch record all validation results
await storage.record_validations_batch(validation_results)
# If prune_failed, delete dead proxies
if prune_failed and failed_count > 0:
await storage.cleanup(
remove_dead=True,
remove_stale_days=0, # Don't remove stale
remove_never_validated=False,
vacuum=False,
)
console.print(f"[yellow]Deleted {failed_count} failed proxies[/yellow]")
else:
console.print(f"[yellow]Marked {failed_count} proxies as DEAD[/yellow]")
return validated_dicts, failed_count
finally:
await storage.close()
def _display_summary(
context: CommandContext,
proxies: list[Any],
no_validate: bool,
) -> None:
"""Display fetch summary to user.
Args:
context: Command context with format settings
proxies: List of fetched Proxy objects
no_validate: Whether validation was skipped
"""
if context.format == OutputFormat.JSON:
render_json(
{
"total": len(proxies),
"validated": not no_validate,
}
)
else:
context.console.print(f"[green]✓[/green] Fetched {len(proxies)} proxies")
@app.command()
[docs]
def fetch(
no_validate: bool = typer.Option(
False,
"--no-validate",
help="Skip proxy validation",
),
no_save_db: bool = typer.Option(
False,
"--no-save-db",
help="Don't save to database",
),
no_export: bool = typer.Option(
False,
"--no-export",
help="Don't export to files",
),
timeout: int = typer.Option(
10,
"--timeout",
help="Validation timeout in seconds",
),
concurrency: int = typer.Option(
100,
"--concurrency",
help="Concurrent validation requests",
),
revalidate: bool = typer.Option(
False,
"--revalidate",
"-R",
help="Re-validate existing proxies in database instead of fetching new ones",
),
prune_failed: bool = typer.Option(
False,
"--prune-failed",
help="Delete failed proxies instead of marking them as DEAD (use with --revalidate)",
),
https_validate: bool = typer.Option(
True,
"--https-validate/--no-https-validate",
help="After fetching, test valid HTTP proxies for HTTPS/CONNECT support and add them as https:// entries",
),
https_timeout: int = typer.Option(
8,
"--https-timeout",
help="Per-stage timeout in seconds for HTTPS CONNECT tests (default 8, higher = more found but slower)",
),
https_max: int = typer.Option(
2000,
"--https-max",
help="Maximum HTTPS-capable proxies to collect during dual-validation (0 = unlimited)",
),
) -> None:
"""Fetch proxies from configured sources.
Examples:
proxywhirl fetch
proxywhirl fetch --no-validate
proxywhirl fetch --timeout 5 --concurrency 50
proxywhirl fetch --revalidate --timeout 5 --concurrency 2000
proxywhirl fetch --revalidate --prune-failed
proxywhirl fetch --no-https-validate
proxywhirl fetch --https-timeout 10
"""
import asyncio
command_ctx = get_context()
# Parse configuration
fetch_config = _parse_fetch_config(no_validate, timeout, concurrency)
if revalidate:
# Re-validate existing proxies in database
command_ctx.console.print("[bold]Re-validating existing proxies in database...[/bold]")
try:
valid_proxies, failed_count = asyncio.run(
_revalidate_existing_proxies(
db_path=Path("proxywhirl.db"),
timeout=fetch_config["timeout"],
max_concurrent=fetch_config["max_concurrent"],
prune_failed=prune_failed,
console=command_ctx.console,
)
)
command_ctx.console.print(
f"[green]✓[/green] Re-validation complete: "
f"[green]{len(valid_proxies):,} valid[/green], "
f"[red]{failed_count:,} failed[/red]"
)
proxies = valid_proxies
except Exception as e:
command_ctx.console.print(f"[red]Re-validation failed:[/red] {e}")
raise typer.Exit(code=1) from e
else:
# Normal mode: fetch from sources
try:
proxies = asyncio.run(
_fetch_from_sources(
validate=fetch_config["validate"],
timeout=fetch_config["timeout"],
max_concurrent=fetch_config["max_concurrent"],
console=command_ctx.console,
)
)
except Exception as e:
command_ctx.console.print(f"[red]Fetch failed:[/red] {e}")
raise typer.Exit(code=1) from e
# Dual-validate HTTP proxies for HTTPS/CONNECT support
if https_validate and fetch_config["validate"] and proxies:
http_only = [p for p in proxies if (p.get("protocol") or "http") == "http"]
if http_only:
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
)
try:
from proxywhirl.fetchers import ProxyValidator
validator = ProxyValidator(
timeout=https_timeout,
)
https_progress = Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(),
TaskProgressColumn(),
MofNCompleteColumn(),
TextColumn("•"),
TimeElapsedColumn(),
TextColumn("{task.fields[status]}"),
console=command_ctx.console,
transient=False,
)
https_task_id = None
def https_progress_callback(done: int, total: int, valid: int) -> None:
nonlocal https_task_id
if https_task_id is not None:
pct = (valid / done * 100) if done > 0 else 0
https_progress.update(
https_task_id,
completed=done,
status=f"[green]{valid:,} HTTPS[/green] ({pct:.1f}%)",
)
with https_progress:
https_task_id = https_progress.add_task(
"HTTPS CONNECT test",
total=len(http_only),
status="[cyan]starting...[/cyan]",
)
https_capable = asyncio.run(
validator.validate_https_capability_batch(
http_only,
concurrency=min(fetch_config["max_concurrent"], 500),
max_results=https_max if https_max > 0 else None,
progress_callback=https_progress_callback,
)
)
if https_capable:
proxies = list(proxies) + https_capable
command_ctx.console.print(
f"[green]✓[/green] Found [green]{len(https_capable):,}[/green] HTTPS-capable proxies"
)
else:
command_ctx.console.print(
"[dim]No HTTPS-capable proxies found in this batch[/dim]"
)
except Exception as e:
command_ctx.console.print(
f"[yellow]HTTPS dual-validation skipped:[/yellow] {e}"
)
# Display summary
_display_summary(command_ctx, proxies, no_validate)
# Save to database if requested
if not no_save_db:
command_ctx.console.print("[bold]Saving to database...[/bold]")
try:
asyncio.run(_save_results(proxies, Path("proxywhirl.db")))
command_ctx.console.print("[green]✓[/green] Saved to database")
except Exception as e:
command_ctx.console.print(f"[red]Save failed:[/red] {e}")
raise typer.Exit(code=1) from e
# Export if requested
if not no_export:
command_ctx.console.print("[bold]Exporting...[/bold]")
try:
asyncio.run(
_export_results(
db_path=Path("proxywhirl.db"),
output_dir=Path("docs/proxy-lists"),
)
)
command_ctx.console.print("[green]✓[/green] Exported to docs/proxy-lists")
except Exception as e:
command_ctx.console.print(f"[red]Export failed:[/red] {e}")
raise typer.Exit(code=1) from e
@app.command()
[docs]
def stats(
retry: bool = typer.Option(
False,
"--retry",
help="Show retry metrics",
),
circuit_breaker: bool = typer.Option(
False,
"--circuit-breaker",
help="Show circuit breaker events",
),
hours: int = typer.Option(
24,
"--hours",
"-r",
help="Time window in hours",
),
) -> None:
"""Show proxy pool statistics.
Examples:
proxywhirl stats
proxywhirl stats --retry
proxywhirl stats --circuit-breaker
proxywhirl stats --hours 12
"""
from datetime import datetime, timedelta, timezone
from proxywhirl.retry import RetryMetrics
command_ctx = get_context()
# If no flags specified, show both
show_retry = retry or (not retry and not circuit_breaker)
show_circuit = circuit_breaker or (not retry and not circuit_breaker)
# Load metrics (would typically come from storage)
metrics = RetryMetrics()
# Prepare output data
output_data: dict[str, Any] = {}
if show_retry:
summary = metrics.get_summary()
output_data["retry"] = {
"total_requests": summary.get("total_requests", 0),
"total_retries": summary.get("total_retries", 0),
"success_rate": summary.get("success_rate", 0.0),
}
if show_circuit:
# Get circuit breaker events (filtering by hours if needed)
cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
events = [e for e in metrics.circuit_breaker_events if e.timestamp >= cutoff]
output_data["circuit_breaker"] = {
"total_events": len(events),
"events": [
{
"proxy_id": e.proxy_id,
"from_state": e.from_state.value,
"to_state": e.to_state.value,
"timestamp": e.timestamp.isoformat(),
}
for e in events[:10] # Limit to 10 most recent
],
}
# Render output
if command_ctx.format == OutputFormat.JSON:
render_json(output_data)
elif command_ctx.format == OutputFormat.CSV:
# For CSV, flatten to table format
if show_retry and "retry" in output_data:
data = output_data["retry"]
render_csv(
{
"table": {
"headers": ["metric", "value"],
"rows": [[k, str(v)] for k, v in data.items()],
}
}
)
elif show_circuit and "circuit_breaker" in output_data:
events = output_data["circuit_breaker"]["events"]
if events:
render_csv(
{
"table": {
"headers": ["proxy_id", "from_state", "to_state", "timestamp"],
"rows": [
[e["proxy_id"], e["from_state"], e["to_state"], e["timestamp"]]
for e in events
],
}
}
)
else: # TEXT
command_ctx.console.print("\n[bold]Proxy Pool Statistics[/bold]\n")
if show_retry and "retry" in output_data:
command_ctx.console.print("[bold]Retry Metrics[/bold]")
retry_data = output_data["retry"]
command_ctx.console.print(f" Total Requests: {retry_data['total_requests']}")
command_ctx.console.print(f" Total Retries: {retry_data['total_retries']}")
command_ctx.console.print(f" Success Rate: {retry_data['success_rate']:.1%}\n")
if show_circuit and "circuit_breaker" in output_data:
command_ctx.console.print("[bold]Circuit Breaker Events[/bold]")
cb_data = output_data["circuit_breaker"]
command_ctx.console.print(f" Total Events: {cb_data['total_events']}")
if cb_data["events"]:
command_ctx.console.print("\n Recent Events:")
for event in cb_data["events"]:
command_ctx.console.print(
f" {event['proxy_id']}: {event['from_state']} → {event['to_state']}"
)
else:
command_ctx.console.print(f" No events in the last {hours} hours")
@app.command(name="setup-geoip")
[docs]
def setup_geoip(
check: bool = typer.Option(
False,
"--check",
help="Check if GeoIP database is available",
),
) -> None:
"""Setup GeoIP database for proxy geolocation.
Examples:
proxywhirl setup-geoip
proxywhirl setup-geoip --check
"""
from proxywhirl.enrichment import is_geoip_available
command_ctx = get_context()
if check:
# Check if database is available
available = is_geoip_available()
if command_ctx.format == OutputFormat.JSON:
render_json({"available": available})
else:
if available:
command_ctx.console.print("[green]✓[/green] GeoIP database is available")
else:
command_ctx.console.print("[yellow]![/yellow] GeoIP database not found")
else:
# Show installation instructions
if command_ctx.format == OutputFormat.JSON:
render_json(
{
"instructions": "Download MaxMind GeoLite2 database",
"url": "https://dev.maxmind.com/geoip/geolite2-free-geolocation-data",
}
)
else:
command_ctx.console.print("\n[bold]GeoIP Database Setup[/bold]\n")
command_ctx.console.print(
"ProxyWhirl uses MaxMind GeoLite2 for offline IP geolocation.\n"
)
command_ctx.console.print("[bold]Instructions:[/bold]")
command_ctx.console.print(" 1. Sign up for a free MaxMind account:")
command_ctx.console.print(
" https://dev.maxmind.com/geoip/geolite2-free-geolocation-data"
)
command_ctx.console.print("\n 2. Download GeoLite2-City.mmdb")
command_ctx.console.print("\n 3. Place the database file in one of these locations:")
command_ctx.console.print(" - ./GeoLite2-City.mmdb")
command_ctx.console.print(" - ~/.local/share/proxywhirl/GeoLite2-City.mmdb")
command_ctx.console.print(" - /usr/share/GeoIP/GeoLite2-City.mmdb")
command_ctx.console.print("\n 4. Run 'proxywhirl setup-geoip --check' to verify")
@app.command()
[docs]
def health(
continuous: bool = typer.Option(False, "--continuous", "-C", help="Run continuously"),
interval: int | None = typer.Option(None, "--interval", "-i", help="Check interval in seconds"),
target_url: str | None = typer.Option(
None, "--target-url", "-t", help="URL to test proxy connectivity against (http/https only)"
),
allow_private: bool = typer.Option(
False,
"--allow-private",
help="Allow testing against localhost/private IPs (use with caution)",
),
) -> None:
"""Check health of all proxies in the pool.
Examples:
proxywhirl health
proxywhirl health --continuous --interval 60
proxywhirl health -C -i 60
proxywhirl health --target-url https://api.example.com
proxywhirl health --target-url http://localhost:8080 --allow-private
"""
import time
from datetime import datetime, timezone
import httpx
from proxywhirl.models import HealthStatus, PoolSummary, Proxy, ProxyStatus
from proxywhirl.rotator import ProxyWhirl
command_ctx = get_context()
# Validate target URL if provided
if target_url:
validate_target_url(target_url, allow_private=allow_private)
# Determine the URL to test against
test_url = target_url if target_url else "https://httpbin.org/ip"
# Create rotator from config
proxies = []
for proxy_config in command_ctx.config.proxies:
proxies.append(
Proxy(
url=proxy_config.url,
username=proxy_config.username,
password=proxy_config.password,
)
)
if not proxies:
command_ctx.console.print("[yellow]No proxies configured[/yellow]")
command_ctx.console.print("Add proxies using: proxywhirl pool add <URL>")
raise typer.Exit(code=0)
rotator = ProxyWhirl(proxies=proxies, strategy=command_ctx.config.rotation_strategy)
check_interval = interval if interval is not None else command_ctx.config.health_check_interval
def check_health() -> list[ProxyStatus]:
"""Check health of all proxies."""
results = []
# Use thread-safe snapshot
for proxy in rotator.pool.get_all_proxies():
try:
start_time = time.time()
with httpx.Client(
proxy=proxy.url,
timeout=command_ctx.config.timeout,
verify=command_ctx.config.verify_ssl,
) as client:
response = client.get(test_url)
elapsed_ms = (time.time() - start_time) * 1000
is_healthy = response.status_code == 200
# Update proxy health
proxy.health_status = HealthStatus.HEALTHY if is_healthy else HealthStatus.DEGRADED
if is_healthy:
proxy.last_success_at = datetime.now(timezone.utc)
proxy.total_successes += 1
proxy.consecutive_failures = 0
else:
proxy.last_failure_at = datetime.now(timezone.utc)
proxy.total_failures += 1
proxy.consecutive_failures += 1
proxy.total_requests += 1
proxy.average_response_time_ms = elapsed_ms
results.append(
ProxyStatus(
url=proxy.url,
health=proxy.health_status,
response_time_ms=elapsed_ms,
success_rate=proxy.success_rate,
)
)
if command_ctx.verbose:
command_ctx.console.print(
f"✓ {proxy.url}: {response.status_code} ({elapsed_ms:.0f}ms)"
)
except Exception as e:
# Mark as unhealthy
proxy.health_status = HealthStatus.UNHEALTHY
proxy.last_failure_at = datetime.now(timezone.utc)
proxy.total_failures += 1
proxy.total_requests += 1
proxy.consecutive_failures += 1
results.append(
ProxyStatus(
url=proxy.url,
health=HealthStatus.UNHEALTHY,
response_time_ms=0,
success_rate=proxy.success_rate,
)
)
if command_ctx.verbose:
command_ctx.console.print(f"✗ {proxy.url}: {e}")
return results
# Single check mode
if not continuous:
command_ctx.console.print("\n[bold]Checking proxy health...[/bold]\n")
results = check_health()
# Display results
summary = PoolSummary(
total_proxies=len(results),
healthy=sum(1 for r in results if r.health == HealthStatus.HEALTHY),
degraded=sum(1 for r in results if r.health == HealthStatus.DEGRADED),
failed=sum(
1 for r in results if r.health in (HealthStatus.UNHEALTHY, HealthStatus.DEAD)
),
rotation_strategy=command_ctx.config.rotation_strategy,
current_index=0,
proxies=results,
)
if command_ctx.format == OutputFormat.JSON:
render_json(summary.model_dump())
elif command_ctx.format == OutputFormat.CSV:
import csv
import sys
writer = csv.DictWriter(
sys.stdout, fieldnames=["url", "health", "response_time_ms", "success_rate"]
)
writer.writeheader()
writer.writerows([r.model_dump() for r in results])
else: # TEXT
command_ctx.console.print("[bold]Health Check Results[/bold]")
command_ctx.console.print(
f"Healthy: [green]{summary.healthy}[/green] | "
f"Degraded: [yellow]{summary.degraded}[/yellow] | "
f"Failed: [red]{summary.failed}[/red]\n"
)
for r in results:
health_color = (
"green"
if r.health == HealthStatus.HEALTHY
else "yellow"
if r.health == HealthStatus.DEGRADED
else "red"
)
command_ctx.console.print(
f" [{health_color}]●[/{health_color}] {r.url} "
f"({r.response_time_ms:.0f}ms, {r.success_rate * 100:.0f}% success)"
)
# Continuous monitoring mode
else:
command_ctx.console.print(
f"\n[bold]Continuous health monitoring (interval: {check_interval}s)[/bold]"
)
command_ctx.console.print("Press Ctrl+C to stop\n")
try:
iteration = 0
while True:
iteration += 1
command_ctx.console.print(f"\n[dim]Check #{iteration}[/dim]")
results = check_health()
# Display brief summary
healthy = sum(1 for r in results if r.health == HealthStatus.HEALTHY)
degraded = sum(1 for r in results if r.health == HealthStatus.DEGRADED)
failed = sum(
1 for r in results if r.health in (HealthStatus.UNHEALTHY, HealthStatus.DEAD)
)
command_ctx.console.print(
f"Status: [green]{healthy} healthy[/green] | "
f"[yellow]{degraded} degraded[/yellow] | "
f"[red]{failed} failed[/red]"
)
time.sleep(check_interval)
except KeyboardInterrupt:
command_ctx.console.print("\n[yellow]Monitoring stopped[/yellow]")
raise typer.Exit(code=0)
# Create a command group for sources
sources_app = typer.Typer(
name="sources",
help="Manage and audit proxy sources",
no_args_is_help=False, # Allow running without args for backward compatibility
)
app.add_typer(sources_app, name="sources")
@sources_app.callback(invoke_without_command=True)
[docs]
def sources_callback(
ctx: typer.Context,
validate: bool = typer.Option(
False,
"--validate",
"-v",
help="Validate all sources and check for stale/broken ones",
),
timeout: float = typer.Option(
15.0,
"--timeout",
"-t",
help="Timeout per source in seconds",
),
concurrency: int = typer.Option(
20,
"--concurrency",
"-j",
help="Maximum concurrent requests",
),
fail_on_unhealthy: bool = typer.Option(
False,
"--fail-on-unhealthy",
"-f",
help="Exit with error code if any sources are unhealthy (for CI)",
),
) -> None:
"""List and validate proxy sources.
By default, lists all configured proxy sources with their URLs.
Use --validate to check which sources are working and which are stale.
Examples:
proxywhirl sources # List all sources
proxywhirl sources --validate # Validate all sources
proxywhirl sources -v -f # Validate and fail if any unhealthy (CI mode)
proxywhirl sources audit # Full audit with detailed results
proxywhirl sources audit --fix # Audit and remove broken sources
"""
# Only run if no subcommand is invoked
if ctx.invoked_subcommand is not None:
return
import asyncio
from rich.table import Table
from proxywhirl.sources import (
ALL_HTTP_SOURCES,
ALL_SOCKS4_SOURCES,
ALL_SOCKS5_SOURCES,
ALL_SOURCES,
)
from proxywhirl.sources import (
validate_sources as validate_sources_async,
)
command_ctx = get_context()
if validate:
# Validation mode
enabled_sources = [s for s in ALL_SOURCES if s.enabled]
command_ctx.console.print(
f"[bold]Validating {len(enabled_sources)} proxy sources...[/bold]\n"
)
report = asyncio.run(validate_sources_async(timeout=timeout, concurrency=concurrency))
# Create results table
table = Table(title="Source Validation Results")
table.add_column("Status", style="bold", width=8)
table.add_column("Source", style="cyan")
table.add_column("Response", justify="right")
table.add_column("Size", justify="right")
table.add_column("Time", justify="right")
table.add_column("Error")
# Sort: unhealthy first, then by name
sorted_results = sorted(report.results, key=lambda r: (r.is_healthy, r.name.lower()))
for result in sorted_results:
if result.is_healthy:
status = "[green]✓ OK[/green]"
error = ""
else:
status = "[red]✗ FAIL[/red]"
error = result.error or (
f"HTTP {result.status_code}" if result.status_code else "No response"
)
size = f"{result.content_length:,}" if result.content_length else "-"
time_str = f"{result.response_time_ms:.0f}ms"
resp = str(result.status_code) if result.status_code else "-"
table.add_row(status, result.name, resp, size, time_str, error[:30] if error else "")
command_ctx.console.print(table)
# Summary
command_ctx.console.print("\n[bold]Summary:[/bold]")
command_ctx.console.print(
f" Total: {report.total_sources} | "
f"[green]Healthy: {report.healthy_sources}[/green] | "
f"[red]Unhealthy: {report.unhealthy_sources}[/red] | "
f"Time: {report.total_time_ms:.0f}ms"
)
if report.unhealthy:
command_ctx.console.print("\n[bold red]Unhealthy sources:[/bold red]")
for result in report.unhealthy:
error_msg = result.error or (
f"HTTP {result.status_code}" if result.status_code else "No response"
)
command_ctx.console.print(f" • {result.name}: {error_msg}")
if fail_on_unhealthy:
command_ctx.console.print(
f"\n[red]Exiting with error: {report.unhealthy_sources} unhealthy source(s)[/red]"
)
raise typer.Exit(code=1)
else:
command_ctx.console.print("\n[green]All sources are healthy![/green]")
else:
# List mode
command_ctx.console.print("[bold]Configured Proxy Sources[/bold]\n")
command_ctx.console.print(f"[cyan]HTTP Sources ({len(ALL_HTTP_SOURCES)}):[/cyan]")
for src in ALL_HTTP_SOURCES:
url_display = str(src.url)[:80]
command_ctx.console.print(f" • {url_display}{'...' if len(str(src.url)) > 80 else ''}")
command_ctx.console.print(f"\n[cyan]SOCKS4 Sources ({len(ALL_SOCKS4_SOURCES)}):[/cyan]")
for src in ALL_SOCKS4_SOURCES:
url_display = str(src.url)[:80]
command_ctx.console.print(f" • {url_display}{'...' if len(str(src.url)) > 80 else ''}")
command_ctx.console.print(f"\n[cyan]SOCKS5 Sources ({len(ALL_SOCKS5_SOURCES)}):[/cyan]")
for src in ALL_SOCKS5_SOURCES:
url_display = str(src.url)[:80]
command_ctx.console.print(f" • {url_display}{'...' if len(str(src.url)) > 80 else ''}")
command_ctx.console.print(f"\n[bold]Total: {len(ALL_SOURCES)} sources[/bold]")
command_ctx.console.print("\n[dim]Use --validate to check source health[/dim]")
command_ctx.console.print("[dim]Use 'proxywhirl sources audit' for detailed auditing[/dim]")
@sources_app.command()
[docs]
def audit(
timeout: float = typer.Option(
15.0,
"--timeout",
"-t",
help="Timeout per source in seconds",
),
concurrency: int = typer.Option(
20,
"--concurrency",
"-j",
help="Maximum concurrent requests",
),
retries: int = typer.Option(
3,
"--retries",
"-r",
help="Number of retries for each source before marking as broken",
),
fix: bool = typer.Option(
False,
"--fix",
help="Remove broken sources from sources.py (creates backup)",
),
dry_run: bool = typer.Option(
False,
"--dry-run",
"-n",
help="Show what would be removed without making changes (implies --fix)",
),
min_proxies: int = typer.Option(
1,
"--min-proxies",
help="Minimum proxies required for a source to be considered healthy",
),
protocol: str | None = typer.Option(
None,
"--protocol",
"-p",
help="Only audit sources of specific protocol (http, socks4, socks5)",
),
) -> None:
"""Audit proxy sources for broken or stale entries.
Tests each source by fetching from it and checking if it returns valid proxies.
A source is considered "broken" if:
- It returns a non-200 status code
- It times out after retries
- It returns 0 proxies (or less than --min-proxies)
- It returns malformed/unparseable content
Use --fix to automatically remove broken sources from sources.py.
A backup file will be created before any modifications.
Examples:
proxywhirl sources audit # Audit all sources
proxywhirl sources audit --protocol http # Only HTTP sources
proxywhirl sources audit --fix # Remove broken sources
proxywhirl sources audit --dry-run # Preview what would be removed
proxywhirl sources audit --retries 5 # More retries before marking broken
proxywhirl sources audit -j 50 -t 30 # Higher concurrency, longer timeout
"""
import asyncio
from rich.table import Table
from proxywhirl.sources import (
ALL_HTTP_SOURCES,
ALL_SOCKS4_SOURCES,
ALL_SOCKS5_SOURCES,
ALL_SOURCES,
)
command_ctx = get_context()
# Select sources based on protocol filter
if protocol:
protocol_lower = protocol.lower()
if protocol_lower == "http":
sources_to_audit = [s for s in ALL_HTTP_SOURCES if s.enabled]
protocol_name = "HTTP"
elif protocol_lower == "socks4":
sources_to_audit = [s for s in ALL_SOCKS4_SOURCES if s.enabled]
protocol_name = "SOCKS4"
elif protocol_lower == "socks5":
sources_to_audit = [s for s in ALL_SOCKS5_SOURCES if s.enabled]
protocol_name = "SOCKS5"
else:
command_ctx.console.print(
f"[red]Invalid protocol: {protocol}. Use http, socks4, or socks5[/red]"
)
raise typer.Exit(code=1)
command_ctx.console.print(
f"[bold]Auditing {len(sources_to_audit)} {protocol_name} sources...[/bold]\n"
)
else:
sources_to_audit = [s for s in ALL_SOURCES if s.enabled]
command_ctx.console.print(
f"[bold]Auditing {len(sources_to_audit)} proxy sources...[/bold]\n"
)
# Run audit
audit_results = asyncio.run(
_run_source_audit(
sources=sources_to_audit,
timeout=timeout,
concurrency=concurrency,
retries=retries,
min_proxies=min_proxies,
console=command_ctx.console,
)
)
# Separate working and broken sources
working = [r for r in audit_results if r["status"] == "healthy"]
broken = [r for r in audit_results if r["status"] == "broken"]
# Create results table
table = Table(title="Source Audit Results")
table.add_column("Status", style="bold", width=10)
table.add_column("Source", style="cyan")
table.add_column("Proxies", justify="right")
table.add_column("Time", justify="right")
table.add_column("Reason")
# Sort: broken first, then by name
sorted_results = sorted(
audit_results, key=lambda r: (r["status"] == "healthy", r["name"].lower())
)
for result in sorted_results:
if result["status"] == "healthy":
status = "[green]✓ HEALTHY[/green]"
reason = ""
else:
status = "[red]✗ BROKEN[/red]"
reason = result.get("error", "Unknown error")[:40]
proxies = str(result.get("proxy_count", 0))
time_str = f"{result.get('response_time_ms', 0):.0f}ms"
table.add_row(status, result["name"], proxies, time_str, reason)
command_ctx.console.print(table)
# Summary
command_ctx.console.print("\n[bold]Audit Summary:[/bold]")
command_ctx.console.print(
f" Total: {len(audit_results)} | "
f"[green]Healthy: {len(working)}[/green] | "
f"[red]Broken: {len(broken)}[/red]"
)
# Output JSON format for CI
if command_ctx.format == OutputFormat.JSON:
render_json(
{
"total_sources": len(audit_results),
"healthy_sources": len(working),
"broken_sources": len(broken),
"broken_urls": [r["url"] for r in broken],
"results": audit_results,
}
)
# Handle fix mode
if broken and (fix or dry_run):
command_ctx.console.print("\n[bold]Broken sources to remove:[/bold]")
for result in broken:
command_ctx.console.print(f" • {result['name']}")
command_ctx.console.print(f" URL: {result['url']}")
command_ctx.console.print(f" Reason: {result.get('error', 'Unknown')}")
if dry_run:
command_ctx.console.print(
f"\n[yellow]Dry run: Would remove {len(broken)} source(s)[/yellow]"
)
command_ctx.console.print("[dim]Run without --dry-run to apply changes[/dim]")
else:
# Actually remove broken sources
removed_count = _remove_broken_sources(
broken_urls=[r["url"] for r in broken],
console=command_ctx.console,
)
if removed_count > 0:
command_ctx.console.print(
f"\n[green]✓ Removed {removed_count} broken source(s)[/green]"
)
command_ctx.console.print(
"[yellow]Note: Backup created at proxywhirl/sources.py.backup[/yellow]"
)
else:
command_ctx.console.print(
"\n[yellow]No sources were removed (check logs for details)[/yellow]"
)
elif broken:
command_ctx.console.print(
f"\n[yellow]Found {len(broken)} broken source(s). Use --fix to remove them.[/yellow]"
)
# Exit with error if broken sources found (for CI)
if broken:
raise typer.Exit(code=1)
async def _run_source_audit(
sources: list[Any],
timeout: float,
concurrency: int,
retries: int,
min_proxies: int,
console: Console,
) -> list[dict[str, Any]]:
"""Run source audit with retries and proxy counting.
Args:
sources: List of ProxySourceConfig to audit
timeout: Timeout per request in seconds
concurrency: Maximum concurrent requests
retries: Number of retries per source
min_proxies: Minimum proxies for healthy status
console: Rich console for progress display
Returns:
List of audit result dicts with status, proxy_count, etc.
"""
import asyncio
import time
import httpx
from rich.progress import (
BarColumn,
MofNCompleteColumn,
Progress,
SpinnerColumn,
TaskProgressColumn,
TextColumn,
TimeElapsedColumn,
)
from proxywhirl.fetchers import ProxyFetcher
from proxywhirl.sources import _get_source_name
results: list[dict[str, Any]] = []
semaphore = asyncio.Semaphore(concurrency)
completed = 0
async def audit_source(source: Any) -> dict[str, Any]:
"""Audit a single source with retries."""
nonlocal completed
name = _get_source_name(source)
url = str(source.url)
for attempt in range(retries):
start_time = time.perf_counter()
try:
async with semaphore:
# Create fetcher for single source
fetcher = ProxyFetcher(sources=[source])
try:
# Fetch without validation (we just want to count)
proxies = await fetcher.fetch_all(validate=False, deduplicate=True)
elapsed_ms = (time.perf_counter() - start_time) * 1000
proxy_count = len(proxies)
if proxy_count >= min_proxies:
completed += 1
return {
"name": name,
"url": url,
"status": "healthy",
"proxy_count": proxy_count,
"response_time_ms": elapsed_ms,
"attempts": attempt + 1,
}
else:
# Not enough proxies, try again
if attempt < retries - 1:
await asyncio.sleep(1) # Brief pause before retry
continue
completed += 1
return {
"name": name,
"url": url,
"status": "broken",
"proxy_count": proxy_count,
"response_time_ms": elapsed_ms,
"error": f"Only {proxy_count} proxies (min: {min_proxies})",
"attempts": attempt + 1,
}
finally:
await fetcher.close()
except httpx.TimeoutException:
if attempt < retries - 1:
await asyncio.sleep(1)
continue
completed += 1
return {
"name": name,
"url": url,
"status": "broken",
"proxy_count": 0,
"response_time_ms": (time.perf_counter() - start_time) * 1000,
"error": "Timeout",
"attempts": attempt + 1,
}
except Exception as e:
if attempt < retries - 1:
await asyncio.sleep(1)
continue
completed += 1
return {
"name": name,
"url": url,
"status": "broken",
"proxy_count": 0,
"response_time_ms": (time.perf_counter() - start_time) * 1000,
"error": str(e)[:100],
"attempts": attempt + 1,
}
# Should not reach here, but handle it
completed += 1
return {
"name": name,
"url": url,
"status": "broken",
"proxy_count": 0,
"response_time_ms": 0,
"error": "Max retries exceeded",
"attempts": retries,
}
# Run with progress bar
progress = Progress(
SpinnerColumn(),
TextColumn("[bold blue]{task.description}"),
BarColumn(),
TaskProgressColumn(),
MofNCompleteColumn(),
TextColumn("•"),
TimeElapsedColumn(),
console=console,
transient=False,
)
with progress:
task = progress.add_task("Auditing sources", total=len(sources))
async def audit_with_progress(source: Any) -> dict[str, Any]:
result = await audit_source(source)
progress.update(task, advance=1)
return result
tasks = [audit_with_progress(src) for src in sources]
results = await asyncio.gather(*tasks)
return results
def _remove_broken_sources(broken_urls: list[str], console: Console) -> int:
"""Remove broken sources from sources.py file.
Creates a backup before modification. Uses regex to comment out
the broken source definitions.
Args:
broken_urls: List of URLs to remove
console: Rich console for output
Returns:
Number of sources removed
"""
import re
import shutil
sources_path = Path(__file__).parent / "sources.py"
backup_path = sources_path.with_suffix(".py.backup")
if not sources_path.exists():
console.print("[red]Error: sources.py not found[/red]")
return 0
# Create backup
shutil.copy(sources_path, backup_path)
# Read current content
content = sources_path.read_text()
removed_count = 0
# For each broken URL, comment out its ProxySourceConfig definition
for url in broken_urls:
# Escape URL for regex
escaped_url = re.escape(url)
# Pattern to match ProxySourceConfig with this URL
# Matches: VARIABLE_NAME = ProxySourceConfig(...url="<url>"...)
# This is a simplified pattern - in practice, source configs span multiple lines
pattern = (
rf'^([A-Z_]+)\s*=\s*ProxySourceConfig\(\s*\n?\s*url=["\']?{escaped_url}["\']?.*?\)$'
)
match = re.search(pattern, content, re.MULTILINE | re.DOTALL)
if match:
var_name = match.group(1)
# Comment out the entire definition
commented = f"# REMOVED BY AUDIT: {match.group(0)}"
content = content.replace(match.group(0), commented)
removed_count += 1
console.print(f" [dim]Commented out: {var_name}[/dim]")
# If we made changes, also remove from the collection lists
if removed_count > 0:
# Write modified content
sources_path.write_text(content)
return removed_count
@app.command()
[docs]
def tui() -> None:
"""Launch the interactive Terminal User Interface (TUI).
The TUI provides a full-featured dashboard for managing proxies with:
- Real-time metrics and sparkline visualizations
- Proxy table with filtering, sorting, and health status
- Manual proxy management (add/remove)
- Health checks with progress bars
- Circuit breaker monitoring
- Request testing with multiple HTTP methods
- Export functionality with format preview
Keyboard shortcuts:
j/k - Navigate up/down
g/G - Jump to first/last
Enter - View proxy details
c - Copy proxy URL
t - Quick test proxy
/ - Focus search
? - Show help modal
Ctrl+A - Toggle auto-refresh
Ctrl+R - Refresh all data
Ctrl+F - Fetch tab
Ctrl+E - Export tab
Ctrl+T - Test tab
Ctrl+H - Health tab
Examples:
proxywhirl tui
"""
from proxywhirl.tui import run_tui
# Run the TUI (bypasses normal output formatting)
run_tui()
@app.command()
[docs]
def db_stats(
db: Path = typer.Option(
Path("proxywhirl.db"),
"--db",
help="Path to SQLite database",
),
) -> None:
"""Show database statistics.
Displays comprehensive statistics about the proxy database including
counts by health status, protocol, and validation metrics.
Examples:
proxywhirl db-stats
proxywhirl db-stats --db custom.db
"""
import asyncio
from rich.table import Table
command_ctx = get_context()
async def get_db_stats():
from proxywhirl.storage import SQLiteStorage
storage = SQLiteStorage(db)
await storage.initialize()
try:
return await storage.get_stats()
finally:
await storage.close()
try:
stats = asyncio.run(get_db_stats())
except Exception as e:
command_ctx.console.print(f"[red]Error loading stats:[/red] {e}")
raise typer.Exit(code=1) from e
if command_ctx.format == OutputFormat.JSON:
render_json(stats)
else:
table = Table(title="Proxy Database Statistics")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green", justify="right")
table.add_row("Total Proxies", f"{stats.get('total_proxies', 0):,}")
table.add_row("", "")
# Health status breakdown
table.add_row("[bold]By Health Status[/bold]", "")
for status, count in sorted(stats.get("by_health", {}).items()):
color = "green" if status == "healthy" else "yellow" if status == "unknown" else "red"
table.add_row(f" {status}", f"[{color}]{count:,}[/{color}]")
table.add_row("", "")
# Protocol breakdown
table.add_row("[bold]By Protocol[/bold]", "")
for protocol, count in sorted(stats.get("by_protocol", {}).items()):
table.add_row(f" {protocol}", f"{count:,}")
# Validation stats (normalized only)
if "validations_24h" in stats:
table.add_row("", "")
table.add_row("[bold]Validations (24h)[/bold]", "")
v = stats["validations_24h"]
table.add_row(" Total", f"{v.get('total', 0):,}")
table.add_row(" Successful", f"{v.get('successful', 0):,}")
if v.get("avg_response_time_ms"):
table.add_row(" Avg Response Time", f"{v['avg_response_time_ms']:.0f}ms")
# Database size
if "db_size_bytes" in stats:
size_mb = stats["db_size_bytes"] / (1024 * 1024)
table.add_row("", "")
table.add_row("Database Size", f"{size_mb:.2f} MB")
command_ctx.console.print(table)
@app.command()
[docs]
def cleanup(
db: Path = typer.Option(
Path("proxywhirl.db"),
"--db",
help="Path to SQLite database",
),
stale_days: int = typer.Option(
7,
"--stale-days",
help="Remove proxies not validated in N days",
),
execute: bool = typer.Option(
False,
"--execute",
help="Actually perform cleanup (dry run by default)",
),
) -> None:
"""Clean up stale and dead proxies from the database.
By default, performs a dry run showing what would be removed.
Use --execute to actually perform the cleanup.
Examples:
proxywhirl cleanup # Dry run
proxywhirl cleanup --execute # Actually remove
proxywhirl cleanup --stale-days 14 # Remove if not validated in 14 days
"""
import asyncio
command_ctx = get_context()
async def run_cleanup():
from proxywhirl.storage import SQLiteStorage
storage = SQLiteStorage(db)
await storage.initialize()
try:
if execute:
result = await storage.cleanup(
remove_dead=True,
remove_stale_days=stale_days,
remove_never_validated=True,
vacuum=True,
)
return {"executed": True, "counts": result}
else:
# Dry run - just get stats
stats = await storage.get_stats()
return {
"executed": False,
"would_remove": {
"dead": stats.get("by_health", {}).get("dead", 0),
},
}
finally:
await storage.close()
try:
result = asyncio.run(run_cleanup())
except Exception as e:
command_ctx.console.print(f"[red]Cleanup failed:[/red] {e}")
raise typer.Exit(code=1) from e
if command_ctx.format == OutputFormat.JSON:
render_json(result)
else:
if result["executed"]:
counts = result["counts"]
total = sum(counts.values())
command_ctx.console.print(
f"[green]✓[/green] Cleanup completed: removed {total:,} proxies"
)
for category, count in counts.items():
command_ctx.console.print(f" {category}: {count:,}")
else:
would_remove = result.get("would_remove", {})
command_ctx.console.print("[yellow]Dry run - showing what would be removed:[/yellow]")
for category, count in would_remove.items():
command_ctx.console.print(f" {category}: {count:,}")
command_ctx.console.print("\n[dim]Use --execute to actually remove[/dim]")
if __name__ == "__main__":
app()