# Source code for proxywhirl.fetchers

"""
Proxy fetching and parsing functionality.

This module provides tools for fetching proxies from various sources and
parsing different formats (JSON, CSV, plain text, HTML tables).
"""

from __future__ import annotations

import asyncio
import csv
import json
import re
import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, NamedTuple

if TYPE_CHECKING:
    from proxywhirl.models import ValidationLevel
from urllib.parse import urlparse

import httpx
from bs4 import BeautifulSoup
from loguru import logger
from tenacity import (
    RetryCallState,
    retry,
    retry_if_exception,
    retry_if_exception_type,
    stop_after_attempt,
)

from proxywhirl.exceptions import ProxyFetchError, ProxyValidationError
from proxywhirl.models import ProxySourceConfig, RenderMode

# Check for httpx-socks availability (optional dependency for SOCKS proxy support)
try:
    from httpx_socks import AsyncProxyTransport

    SOCKS_AVAILABLE = True
except ImportError:
    # Keep a None placeholder so later code can guard on SOCKS_AVAILABLE /
    # AsyncProxyTransport without a NameError.
    AsyncProxyTransport = None  # type: ignore[misc, assignment]
    SOCKS_AVAILABLE = False
    logger.debug(
        "httpx-socks not installed. SOCKS4/SOCKS5 proxy validation will not work. "
        "Install with: uv sync or add httpx-socks to your dependencies"
    )


class JSONParser:
    """Parse JSON-formatted proxy lists."""

    def __init__(
        self,
        key: str | None = None,
        required_fields: list[str] | None = None,
    ) -> None:
        """
        Initialize JSON parser.

        Args:
            key: Optional key to extract from JSON object
            required_fields: Fields that must be present in each proxy
        """
        self.key = key
        self.required_fields = required_fields or []

    def parse(self, data: str) -> list[dict[str, Any]]:
        """
        Parse JSON proxy data.

        Args:
            data: JSON string to parse

        Returns:
            List of proxy dictionaries

        Raises:
            ProxyFetchError: If JSON is invalid
            ProxyValidationError: If required fields are missing
        """
        try:
            payload = json.loads(data)
        except json.JSONDecodeError as exc:
            raise ProxyFetchError(f"Invalid JSON: {exc}") from exc

        # Unwrap the configured key when one was provided (truthy check matches
        # the constructor contract: an empty key behaves like no key).
        if self.key:
            if not isinstance(payload, dict) or self.key not in payload:
                raise ProxyFetchError(f"JSON missing key: {self.key}")
            payload = payload[self.key]

        if not isinstance(payload, list):
            raise ProxyFetchError("JSON must be array or object with array key")

        # Enforce required fields, if any were configured.
        if self.required_fields:
            for entry in payload:
                for field in self.required_fields:
                    if field not in entry:
                        raise ProxyValidationError(f"Missing required field: {field}")

        return payload
class CSVParser:
    """Parse CSV-formatted proxy lists."""

    def __init__(
        self,
        has_header: bool = True,
        columns: list[str] | None = None,
        skip_invalid: bool = False,
    ) -> None:
        """
        Initialize CSV parser.

        Args:
            has_header: Whether CSV has header row
            columns: Column names if no header
            skip_invalid: Skip malformed rows instead of raising error
        """
        self.has_header = has_header
        self.columns = columns
        self.skip_invalid = skip_invalid

    def parse(self, data: str) -> list[dict[str, Any]]:
        """
        Parse CSV proxy data.

        Args:
            data: CSV string to parse

        Returns:
            List of proxy dictionaries

        Raises:
            ProxyFetchError: If CSV is malformed and skip_invalid is False,
                or if no header is present and no columns were configured
        """
        if not data.strip():
            return []

        lines = data.strip().split("\n")
        reader = csv.reader(lines)

        # Resolve the field names once; the previous implementation duplicated
        # the entire row-parsing loop in both the header and no-header branches.
        if self.has_header:
            try:
                fieldnames = next(reader)
            except StopIteration:
                return []
        else:
            if not self.columns:
                raise ProxyFetchError("Must provide columns if no header")
            fieldnames = self.columns

        proxies = []
        for row in reader:
            if len(row) != len(fieldnames):
                if self.skip_invalid:
                    continue
                raise ProxyFetchError(
                    f"Malformed CSV row: expected {len(fieldnames)} columns, got {len(row)}"
                )
            proxies.append(dict(zip(fieldnames, row)))
        return proxies
class PlainTextParser:
    """Parse plain text proxy lists (one proxy per line)."""

    # Pre-compiled, fully-anchored pattern (ReDoS-safe): IP:PORT with an
    # optional ":<suffix>" tail (e.g. a country name) that is discarded.
    # Groups: (1) dotted-quad IP, (2) port digits.
    _IP_PORT_PATTERN = re.compile(r"^(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}):(\d{1,5})(?::.+)?$")

    def __init__(self, skip_invalid: bool = True) -> None:
        """
        Initialize plain text parser.

        Args:
            skip_invalid: Skip invalid URLs instead of raising error
        """
        self.skip_invalid = skip_invalid

    def _is_valid_ip_port(self, ip_str: str, port_str: str) -> bool:
        """Return True when every octet is in 0-255 (canonical form, no
        leading zeros) and the port is in 1-65535.

        Args:
            ip_str: IP address string (e.g., "192.168.1.1")
            port_str: Port number string (e.g., "8080")
        """
        octets = ip_str.split(".")
        if len(octets) != 4:
            return False
        for octet in octets:
            try:
                value = int(octet)
            except ValueError:
                return False
            # Range check plus canonical-form check: "01"/"001" are rejected
            # because str(int("01")) != "01", while "0" round-trips.
            if not 0 <= value <= 255 or octet != str(value):
                return False
        try:
            return 1 <= int(port_str) <= 65535
        except ValueError:
            return False

    def parse(self, data: str) -> list[dict[str, Any]]:
        """
        Parse plain text proxy data.

        Args:
            data: Plain text string with one proxy per line
                Supports formats: IP:PORT, http://IP:PORT, socks5://IP:PORT

        Returns:
            List of proxy dictionaries with 'url' key

        Raises:
            ProxyFetchError: If invalid proxy format is encountered and
                skip_invalid=False
        """
        results: list[dict[str, Any]] = []
        for raw in data.split("\n"):
            candidate = raw.strip()
            # Ignore blank lines and comment lines.
            if not candidate or candidate.startswith("#"):
                continue

            # Bare IP:PORT lines get an http:// scheme prepended; any trailing
            # ":<suffix>" is dropped by the regex groups.
            hit = self._IP_PORT_PATTERN.match(candidate)
            if hit:
                ip_part, port_part = hit.groups()
                if not self._is_valid_ip_port(ip_part, port_part):
                    if self.skip_invalid:
                        continue
                    raise ProxyFetchError(f"Invalid IP:PORT format: {candidate}")
                candidate = f"http://{ip_part}:{port_part}"

            # Final URL sanity check: must carry both a scheme and a netloc.
            try:
                pieces = urlparse(candidate)
                if not pieces.scheme or not pieces.netloc:
                    if not self.skip_invalid:
                        raise ProxyFetchError(f"Invalid URL: {candidate}")
                    continue
            except Exception:
                if self.skip_invalid:
                    continue
                raise

            results.append({"url": candidate})
        return results
class HTMLTableParser:
    """Parse HTML table-formatted proxy lists."""

    def __init__(
        self,
        table_selector: str = "table",
        column_map: dict[str, str] | None = None,
        column_indices: dict[str, int] | None = None,
    ) -> None:
        """
        Initialize HTML table parser.

        Args:
            table_selector: CSS selector for table element
            column_map: Map header names to proxy fields
            column_indices: Map field names to column indices
        """
        self.table_selector = table_selector
        self.column_map = column_map or {}
        self.column_indices = column_indices or {}

    def parse(self, data: str) -> list[dict[str, Any]]:
        """
        Parse HTML table proxy data.

        Args:
            data: HTML string containing table

        Returns:
            List of proxy dictionaries
        """
        table = BeautifulSoup(data, "html.parser").select_one(self.table_selector)
        if not table:
            return []

        rows = table.find_all("tr")
        # A first row containing <th> cells is treated as the header row.
        header_cells = rows[0].find_all("th") if rows else []
        has_header = bool(header_cells)

        # Header text -> column position, only needed for column_map lookups.
        header_positions: dict[str, int] = {}
        if has_header and self.column_map:
            for position, cell in enumerate(header_cells):
                header_positions[cell.get_text(strip=True)] = position

        extracted: list[dict[str, Any]] = []
        for row in rows[1:] if has_header else rows:
            cells = row.find_all("td")
            record: dict[str, Any] = {}
            if self.column_indices:
                # Explicit index mapping takes precedence.
                for field, position in self.column_indices.items():
                    if position < len(cells):
                        record[field] = cells[position].get_text(strip=True)
            elif self.column_map and header_positions:
                # Otherwise resolve fields through the header-name mapping.
                for header, field in self.column_map.items():
                    position = header_positions.get(header)
                    if position is not None and position < len(cells):
                        record[field] = cells[position].get_text(strip=True)
            if record:  # Only keep rows that yielded at least one field
                extracted.append(record)
        return extracted
class GeonodeParser:
    """Parse GeoNode API JSON response format.

    GeoNode API returns:
        {"data": [{"ip": "...", "port": "...", "protocols": ["http"]}, ...]}

    Each (protocol, ip, port) combination is flattened into the standard
    shape: {"url": "http://ip:port", "protocol": "http"}.
    """

    def parse(self, data: str) -> list[dict[str, Any]]:
        """Parse GeoNode API response.

        Args:
            data: JSON string from GeoNode API

        Returns:
            List of proxy dictionaries in standard format

        Raises:
            ProxyFetchError: If the payload is not valid JSON or is missing
                the expected "data" array
        """
        try:
            document = json.loads(data)
        except json.JSONDecodeError as exc:
            raise ProxyFetchError(f"Invalid JSON from GeoNode: {exc}") from exc

        if not isinstance(document, dict) or "data" not in document:
            raise ProxyFetchError("GeoNode response missing 'data' key")
        records = document["data"]
        if not isinstance(records, list):
            raise ProxyFetchError("GeoNode 'data' is not an array")

        flattened: list[dict[str, Any]] = []
        for record in records:
            ip = record.get("ip")
            port = record.get("port")
            if not ip or not port:
                continue
            # GeoNode may return port as string or int — normalize via str().
            port_text = str(port)
            # Emit one entry per advertised protocol.
            for protocol in record.get("protocols", []):
                proto = protocol.lower()
                flattened.append({"url": f"{proto}://{ip}:{port_text}", "protocol": proto})
        return flattened
def deduplicate_proxies(proxies: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """
    Deduplicate proxies by URL+Port combination, keeping the first occurrence.

    Hostnames are normalized to lowercase since DNS names are case-insensitive
    (RFC 4343): "PROXY.EXAMPLE.COM:8080" and "proxy.example.com:8080" collapse
    to one entry. Handles edge cases including:

    - IPv6 addresses: [2001:db8::1]:8080 (hex digits already lowercase)
    - IDN hostnames: Прокси.рф:8080 (lowercased correctly)
    - Mixed-case DNS names: Proxy.EXAMPLE.com:8080

    Args:
        proxies: List of proxy dictionaries

    Returns:
        Deduplicated list (original order preserved)
    """
    kept: list[dict[str, Any]] = []
    seen_keys: set[str] = set()
    for entry in proxies:
        url = entry.get("url", "")
        if url:
            # netloc is host:port (or [ipv6]:port); lowercase normalizes case.
            dedup_key = urlparse(url).netloc.lower()
        else:
            # No URL — fall back to separate host/port fields.
            host = entry.get("host", "")
            port = entry.get("port", "")
            if not host or not port:
                continue  # incomplete entries cannot be keyed; drop them
            dedup_key = f"{host.lower()}:{port}"
        if dedup_key in seen_keys:
            continue
        seen_keys.add(dedup_key)
        kept.append(entry)
    return kept
class ValidationResult(NamedTuple):
    """Outcome of a single proxy validation, including timing metrics."""

    # Whether the proxy passed validation.
    is_valid: bool
    # Round-trip time in milliseconds; None when validation failed before timing.
    response_time_ms: float | None

    def __bool__(self) -> bool:
        """Truthiness mirrors ``is_valid`` (a plain NamedTuple would otherwise
        always be truthy because it is a non-empty tuple)."""
        return self.is_valid
class ProxyValidator:
    """Validate proxy connectivity with metrics collection."""

    # Multiple test URLs to avoid single-point bottleneck and rate limiting
    TEST_URLS = [
        "http://www.gstatic.com/generate_204",  # Google - fast, no rate limit
        "http://cp.cloudflare.com/generate_204",  # Cloudflare - very fast
        "http://connectivitycheck.android.com/generate_204",  # Android check
        "http://www.msftconnecttest.com/connecttest.txt",  # Microsoft
    ]

    # HTTPS-specific test URLs for validating HTTPS proxy connectivity.
    # Multiple endpoints for fallback — succeed on any.
    HTTPS_TEST_URLS = [
        "https://www.gstatic.com/generate_204",  # Google HTTPS (204)
        "https://cp.cloudflare.com/generate_204",  # Cloudflare HTTPS (204)
        "https://connectivity-check.ubuntu.com/",  # Ubuntu (204)
        "https://detectportal.firefox.com/canonical.html",  # Firefox (200)
    ]

    def __init__(
        self,
        timeout: float = 5.0,
        test_url: str | None = None,
        level: ValidationLevel | None = None,
        concurrency: int = 50,
    ) -> None:
        """
        Initialize proxy validator.

        Args:
            timeout: Connection timeout in seconds
            test_url: URL to use for connectivity testing. If None, rotates
                between multiple fast endpoints (Google, Cloudflare, etc.)
            level: Validation level (BASIC, STANDARD, FULL). Defaults to STANDARD.
            concurrency: Maximum number of concurrent validations
        """
        # Runtime import: at module scope ValidationLevel is only imported under
        # TYPE_CHECKING, so it must be fetched here to resolve the default.
        from proxywhirl.models import ValidationLevel

        self.timeout = timeout
        self._custom_test_url = test_url  # None means rotate
        self._test_url_index = 0
        self._https_test_url_index = 0
        self.level = level or ValidationLevel.STANDARD
        self.concurrency = concurrency
        # Shared clients are created lazily by _get_client/_get_socks_client.
        self._client: httpx.AsyncClient | None = None
        self._socks_client: httpx.AsyncClient | None = None

    @property
    def test_url(self) -> str:
        """Get current test URL, rotating through multiple endpoints."""
        if self._custom_test_url:
            return self._custom_test_url
        # Rotate through test URLs to distribute load
        url = self.TEST_URLS[self._test_url_index % len(self.TEST_URLS)]
        self._test_url_index += 1
        return url
def _get_test_url_for_protocol(self, protocol: str | None) -> str:
    """Get a test URL appropriate for the proxy's protocol.

    HTTPS proxies are validated against HTTPS endpoints to confirm they
    actually support TLS tunneling.

    Args:
        protocol: Proxy protocol string ("http", "https", ...) or None

    Returns:
        The custom test URL if one was configured, otherwise a rotating
        HTTPS endpoint for "https" proxies or the rotating HTTP test URL.
    """
    if self._custom_test_url:
        return self._custom_test_url
    if protocol == "https":
        url = self.HTTPS_TEST_URLS[self._https_test_url_index % len(self.HTTPS_TEST_URLS)]
        self._https_test_url_index += 1
        return url
    return self.test_url


async def _get_client(self) -> httpx.AsyncClient:
    """
    Get or create the shared HTTP client.

    Returns:
        Shared httpx.AsyncClient instance
    """
    if self._client is None:
        self._client = httpx.AsyncClient(
            timeout=self.timeout,
            limits=httpx.Limits(max_connections=1000, max_keepalive_connections=100),
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            },
        )
    return self._client


async def _get_socks_client(self, proxy_url: str) -> httpx.AsyncClient:
    """
    Get or create the shared SOCKS client for *proxy_url*.

    BUGFIX: the cached client was previously bound to whichever proxy URL
    was requested first; any later call with a different URL silently got a
    client routed through the wrong proxy. The cache is now invalidated
    (and the stale client closed) whenever the requested URL changes.

    Args:
        proxy_url: SOCKS proxy URL

    Returns:
        Shared httpx.AsyncClient instance configured for SOCKS

    Raises:
        ProxyValidationError: If httpx-socks is not installed
    """
    if not SOCKS_AVAILABLE:
        logger.warning(
            "SOCKS proxy support requires httpx-socks library. "
            "Install with: uv sync or add httpx-socks to your dependencies"
        )
        raise ProxyValidationError(
            "SOCKS proxy support requires httpx-socks library. "
            "Install with: uv sync or add httpx-socks to your dependencies"
        )
    if AsyncProxyTransport is None:
        # Should not happen when SOCKS_AVAILABLE is True; guard anyway.
        logger.error("SOCKS_AVAILABLE is True but AsyncProxyTransport is None")
        raise ProxyValidationError(
            "SOCKS proxy transport is unavailable. Please reinstall httpx-socks."
        )
    # Rebuild the client on first use or when the target proxy URL changed.
    if self._socks_client is None or getattr(self, "_socks_client_url", None) != proxy_url:
        if self._socks_client is not None:
            await self._socks_client.aclose()
        transport = AsyncProxyTransport.from_url(proxy_url)
        self._socks_client = httpx.AsyncClient(
            transport=transport,
            timeout=self.timeout,
            limits=httpx.Limits(max_connections=1000, max_keepalive_connections=100),
            headers={
                "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36"
            },
        )
        self._socks_client_url = proxy_url
    return self._socks_client
async def close(self) -> None:
    """Close all client connections and cleanup resources."""
    # Both cached clients follow the same teardown: close, then forget.
    for attr in ("_client", "_socks_client"):
        client = getattr(self, attr)
        if client:
            await client.aclose()
            setattr(self, attr, None)
async def __aenter__(self) -> ProxyValidator:
    """Async context manager entry."""
    return self


async def __aexit__(self, *args: Any) -> None:
    """Async context manager exit."""
    await self.close()


async def _validate_tcp_connectivity(self, host: str, port: int) -> bool:
    """
    Validate TCP connectivity to proxy.

    BUGFIX: previously used the blocking ``socket.create_connection`` inside
    an async function, stalling the event loop for up to ``self.timeout``
    seconds per probe. Now uses ``asyncio.open_connection`` (matching the
    pre-check in ``validate``), so concurrent probes no longer serialize.

    Args:
        host: Proxy hostname or IP address
        port: Proxy port number

    Returns:
        True if TCP connection succeeds, False otherwise
    """
    try:
        _, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port),
            timeout=self.timeout,
        )
        writer.close()
        await writer.wait_closed()
        return True
    except (asyncio.TimeoutError, OSError):
        # Connection failed - timeout, refused, DNS error, or network unreachable
        # (ConnectionRefusedError and socket.gaierror are OSError subclasses)
        return False


async def _validate_http_request(self, proxy_url: str | None = None) -> bool:
    """
    Validate HTTP request through proxy.

    Args:
        proxy_url: Full proxy URL (e.g., "http://proxy.example.com:8080")
            If None, makes request without proxy (for testing)

    Returns:
        True if HTTP request succeeds, False otherwise
    """
    try:
        # httpx requires proxy at client initialization, not per-request
        if proxy_url:
            async with httpx.AsyncClient(
                proxy=proxy_url,
                timeout=self.timeout,
            ) as client:
                response = await client.get(self.test_url)
        else:
            # No proxy - use shared client for direct requests
            client = await self._get_client()
            response = await client.get(self.test_url)
        return response.status_code in (200, 204)
    except Exception:
        # The original catch tuple listed specific httpx errors *plus*
        # Exception, which is equivalent to catching Exception alone.
        return False
async def check_anonymity(self, proxy_url: str | None = None) -> str | None:
    """
    Check proxy anonymity level by detecting IP leakage using shared client.

    Tests if the proxy reveals the real IP address or proxy usage through
    HTTP headers like X-Forwarded-For, Via, X-Real-IP, etc.

    NOTE(review): the default rotating test URLs are generate_204-style
    endpoints that return 204 with an empty body, so the JSON "headers"
    inspection below can only classify proxies when ``test_url`` points to
    an echo service (httpbin-style) that reflects request headers — confirm
    intended usage with callers.

    Args:
        proxy_url: Full proxy URL (e.g., "http://proxy.example.com:8080")
            If None, makes request without proxy (for testing)

    Returns:
        - "transparent": Proxy leaks real IP address
        - "anonymous": Proxy hides IP but reveals proxy usage via headers
        - "elite": Proxy completely hides both IP and proxy usage
        - "unknown" or None: Could not determine (error occurred)
    """
    # Headers that indicate proxy usage or IP leakage
    proxy_headers = {
        "via",
        "x-forwarded-for",
        "x-real-ip",
        "forwarded",
        "proxy-connection",
        "x-proxy-id",
        "proxy-agent",
    }

    try:
        # httpx requires proxy at client initialization, not per-request
        if proxy_url:
            async with httpx.AsyncClient(
                proxy=proxy_url,
                timeout=self.timeout,
            ) as client:
                response = await client.get(self.test_url)
        else:
            # No proxy - use shared client for direct requests
            client = await self._get_client()
            response = await client.get(self.test_url)

        if response.status_code != 200:
            return "unknown"

        # Try to parse JSON response for IP and headers
        try:
            data = response.json()
            headers = data.get("headers", {})
            # Convert header keys to lowercase for case-insensitive comparison
            headers_lower = {k.lower(): v for k, v in headers.items()}

            # Check for headers that leak real IP (transparent proxy)
            if "x-forwarded-for" in headers_lower or "x-real-ip" in headers_lower:
                return "transparent"

            # Check for headers that reveal proxy usage (anonymous proxy)
            if any(header in headers_lower for header in proxy_headers):
                return "anonymous"

            # No proxy-revealing headers found (elite proxy)
            return "elite"
        except (ValueError, KeyError):
            # Could not parse response or missing expected fields
            return "unknown"
    except Exception:
        # The original catch tuple listed specific httpx errors *plus*
        # Exception; catching Exception alone is equivalent. Request failed.
        return "unknown"
async def validate(self, proxy: dict[str, Any]) -> ValidationResult:
    """
    Validate proxy connectivity with fast TCP pre-check and timing metrics.

    Args:
        proxy: Proxy dictionary with 'url' key (e.g., "http://1.2.3.4:8080")

    Returns:
        ValidationResult with is_valid flag and response_time_ms (if successful)
    """
    proxy_url = proxy.get("url")
    if not proxy_url:
        return ValidationResult(is_valid=False, response_time_ms=None)

    try:
        # Parse host:port from URL
        parsed = urlparse(proxy_url)
        host = parsed.hostname
        port = parsed.port

        if not host or not port:
            return ValidationResult(is_valid=False, response_time_ms=None)

        # Fast TCP pre-check (async) - skip HTTP if port isn't even open
        # Use very short timeout for TCP (1s) - if port isn't open, fail fast
        try:
            _, writer = await asyncio.wait_for(
                asyncio.open_connection(host, port),
                timeout=min(1.0, self.timeout),
            )
            writer.close()
            await writer.wait_closed()
        except (asyncio.TimeoutError, OSError, ConnectionRefusedError):
            return ValidationResult(is_valid=False, response_time_ms=None)

        # TCP passed - now test actual proxy functionality with timing
        # httpx requires proxy at client initialization, not per-request
        is_socks = proxy_url.startswith(("socks4://", "socks5://"))
        proxy_protocol = proxy.get("protocol")
        target_url = self._get_test_url_for_protocol(proxy_protocol)

        start_time = time.perf_counter()

        if is_socks:
            # Create SOCKS client with transport
            if not SOCKS_AVAILABLE:
                logger.warning(
                    f"Skipping SOCKS validation for {proxy_url}: httpx-socks library not installed. "
                    "Install with: uv sync or add httpx-socks to your dependencies"
                )
                # Return invalid rather than error - graceful degradation
                # SOCKS proxies cannot be validated without the library
                return ValidationResult(is_valid=False, response_time_ms=None)

            if AsyncProxyTransport is None:
                # This should not happen if SOCKS_AVAILABLE is True, but guard against it
                logger.error("SOCKS_AVAILABLE is True but AsyncProxyTransport is None")
                return ValidationResult(is_valid=False, response_time_ms=None)

            transport = AsyncProxyTransport.from_url(proxy_url)
            async with httpx.AsyncClient(
                transport=transport,
                timeout=self.timeout,
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                },
            ) as client:
                response = await client.get(target_url)
                # Timing measured immediately after the response, before
                # client teardown.
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                is_valid = response.status_code in (200, 204)
                return ValidationResult(
                    is_valid=is_valid,
                    response_time_ms=elapsed_ms if is_valid else None,
                )
        else:
            # For HTTPS-tagged proxies: connect to the proxy via plaintext HTTP.
            # Free HTTP-CONNECT proxies don't speak TLS themselves — httpx with
            # proxy="https://..." would attempt a TLS handshake to the proxy and fail.
            # The HTTPS target_url verifies CONNECT tunneling capability.
            effective_proxy_url = (
                proxy_url.replace("https://", "http://", 1)
                if proxy_url.startswith("https://")
                else proxy_url
            )
            async with httpx.AsyncClient(
                proxy=effective_proxy_url,
                timeout=self.timeout,
                headers={
                    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                },
            ) as client:
                response = await client.get(target_url)
                elapsed_ms = (time.perf_counter() - start_time) * 1000
                is_valid = response.status_code in (200, 204)
                return ValidationResult(
                    is_valid=is_valid,
                    response_time_ms=elapsed_ms if is_valid else None,
                )
    except Exception:
        # Any failure (DNS, TLS, proxy error, protocol error) counts as invalid.
        return ValidationResult(is_valid=False, response_time_ms=None)
async def validate_batch(
    self,
    proxies: list[dict[str, Any]],
    progress_callback: Any | None = None,
) -> list[dict[str, Any]]:
    """
    Validate multiple proxies in parallel with concurrency control and metrics.

    Uses asyncio.Semaphore to limit concurrent validations based on the
    configured concurrency limit. Records response time for valid proxies.

    Args:
        proxies: List of proxy dictionaries
        progress_callback: Optional callback(completed, total, valid_count) for progress

    Returns:
        List of working proxies with response_time_ms added to each
    """
    if not proxies:
        return []

    # Create semaphore to limit concurrent validations
    semaphore = asyncio.Semaphore(self.concurrency)
    completed = 0
    valid_count = 0
    total = len(proxies)

    async def validate_with_semaphore(
        proxy: dict[str, Any],
    ) -> tuple[dict[str, Any], ValidationResult]:
        """Validate a single proxy with semaphore control and timing."""
        nonlocal completed, valid_count
        async with semaphore:
            try:
                result = await self.validate(proxy)
                completed += 1
                if result.is_valid:
                    valid_count += 1
                if progress_callback:
                    progress_callback(completed, total, valid_count)
                return (proxy, result)
            except Exception as e:
                # CONSISTENCY FIX: use the module-wide loguru logger rather
                # than an ad-hoc stdlib logging.getLogger(__name__) call,
                # matching the rest of this module.
                logger.debug(f"Validation error for {proxy.get('url')}: {e}")
                completed += 1
                if progress_callback:
                    progress_callback(completed, total, valid_count)
                return (proxy, ValidationResult(is_valid=False, response_time_ms=None))

    # Run all validations in parallel (semaphore controls concurrency)
    tasks = [validate_with_semaphore(proxy) for proxy in proxies]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Filter valid proxies and add timing metrics
    validated: list[dict[str, Any]] = []
    for result in results:
        # Handle exceptions from gather
        if isinstance(result, Exception):
            continue
        # result is a tuple[dict[str, Any], ValidationResult]
        if not isinstance(result, tuple):
            continue
        proxy, validation_result = result
        if validation_result.is_valid:
            # Add response time to proxy dict for persistence
            # Use average_response_time_ms to match Proxy model field name
            proxy["average_response_time_ms"] = validation_result.response_time_ms
            proxy["status"] = "active"
            # Mark as checked with success
            proxy["total_requests"] = proxy.get("total_requests", 0) + 1
            proxy["total_successes"] = proxy.get("total_successes", 0) + 1
            # Record validation timestamp for freshness tracking
            proxy["last_success_at"] = datetime.now(timezone.utc)
            validated.append(proxy)
    return validated
async def validate_https_capability_batch(
    self,
    http_proxies: list[dict[str, Any]],
    concurrency: int = 500,
    max_results: int | None = None,
    progress_callback: Any | None = None,
) -> list[dict[str, Any]]:
    """Test already-validated HTTP proxies for HTTPS/CONNECT support.

    Many free proxy lists label HTTP proxies as "HTTPS" after testing
    CONNECT tunneling. This method takes working HTTP proxies and tests
    each via the CONNECT method against an HTTPS endpoint. Proxies that
    pass are returned as ``https://`` entries ready for DB insertion.

    Args:
        http_proxies: Already-validated HTTP proxy dicts (protocol='http').
        concurrency: Max concurrent HTTPS tests (default 500).
        max_results: Stop early once this many HTTPS-capable proxies are found.
        progress_callback: Optional callback(completed, total, valid_count).

    Returns:
        Proxy dicts with ``protocol='https'`` and ``url='https://ip:port'``
        for each HTTP proxy that successfully tunnels HTTPS via CONNECT.
    """
    if not http_proxies:
        return []

    # Deduplicate by URL to avoid testing the same proxy multiple times
    seen: set[str] = set()
    unique_proxies: list[dict[str, Any]] = []
    for p in http_proxies:
        url = p.get("url", "")
        if url and url not in seen:
            seen.add(url)
            unique_proxies.append(p)

    semaphore = asyncio.Semaphore(concurrency)
    # done_event signals early termination once max_results has been reached.
    done_event = asyncio.Event()
    completed = 0
    valid_count = 0
    total = len(unique_proxies)

    # CONNECT tunneling needs: TCP connect + CONNECT handshake + TLS negotiation
    # + HTTP roundtrip. Cap at 8s per stage — 5s was too aggressive.
    per_stage = min(float(self.timeout), 8.0)
    https_timeout = httpx.Timeout(
        connect=per_stage,
        read=per_stage,
        write=2.0,
        pool=1.0,
    )
    https_test_urls = self.HTTPS_TEST_URLS

    # Diagnostic failure counters
    timeout_count = 0
    connection_count = 0
    http_error_count = 0
    other_error_count = 0

    async def test_one(
        proxy: dict[str, Any],
    ) -> tuple[dict[str, Any], ValidationResult]:
        nonlocal completed, valid_count
        nonlocal timeout_count, connection_count, http_error_count, other_error_count
        if done_event.is_set():
            completed += 1
            return (proxy, ValidationResult(is_valid=False, response_time_ms=None))
        async with semaphore:
            http_url = proxy["url"]  # always http://ip:port here
            is_valid = False
            elapsed_ms: float | None = None
            # Try all HTTPS test URLs — succeed on any
            for test_url in https_test_urls:
                if done_event.is_set():
                    break
                try:
                    start = time.perf_counter()
                    async with httpx.AsyncClient(
                        proxy=http_url,
                        timeout=https_timeout,
                        headers={
                            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
                        },
                    ) as client:
                        response = await client.head(test_url)
                        elapsed_ms = (time.perf_counter() - start) * 1000
                    # Accept any 2xx/3xx as proof CONNECT tunneling works
                    if 200 <= response.status_code < 400:
                        is_valid = True
                        break
                except (httpx.TimeoutException, TimeoutError):
                    timeout_count += 1
                    continue
                except (httpx.ConnectError, httpx.ProxyError, OSError):
                    connection_count += 1
                    continue
                except httpx.HTTPStatusError:
                    http_error_count += 1
                    continue
                except Exception:
                    other_error_count += 1
                    continue
            completed += 1
            if is_valid:
                valid_count += 1
                if max_results is not None and valid_count >= max_results:
                    done_event.set()
            if progress_callback:
                progress_callback(completed, total, valid_count)
            return (
                proxy,
                ValidationResult(
                    is_valid=is_valid,
                    response_time_ms=elapsed_ms if is_valid else None,
                ),
            )

    tasks = [test_one(proxy) for proxy in unique_proxies]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # Log diagnostic failure summary
    logger.info(
        "HTTPS CONNECT validation: {} tested, {} passed, "
        "failures: {} timeout, {} connection, {} HTTP error, {} other",
        total,
        valid_count,
        timeout_count,
        connection_count,
        http_error_count,
        other_error_count,
    )

    https_capable: list[dict[str, Any]] = []
    for result in results:
        if isinstance(result, Exception) or not isinstance(result, tuple):
            continue
        proxy, vr = result
        if not vr.is_valid:
            continue
        # Build an https:// variant of this proxy
        http_url = proxy["url"]
        https_url = http_url.replace("http://", "https://", 1)
        entry = {
            **proxy,
            "url": https_url,
            "protocol": "https",
            "average_response_time_ms": vr.response_time_ms,
        }
        https_capable.append(entry)
        if max_results is not None and len(https_capable) >= max_results:
            break
    return https_capable
# Retryable HTTP status codes for proxy fetching
RETRYABLE_STATUS_CODES = {
    429,  # Too Many Requests
    503,  # Service Unavailable
    502,  # Bad Gateway
    504,  # Gateway Timeout
}


def _is_retryable_http_error(exception: BaseException) -> bool:
    """Check if exception is a retryable HTTP error (429, 503, 502, 504).

    Args:
        exception: Exception to check

    Returns:
        True if exception is a retryable HTTP status error
    """
    if isinstance(exception, httpx.HTTPStatusError):
        return exception.response.status_code in RETRYABLE_STATUS_CODES
    return False


def _wait_with_retry_after(retry_state: RetryCallState) -> float:
    """Calculate wait time respecting Retry-After header.

    If the exception carries an integer Retry-After header, use that value
    clamped to the range [0, 60] seconds. Otherwise, use exponential backoff
    (2^attempt_number seconds, capped at 60s) with up to 25% jitter.

    BUGFIX: a malicious or buggy server sending a *negative* Retry-After
    (e.g. "-5") previously produced a negative wait time; the value is now
    clamped to be non-negative, and the return type is a true float to
    match the annotation.

    Args:
        retry_state: Tenacity retry call state

    Returns:
        Wait time in seconds
    """
    import random

    # Try to get Retry-After header from exception
    exception = retry_state.outcome.exception() if retry_state.outcome else None
    if exception is not None and isinstance(exception, httpx.HTTPStatusError):
        retry_after = exception.response.headers.get("Retry-After")
        if retry_after:
            try:
                # Retry-After can be seconds or HTTP-date; only the integer
                # form is handled here. Clamp to [0, 60]: non-negative, and
                # capped to prevent DoS via an excessive Retry-After.
                return float(max(0, min(int(retry_after), 60)))
            except ValueError:
                pass  # Not an integer, fall through to default

    # Default: exponential backoff with jitter
    attempt = retry_state.attempt_number
    base_wait = min(2**attempt, 60)  # Cap at 60 seconds
    # Add jitter (0-25% of base wait)
    jitter = random.uniform(0, base_wait * 0.25)  # noqa: S311
    return base_wait + jitter
class ProxyFetcher:
    """Fetch proxies from various sources."""

    def __init__(
        self,
        sources: list[ProxySourceConfig] | None = None,
        validator: ProxyValidator | None = None,
    ) -> None:
        """
        Initialize proxy fetcher.

        Args:
            sources: List of proxy source configurations
            validator: ProxyValidator instance for validating fetched proxies
        """
        self.sources = sources or []
        # Fall back to a default validator when none is supplied.
        self.validator = validator or ProxyValidator()
        # Format-identifier -> parser class registry used by fetch_from_source.
        self._parsers = {
            "json": JSONParser,
            "csv": CSVParser,
            "plain_text": PlainTextParser,
            "html_table": HTMLTableParser,
            # Legacy aliases for backwards compatibility
            "text": PlainTextParser,
            "html": HTMLTableParser,
        }
        # Shared HTTP client, created lazily by _get_client().
        self._client: httpx.AsyncClient | None = None
def add_source(self, source: ProxySourceConfig) -> None:
    """Register an additional proxy source.

    Args:
        source: Proxy source configuration to add
    """
    self.sources.append(source)
def remove_source(self, url: str) -> None:
    """Drop every registered source whose URL matches *url*.

    Args:
        url: URL of source to remove
    """
    # Compare on the string form of each source's URL.
    remaining = [src for src in self.sources if str(src.url) != url]
    self.sources = remaining
async def _get_client(self) -> httpx.AsyncClient: """ Get or create the shared HTTP client. Returns: Shared httpx.AsyncClient instance """ if self._client is None: self._client = httpx.AsyncClient( timeout=httpx.Timeout(30.0), limits=httpx.Limits(max_connections=50, max_keepalive_connections=10), ) return self._client
[docs] async def close(self) -> None: """Close client connection and cleanup resources.""" if self._client: await self._client.aclose() self._client = None # Also close validator's clients if self.validator: await self.validator.close()
    async def __aenter__(self) -> ProxyFetcher:
        """Async context manager entry; returns the fetcher itself."""
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Async context manager exit; closes the shared HTTP client via close()."""
        await self.close()

    # Retry policy for the fetch_from_source method defined immediately below:
    # up to 5 attempts, wait computed by _wait_with_retry_after (honors the
    # Retry-After header), retrying only on network timeouts and the HTTP
    # statuses in RETRYABLE_STATUS_CODES (429/502/503/504).
    @retry(
        stop=stop_after_attempt(5),  # More retries for rate limiting scenarios
        wait=_wait_with_retry_after,  # Respects Retry-After header with fallback to exp backoff
        retry=(
            retry_if_exception_type(httpx.TimeoutException)
            | retry_if_exception(_is_retryable_http_error)
        ),
        reraise=True,  # Re-raise the original exception after retries exhausted
    )
[docs] async def fetch_from_source(self, source: ProxySourceConfig) -> list[dict[str, Any]]: """ Fetch proxies from a single source. Includes automatic retry with exponential backoff for: - HTTP 429 (Too Many Requests) - respects Retry-After header - HTTP 503 (Service Unavailable) - HTTP 502 (Bad Gateway) - HTTP 504 (Gateway Timeout) - Network timeouts Args: source: Proxy source configuration Returns: List of proxy dictionaries Raises: ProxyFetchError: If fetching fails after retries """ try: # Determine if browser rendering is needed html_content: str if source.render_mode == RenderMode.BROWSER: # Use browser rendering for JavaScript-heavy pages try: from proxywhirl.browser import BrowserRenderer except ImportError as e: raise ProxyFetchError( "Browser rendering requires Playwright. " "Install with: pip install 'proxywhirl[js]' or pip install playwright" ) from e try: async with BrowserRenderer() as renderer: html_content = await renderer.render(str(source.url)) except TimeoutError as e: raise ProxyFetchError(f"Browser timeout fetching from {source.url}: {e}") from e except RuntimeError as e: raise ProxyFetchError(f"Browser error fetching from {source.url}: {e}") from e else: # Use standard HTTP client for static pages client = await self._get_client() response = await client.get(str(source.url)) response.raise_for_status() html_content = response.text # Use custom parser if provided if source.custom_parser: proxies_list: list[dict[str, Any]] = source.custom_parser.parse(html_content) return proxies_list # Otherwise use format-based parser # Note: parser field in ProxySourceConfig is now a string identifier, not an object # Custom parsers should be registered via _parsers dict format_key = source.format.value if hasattr(source.format, "value") else source.format if format_key in self._parsers: parser_class = self._parsers[format_key] parser = parser_class() else: raise ProxyFetchError(f"Unsupported format: {source.format}") # Parse proxies proxies_list: 
list[dict[str, Any]] = parser.parse(html_content) # Apply source protocol if specified (for plain text SOCKS sources) if source.protocol and source.protocol != "http": for proxy in proxies_list: if "url" in proxy: # Replace http:// with the specified protocol url = proxy["url"] if url.startswith("http://"): proxy["url"] = f"{source.protocol}://{url[7:]}" # Set protocol field explicitly proxy["protocol"] = source.protocol return proxies_list except httpx.HTTPStatusError as e: # Let retryable HTTP errors (429, 503, 502, 504) bubble up to retry decorator # Only convert to ProxyFetchError for non-retryable errors if e.response.status_code not in RETRYABLE_STATUS_CODES: raise ProxyFetchError(f"HTTP error fetching from {source.url}: {e}") from e raise # Re-raise for retry decorator to handle except httpx.RequestError as e: raise ProxyFetchError(f"Request error fetching from {source.url}: {e}") from e except Exception as e: raise ProxyFetchError(f"Error fetching from {source.url}: {e}") from e
[docs] async def fetch_all( self, validate: bool = True, deduplicate: bool = True, fetch_progress_callback: Any | None = None, validate_progress_callback: Any | None = None, ) -> list[dict[str, Any]]: """ Fetch proxies from all configured sources. Args: validate: Whether to validate proxies before returning deduplicate: Whether to deduplicate proxies fetch_progress_callback: Optional callback(completed, total, proxies_found) for fetch progress validate_progress_callback: Optional callback(completed, total, valid_count) for validation progress Returns: List of proxy dictionaries """ all_proxies: list[dict[str, Any]] = [] completed_sources = 0 total_sources = len(self.sources) # Fetch from all sources with progress tracking async def fetch_with_progress(source: ProxySourceConfig) -> list[dict[str, Any]]: nonlocal completed_sources try: result = await self.fetch_from_source(source) completed_sources += 1 if fetch_progress_callback: fetch_progress_callback( completed_sources, total_sources, len(all_proxies) + len(result) ) return result except Exception: logger.opt(exception=True).warning("Failed to fetch from {}", source.url) completed_sources += 1 if fetch_progress_callback: fetch_progress_callback(completed_sources, total_sources, len(all_proxies)) return [] # Fetch from all sources in parallel tasks = [fetch_with_progress(source) for source in self.sources] results = await asyncio.gather(*tasks, return_exceptions=True) # Collect successful results for result in results: if isinstance(result, list): all_proxies.extend(result) # Skip exceptions (already logged by retry decorator) # Deduplicate if requested if deduplicate: all_proxies = deduplicate_proxies(all_proxies) # Validate all proxies if requested if validate: return await self.validator.validate_batch( all_proxies, progress_callback=validate_progress_callback ) return all_proxies
[docs] async def start_periodic_refresh( self, callback: Any | None = None, interval: int | None = None, ) -> None: """ Start periodic proxy refresh. Args: callback: Optional callback to invoke with new proxies interval: Override default refresh interval (seconds) """ while True: # Determine interval (use first source's interval if not specified) refresh_interval = interval or ( self.sources[0].refresh_interval if self.sources else 3600 ) await asyncio.sleep(refresh_interval) # Fetch new proxies proxies = await self.fetch_all() # Invoke callback if provided if callback: await callback(proxies)