"""Offline proxy metadata enrichment using local databases.
This module provides 100% offline enrichment of proxy data using:
1. MaxMind GeoLite2 local database (optional, requires download)
2. Python stdlib ipaddress for IP property analysis
3. Port signature analysis
No external API calls - all lookups are local.
"""
from __future__ import annotations
import ipaddress
from pathlib import Path
from typing import Any
from loguru import logger
# Common proxy port signatures
PORT_SIGNATURES: dict[int, str] = {
# Standard HTTP ports
80: "http",
8080: "http-alt",
8000: "http-alt",
8888: "http-alt",
3128: "squid",
# HTTPS ports
443: "https",
8443: "https-alt",
# SOCKS ports
1080: "socks",
1081: "socks-alt",
9050: "tor",
9150: "tor-browser",
# Specialized proxy ports
3129: "squid-alt",
8118: "privoxy",
8123: "polipo",
9999: "proxy-common",
# Transparent proxy ports
3130: "transparent",
8081: "transparent-alt",
}
[docs]
class OfflineEnricher:
"""Enrich proxies using local databases only - no API calls."""
def __init__(self, geoip_path: Path | None = None) -> None:
"""Initialize the enricher.
Args:
geoip_path: Path to MaxMind GeoLite2-City.mmdb file.
If None, looks in default locations.
"""
self.geoip_reader: Any = None
self._init_geoip(geoip_path)
def _init_geoip(self, geoip_path: Path | None) -> None:
"""Initialize GeoIP reader if database is available."""
# Check provided path first
paths_to_check: list[Path] = []
if geoip_path:
paths_to_check.append(geoip_path)
# Default locations
paths_to_check.extend(
[
Path.home() / ".config" / "proxywhirl" / "GeoLite2-City.mmdb",
Path.home() / ".local" / "share" / "proxywhirl" / "GeoLite2-City.mmdb",
Path("/usr/share/GeoIP/GeoLite2-City.mmdb"),
Path("/var/lib/GeoIP/GeoLite2-City.mmdb"),
]
)
for path in paths_to_check:
if path.exists():
try:
from geoip2.database import Reader
self.geoip_reader = Reader(str(path))
logger.info(f"Loaded GeoIP database from {path}")
return
except Exception as e:
logger.warning(f"Failed to load GeoIP from {path}: {e}")
continue
logger.info(
"GeoIP database not found. Run 'proxywhirl setup-geoip' to download. "
"IP analysis and port signatures will still work."
)
[docs]
def close(self) -> None:
"""Close the GeoIP reader if open."""
if self.geoip_reader:
self.geoip_reader.close()
self.geoip_reader = None
[docs]
def enrich(self, ip: str, port: int) -> dict[str, Any]:
"""Enrich a proxy with metadata.
Args:
ip: IP address string
port: Port number
Returns:
Dictionary of enrichment fields (all may be None if lookup fails)
"""
result: dict[str, Any] = {
# Geo fields (from GeoIP)
"country": None,
"country_code": None,
"city": None,
"region": None,
"latitude": None,
"longitude": None,
"timezone": None,
"continent": None,
"continent_code": None,
# IP property fields (from stdlib)
"is_private": None,
"is_global": None,
"is_loopback": None,
"is_reserved": None,
"ip_version": None,
# Port analysis
"port_type": None,
}
# 1. GeoIP lookup (if database available)
result.update(self._geoip_lookup(ip))
# 2. IP property analysis (stdlib - always works)
result.update(self._ip_analysis(ip))
# 3. Port signature analysis
result["port_type"] = self._port_analysis(port)
return result
def _geoip_lookup(self, ip: str) -> dict[str, Any]:
"""Perform GeoIP lookup if database is available."""
result: dict[str, Any] = {}
if not self.geoip_reader:
return result
try:
response = self.geoip_reader.city(ip)
result["country"] = response.country.name
result["country_code"] = response.country.iso_code
result["city"] = response.city.name
result["continent"] = response.continent.name
result["continent_code"] = response.continent.code
if response.subdivisions:
result["region"] = response.subdivisions.most_specific.name
if response.location:
result["latitude"] = response.location.latitude
result["longitude"] = response.location.longitude
result["timezone"] = response.location.time_zone
except Exception:
# IP not found in database or invalid - that's OK
pass
return result
def _ip_analysis(self, ip: str) -> dict[str, Any]:
"""Analyze IP address properties using Python stdlib."""
result: dict[str, Any] = {}
try:
ip_obj = ipaddress.ip_address(ip)
result["is_private"] = ip_obj.is_private
result["is_global"] = ip_obj.is_global
result["is_loopback"] = ip_obj.is_loopback
result["is_reserved"] = ip_obj.is_reserved
result["ip_version"] = ip_obj.version
except ValueError:
# Invalid IP address - leave as None
pass
return result
def _port_analysis(self, port: int) -> str:
"""Analyze port to determine likely proxy type."""
return PORT_SIGNATURES.get(port, "other")
[docs]
def enrich_batch(
self,
proxies: list[dict[str, Any]],
ip_field: str = "ip",
port_field: str = "port",
) -> list[dict[str, Any]]:
"""Enrich a batch of proxies in place.
Args:
proxies: List of proxy dictionaries to enrich
ip_field: Field name containing IP address
port_field: Field name containing port
Returns:
The same list with enrichment fields added
"""
for proxy in proxies:
ip = proxy.get(ip_field, "")
port = proxy.get(port_field, 0)
if ip and port:
enrichment = self.enrich(ip, port)
proxy.update(enrichment)
return proxies
[docs]
def get_default_geoip_path() -> Path:
"""Get the default path for GeoIP database."""
return Path.home() / ".config" / "proxywhirl" / "GeoLite2-City.mmdb"
[docs]
def is_geoip_available() -> bool:
"""Check if GeoIP database is available."""
enricher = OfflineEnricher()
available = enricher.geoip_reader is not None
enricher.close()
return available