"""Export functionality for generating web dashboard data.
This module provides functions to export proxy data and statistics
for consumption by the web dashboard.
"""
from __future__ import annotations
import json
import statistics
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse
from loguru import logger
from proxywhirl.geo import batch_geolocate, enrich_proxies_with_geo
from proxywhirl.sources import ALL_HTTP_SOURCES, ALL_SOCKS4_SOURCES, ALL_SOCKS5_SOURCES
from proxywhirl.storage import SQLiteStorage
# Response time distribution bins (ms): (min, max, label)
RESPONSE_TIME_BINS: list[tuple[float, float, str]] = [
(0, 100, "<100ms"),
(100, 500, "100-500ms"),
(500, 1000, "500ms-1s"),
(1000, 2000, "1-2s"),
(2000, 5000, "2-5s"),
(5000, float("inf"), ">5s"),
]
# Continent code to name mapping
CONTINENT_NAMES = {
"AF": "Africa",
"AN": "Antarctica",
"AS": "Asia",
"EU": "Europe",
"NA": "North America",
"OC": "Oceania",
"SA": "South America",
}
[docs]
def parse_proxy_url(url: str) -> tuple[str, int]:
"""Parse proxy URL to extract IP and port.
Args:
url: Full proxy URL (e.g., "http://1.2.3.4:8080")
Returns:
Tuple of (ip, port)
"""
try:
parsed = urlparse(url)
host = parsed.hostname or ""
port = parsed.port or 80
# Validate port range
if not (0 < port <= 65535):
port = 80
return host, port
except Exception:
# Handle malformed URLs
return "", 80
[docs]
async def generate_rich_proxies(
storage: SQLiteStorage,
include_geo: bool = True,
geo_sample_size: int = 5000,
max_age_hours: int = 72,
) -> dict[str, Any]:
"""Generate rich proxy data from database.
Args:
storage: SQLiteStorage instance to query
include_geo: Whether to include country data (slower)
geo_sample_size: Max IPs to geolocate (rate limited)
max_age_hours: Only include proxies validated within this time window.
Default: 72 hours (36 runs at 2h schedule). Set to 0 to include all proxies.
Returns:
dict[str, Any]: Proxies with metadata and aggregations.
"""
if max_age_hours > 0:
proxies_data = await storage.load_validated(max_age_hours)
else:
proxies_data = await storage.load()
proxies = []
seen_addresses: set[str] = set() # Track unique IP:port combinations
protocol_counts: Counter[str] = Counter()
status_counts: Counter[str] = Counter()
source_counts: Counter[str] = Counter()
country_counts: Counter[str] = Counter()
port_counts: Counter[int] = Counter()
continent_counts: Counter[str] = Counter()
response_times: list[float] = []
# For source flow (Sankey) data
source_flow: defaultdict[tuple[str, str, str], int] = defaultdict(int)
for proxy in proxies_data:
ip, port = parse_proxy_url(proxy["url"])
# Deduplicate by IP:port (same proxy may appear with different protocols)
addr = f"{ip}:{port}"
if addr in seen_addresses:
continue
seen_addresses.add(addr)
total_checks = proxy.get("total_checks", 0) or 0
total_successes = proxy.get("total_successes", 0) or 0
last_success = proxy.get("last_success_at")
last_failure = proxy.get("last_failure_at")
discovered_at = proxy.get("discovered_at")
avg_response = proxy.get("avg_response_time_ms")
proxy_dict = {
"ip": ip,
"port": port,
"protocol": proxy.get("protocol") or "http",
"status": proxy.get("health_status", "unknown"),
"response_time": avg_response,
"success_rate": (
round(total_successes / total_checks * 100, 1) if total_checks > 0 else None
),
"total_checks": total_checks,
"source": proxy.get("source", "fetched"),
"last_checked": (
last_success.isoformat()
if last_success
else last_failure.isoformat()
if last_failure
else None
),
"created_at": discovered_at.isoformat() if discovered_at else None,
"country": None,
"country_code": proxy.get("country_code"),
"continent_code": proxy.get("continent_code"),
}
proxies.append(proxy_dict)
protocol_counts[proxy_dict["protocol"]] += 1
status_counts[proxy_dict["status"]] += 1
source_counts[proxy_dict["source"]] += 1
port_counts[port] += 1
# Collect response times for statistics
if avg_response is not None and avg_response > 0:
response_times.append(avg_response)
# Add geo data if requested
if include_geo and proxies:
ips = [p["ip"] for p in proxies[:geo_sample_size]]
geo_data = await batch_geolocate(ips, max_batches=50) # Up to 5000 IPs
proxies = enrich_proxies_with_geo(proxies, geo_data)
# Count countries and continents, build source flow
for p in proxies:
country_code = p.get("country_code")
continent_code = p.get("continent_code")
if country_code:
country_counts[country_code] += 1
if continent_code:
continent_counts[continent_code] += 1
# Build source flow data (source → protocol → country)
source = p.get("source", "unknown")
protocol = p.get("protocol", "http")
country = country_code or "Unknown"
source_flow[(source, protocol, country)] += 1
# Build response time distribution
response_time_distribution = []
for bin_min, bin_max, bin_label in RESPONSE_TIME_BINS:
count = sum(1 for rt in response_times if bin_min <= rt < bin_max)
response_time_distribution.append(
{
"range": bin_label,
"min": bin_min,
"max": bin_max if bin_max != float("inf") else None,
"count": count,
}
)
# Build port distribution (top 15 + others)
top_ports = port_counts.most_common(15)
by_port = [{"port": port, "count": count} for port, count in top_ports]
other_port_count = sum(
count for port, count in port_counts.items() if port not in dict(top_ports)
)
if other_port_count > 0:
by_port.append({"port": 0, "count": other_port_count, "label": "Other"})
# Performance statistics
performance = {}
if response_times:
sorted_times = sorted(response_times)
p95_idx = int(len(sorted_times) * 0.95)
performance = {
"avg_ms": round(statistics.mean(response_times), 1),
"median_ms": round(statistics.median(response_times), 1),
"p95_ms": round(
sorted_times[p95_idx] if p95_idx < len(sorted_times) else sorted_times[-1], 1
),
"min_ms": round(min(response_times), 1),
"max_ms": round(max(response_times), 1),
"samples": len(response_times),
}
# Build source flow for Sankey (limit to top entries)
source_flow_list = [
{"source": s, "protocol": p, "country": c, "count": cnt}
for (s, p, c), cnt in sorted(source_flow.items(), key=lambda x: -x[1])[:200]
]
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"total": len(proxies),
"proxies": proxies,
"aggregations": {
"by_protocol": dict(protocol_counts),
"by_status": dict(status_counts),
"by_source": dict(source_counts),
"by_country": dict(country_counts),
"by_port": by_port,
"by_continent": dict(continent_counts),
"response_time_distribution": response_time_distribution,
"performance": performance,
"source_flow": source_flow_list,
},
}
[docs]
def generate_stats_from_files(proxy_dir: Path) -> dict[str, Any]:
"""Generate statistics from proxy list files and rich proxy data.
Args:
proxy_dir: Path to directory containing proxy list files
Returns:
dict[str, Any]: Dashboard statistics including health, performance,
validation, geographic, and source ranking data.
"""
# Read existing metadata
metadata_path = proxy_dir / "metadata.json"
if not metadata_path.exists():
logger.warning("metadata.json not found, using empty defaults")
metadata = {
"generated_at": datetime.now(timezone.utc).isoformat(),
"total_sources": 0,
"counts": {},
}
else:
with open(metadata_path) as f:
metadata = json.load(f)
# Read rich proxy data for aggregations
rich_path = proxy_dir / "proxies-rich.json"
rich_data: dict[str, Any] = {}
if rich_path.exists():
try:
with open(rich_path) as f:
rich_data = json.load(f)
except (json.JSONDecodeError, OSError) as e:
logger.warning(f"Failed to read proxies-rich.json: {e}")
aggregations = rich_data.get("aggregations", {})
# Calculate file sizes
file_sizes = {}
files_to_check = [
"http.txt",
"https.txt",
"socks4.txt",
"socks5.txt",
"all.txt",
"proxies.json",
]
for filename in files_to_check:
path = proxy_dir / filename
if path.exists():
file_sizes[filename] = path.stat().st_size
# Count lines in each protocol file
proxy_counts = {}
for protocol in ["http", "https", "socks4", "socks5"]:
path = proxy_dir / f"{protocol}.txt"
if path.exists():
with open(path) as f:
proxy_counts[protocol] = sum(1 for line in f if line.strip())
else:
proxy_counts[protocol] = metadata.get("counts", {}).get(protocol, 0)
# Calculate unique proxies
unique_proxies = set()
for protocol in ["http", "https", "socks4", "socks5"]:
path = proxy_dir / f"{protocol}.txt"
if path.exists():
with open(path) as f:
for line in f:
if line.strip():
unique_proxies.add(line.strip())
unique_count = len(unique_proxies) if unique_proxies else 0
# Build health stats from status aggregation
status_agg = aggregations.get("by_status", {})
health = {
"healthy": status_agg.get("healthy", 0),
"unhealthy": status_agg.get("unhealthy", 0),
"dead": status_agg.get("dead", 0),
"unknown": status_agg.get("unknown", 0),
}
total_validated = health["healthy"] + health["unhealthy"] + health["dead"]
# Performance stats from aggregations
perf_agg = aggregations.get("performance", {})
performance = {
"avg_response_ms": perf_agg.get("avg_ms"),
"median_response_ms": perf_agg.get("median_ms"),
"p95_response_ms": perf_agg.get("p95_ms"),
"min_response_ms": perf_agg.get("min_ms"),
"max_response_ms": perf_agg.get("max_ms"),
"samples": perf_agg.get("samples", 0),
}
# Validation stats
validation = {
"total_validated": total_validated,
"success_rate_pct": round(health["healthy"] / total_validated * 100, 1)
if total_validated > 0
else 0,
}
# Geographic stats
country_agg = aggregations.get("by_country", {})
continent_agg = aggregations.get("by_continent", {})
top_countries = sorted(
[{"code": k, "count": v} for k, v in country_agg.items()],
key=lambda x: -x["count"],
)[:15]
geographic = {
"total_countries": len(country_agg),
"top_countries": top_countries,
"by_continent": {
code: {"name": CONTINENT_NAMES.get(code, code), "count": count}
for code, count in continent_agg.items()
},
}
# Source ranking
source_agg = aggregations.get("by_source", {})
top_sources = sorted(
[{"name": k, "count": v} for k, v in source_agg.items()],
key=lambda x: -x["count"],
)[:20]
sources_ranking = {
"total_active": len(source_agg),
"top_sources": top_sources,
}
# Include pre-computed aggregations for frontend
response_time_distribution = aggregations.get("response_time_distribution", [])
by_port = aggregations.get("by_port", [])
source_flow = aggregations.get("source_flow", [])
return {
"generated_at": metadata.get("generated_at", datetime.now(timezone.utc).isoformat()),
"sources": {
"total": metadata.get("total_sources", 0),
},
"proxies": {
"total": unique_count,
"unique": unique_count,
"by_protocol": proxy_counts,
},
"file_sizes": file_sizes,
# Enhanced stats sections
"health": health,
"performance": performance,
"validation": validation,
"geographic": geographic,
"sources_ranking": sources_ranking,
# Pre-computed aggregations for charts
"aggregations": {
"response_time_distribution": response_time_distribution,
"by_port": by_port,
"by_continent": continent_agg,
"source_flow": source_flow,
},
}
def _sort_by_speed(addr_times: dict[str, float | None]) -> list[str]:
"""Sort addresses by response time (fastest first, unknowns last)."""
return sorted(
addr_times.keys(),
key=lambda a: (addr_times[a] is None, addr_times[a] or float("inf")),
)
[docs]
async def generate_proxy_lists(
storage: SQLiteStorage,
output_dir: Path,
max_age_hours: int = 72,
) -> dict[str, int]:
"""Generate proxy list text files and metadata.json from database.
Creates:
- http.txt, https.txt, socks4.txt, socks5.txt (one proxy per line)
- all.txt (combined with headers)
- proxies.json (structured JSON with metadata)
- metadata.json (counts and timestamp)
Args:
storage: SQLiteStorage instance to query
output_dir: Directory to write output files
max_age_hours: Only include proxies validated within this time window.
Default: 72 hours (36 runs at 2h schedule). Set to 0 to include all proxies.
Returns:
dict[str, int]: Mapping of protocol name to proxy count.
"""
output_dir.mkdir(parents=True, exist_ok=True)
if max_age_hours > 0:
proxies_data = await storage.load_validated(max_age_hours)
else:
proxies_data = await storage.load()
# Group proxies by protocol, tracking response time for sorting
# Maps protocol -> {addr: best_response_time_ms} (None = unknown)
proxies_by_protocol: dict[str, dict[str, float | None]] = {
"http": {},
"https": {},
"socks4": {},
"socks5": {},
}
for proxy in proxies_data:
ip, port = parse_proxy_url(proxy["url"])
if not ip:
continue
addr = f"{ip}:{port}"
protocol = (proxy.get("protocol") or "http").lower()
response_time = proxy.get("avg_response_time_ms")
if protocol in proxies_by_protocol:
existing = proxies_by_protocol[protocol].get(addr)
# Keep the best (lowest) response time for deduped addresses
if existing is None or (response_time is not None and response_time < existing):
proxies_by_protocol[protocol][addr] = response_time
timestamp = datetime.now(timezone.utc).isoformat()
total_sources = len(ALL_HTTP_SOURCES) + len(ALL_SOCKS4_SOURCES) + len(ALL_SOCKS5_SOURCES)
metadata = {
"generated_at": timestamp,
"total_sources": total_sources,
"counts": {protocol: len(addrs) for protocol, addrs in proxies_by_protocol.items()},
}
# Save each protocol in TXT format (sorted fastest-first)
for protocol, addr_times in proxies_by_protocol.items():
txt_file = output_dir / f"{protocol}.txt"
with open(txt_file, "w") as f:
for proxy_addr in _sort_by_speed(addr_times):
f.write(f"{proxy_addr}\n")
logger.info(f"Saved {len(addr_times)} {protocol} proxies to {txt_file}")
# Save all proxies in one file (sorted fastest-first per section)
all_txt = output_dir / "all.txt"
with open(all_txt, "w") as f:
for protocol in ["http", "https", "socks4", "socks5"]:
addr_times = proxies_by_protocol[protocol]
f.write(f"# {protocol.upper()} Proxies ({len(addr_times)})\n")
for proxy_addr in _sort_by_speed(addr_times):
f.write(f"{proxy_addr}\n")
f.write("\n")
logger.info(f"Saved all proxies to {all_txt}")
# Save in JSON format (sorted fastest-first)
json_file = output_dir / "proxies.json"
json_data = {
"metadata": metadata,
"proxies": {
protocol: _sort_by_speed(addr_times)
for protocol, addr_times in proxies_by_protocol.items()
},
}
with open(json_file, "w") as f:
json.dump(json_data, f, indent=2)
logger.info(f"Saved JSON to {json_file}")
# Save metadata
meta_file = output_dir / "metadata.json"
with open(meta_file, "w") as f:
json.dump(metadata, f, indent=2)
logger.info(f"Saved metadata to {meta_file}")
return {protocol: len(addrs) for protocol, addrs in proxies_by_protocol.items()}
[docs]
async def export_for_web(
db_path: Path,
output_dir: Path,
include_stats: bool = True,
include_rich_proxies: bool = True,
include_proxy_lists: bool = True,
max_age_hours: int = 72,
) -> dict[str, Path]:
"""Export data for the web dashboard.
Args:
db_path: Path to SQLite database
output_dir: Directory to write output files
include_stats: Whether to generate stats.json
include_rich_proxies: Whether to generate proxies-rich.json
include_proxy_lists: Whether to generate text files and metadata.json
max_age_hours: Only include proxies validated within this time window.
Default: 72 hours (36 runs at 2h schedule). Set to 0 to include all proxies.
Returns:
dict[str, Path]: Mapping of output type to file path.
"""
output_dir.mkdir(parents=True, exist_ok=True)
outputs: dict[str, Path] = {}
# Check if we need database access
needs_db = (include_proxy_lists or include_rich_proxies) and db_path.exists()
if needs_db:
storage = SQLiteStorage(db_path)
await storage.initialize()
try:
# Generate proxy list files first (metadata.json, txt files, proxies.json)
# This must happen before stats since stats reads from these files
if include_proxy_lists:
logger.info("Generating proxy list files from database...")
counts = await generate_proxy_lists(storage, output_dir, max_age_hours)
outputs["metadata"] = output_dir / "metadata.json"
outputs["proxies_json"] = output_dir / "proxies.json"
logger.info(f"Proxy lists generated: {counts}")
# Generate rich proxy data (proxies-rich.json)
if include_rich_proxies:
logger.info("Generating rich proxy data from database...")
rich_data = await generate_rich_proxies(storage, max_age_hours=max_age_hours)
rich_path = output_dir / "proxies-rich.json"
with open(rich_path, "w") as f:
json.dump(rich_data, f)
outputs["proxies_rich"] = rich_path
logger.info(f"Rich proxy data saved to {rich_path}")
logger.info(f"Total rich proxies: {rich_data['total']}")
finally:
await storage.close()
elif include_proxy_lists or include_rich_proxies:
logger.warning(f"Database not found at {db_path}, skipping database exports")
# Generate stats from text files (reads metadata.json and txt files)
if include_stats:
logger.info("Generating stats.json...")
stats = generate_stats_from_files(output_dir)
stats_path = output_dir / "stats.json"
with open(stats_path, "w") as f:
json.dump(stats, f, indent=2)
outputs["stats"] = stats_path
logger.info(f"Stats saved to {stats_path}")
logger.info(f"Total proxies: {stats['proxies']['total']}")
return outputs