Source code for proxywhirl.geo

"""IP Geolocation utilities for proxy country lookup.

Uses MaxMind GeoLite2-Country database for fast, offline lookups.
Falls back to ip-api.com batch API if database not available.
"""

from __future__ import annotations

import asyncio
from pathlib import Path
from typing import Any

import httpx
from loguru import logger

# Candidate locations for the GeoLite2-Country database, listed in order of
# preference: _find_geolite2_db() returns the first entry that exists.
GEOLITE2_PATHS = [
    # Relative paths; these resolve against the current working directory.
    Path("GeoLite2-Country.mmdb"),
    Path("data/GeoLite2-Country.mmdb"),
    # Paths anchored at this module's own location on disk.
    Path(__file__).parent / "data" / "GeoLite2-Country.mmdb",
    Path(__file__).parent.parent / "GeoLite2-Country.mmdb",
    # Per-user location under the home directory.
    Path.home() / ".local" / "share" / "GeoIP" / "GeoLite2-Country.mmdb",
    # Absolute system-wide location.
    Path("/usr/share/GeoIP/GeoLite2-Country.mmdb"),
]


def _find_geolite2_db() -> Path | None:
    """Return the first path in ``GEOLITE2_PATHS`` that exists.

    Candidates are probed in list order, so earlier entries take priority.

    Returns:
        Path | None: The first existing candidate, or None when none exist.
    """
    existing = (candidate for candidate in GEOLITE2_PATHS if candidate.exists())
    return next(existing, None)


def geolocate_with_database(ips: list[str], db_path: Path) -> dict[str, dict[str, str]]:
    """Look up countries for IPs in a local GeoLite2 database.

    Args:
        ips: IP addresses to look up; duplicates are collapsed before lookup.
        db_path: Location of the GeoLite2-Country.mmdb file.

    Returns:
        dict[str, dict[str, str]]: Mapping of IP to a dict with ``country`` and
        ``countryCode`` keys. IPs that are not in the database or whose lookup
        fails are omitted. An empty dict is returned when geoip2 cannot be
        imported or the database cannot be opened.
    """
    # geoip2 is imported lazily so the module still loads without it.
    try:
        import geoip2.database
        import geoip2.errors
    except ImportError:
        logger.warning("geoip2 not installed, skipping database lookup")
        return {}

    targets = set(ips)
    total = len(targets)
    located: dict[str, dict[str, str]] = {}

    logger.info(f"Geolocating {total} IPs using GeoLite2 database...")

    try:
        with geoip2.database.Reader(str(db_path)) as reader:
            for address in targets:
                try:
                    record = reader.country(address).country
                    located[address] = {
                        "country": record.name or "",
                        "countryCode": record.iso_code or "",
                    }
                except geoip2.errors.AddressNotFoundError:
                    # IP not in database (private IP, etc.) - skip quietly.
                    continue
                except Exception as exc:
                    logger.debug(f"Failed to lookup {address}: {exc}")
    except Exception as exc:
        # Reached when the reader itself fails; partial results are discarded.
        logger.error(f"Failed to open GeoLite2 database: {exc}")
        return {}

    logger.info(f"Geolocated {len(located)}/{total} IPs from database")
    return located
async def geolocate_with_api(
    ips: list[str],
    batch_size: int = 100,
    max_batches: int = 50,
) -> dict[str, dict[str, str]]:
    """Batch geolocate IPs using ip-api.com (fallback).

    Duplicate IPs are removed (keeping first-seen order) and at most
    ``batch_size * max_batches`` unique IPs are looked up. A batch that fails
    is logged and skipped; the remaining batches are still attempted.

    Args:
        ips: List of IP addresses to lookup
        batch_size: Number of IPs per batch (max 100 for ip-api.com)
        max_batches: Maximum number of batches to process

    Returns:
        dict[str, dict[str, str]]: Mapping of IP to geo info with country and
        countryCode keys. Only IPs the API reports as ``success`` are included.
        Empty when there is nothing to look up or when ``batch_size`` or
        ``max_batches`` is not positive.
    """
    results: dict[str, dict[str, str]] = {}

    # Non-positive limits mean "look up nothing". Without this guard a negative
    # product would become a negative slice bound and silently drop trailing IPs.
    if batch_size <= 0 or max_batches <= 0:
        return results

    # dict.fromkeys de-duplicates while preserving input order, so the cap
    # always keeps the same (first-seen) IPs instead of an arbitrary subset.
    unique_ips = list(dict.fromkeys(ips))[: batch_size * max_batches]
    if not unique_ips:
        return results

    logger.info(f"Geolocating {len(unique_ips)} IPs via ip-api.com...")

    async with httpx.AsyncClient(timeout=30.0) as client:
        for i in range(0, len(unique_ips), batch_size):
            # Rate limit: 15 requests per minute for free tier. The pause sits
            # outside the try block so it also applies after a failed batch;
            # otherwise a rate-limit error would trigger an immediate retry
            # storm on the following batches.
            if i:
                await asyncio.sleep(4.5)

            batch = unique_ips[i : i + batch_size]
            try:
                response = await client.post(
                    "http://ip-api.com/batch?fields=query,country,countryCode,status",
                    json=batch,
                )
                response.raise_for_status()

                for item in response.json():
                    if item.get("status") == "success":
                        results[item["query"]] = {
                            "country": item.get("country", ""),
                            "countryCode": item.get("countryCode", ""),
                        }
            except Exception as e:
                # Best effort: log and move on to the next batch.
                logger.warning(f"Geo API batch {i // batch_size + 1} failed: {e}")

    logger.info(f"Geolocated {len(results)}/{len(unique_ips)} IPs via API")
    return results
async def batch_geolocate(
    ips: list[str],
    batch_size: int = 100,
    max_batches: int = 50,
    db_path: Path | None = None,
) -> dict[str, dict[str, str]]:
    """Geolocate IPs using best available method.

    Tries in order:
    1. Local GeoLite2 database (fast, no rate limits)
    2. ip-api.com batch API (fallback, rate limited)

    The API is used when no database file is found, and also when the database
    lookup yields no results at all.

    Args:
        ips: List of IP addresses to lookup
        batch_size: Number of IPs per API batch
        max_batches: Maximum API batches to process
        db_path: Optional explicit path to GeoLite2 database

    Returns:
        dict[str, dict[str, str]]: Mapping of IP to geo info with country and
        countryCode keys. Empty when ``ips`` is empty.
    """
    if not ips:
        return {}

    # Try local database first
    db = db_path or _find_geolite2_db()
    if db:
        logger.info(f"Using GeoLite2 database: {db}")
        results = geolocate_with_database(ips, db)
        if results:
            return results
        # geolocate_with_database returns {} when geoip2 is not installed or
        # the file cannot be opened, so an empty result must not short-circuit
        # the documented API fallback.
        logger.info("GeoLite2 lookup returned no results, falling back to API")
    else:
        logger.info("GeoLite2 database not found, falling back to API")

    # Fall back to API
    return await geolocate_with_api(ips, batch_size, max_batches)
def enrich_proxies_with_geo(
    proxies: list[dict[str, Any]],
    geo_data: dict[str, dict[str, str]],
) -> list[dict[str, Any]]:
    """Attach country information to each proxy dictionary in place.

    Args:
        proxies: Proxy dictionaries; each is matched to geo data by its 'ip' value
        geo_data: Geo lookup results keyed by IP (as returned by batch_geolocate)

    Returns:
        The same ``proxies`` list, with ``country`` and ``country_code`` set on
        every entry. Both are None when the proxy's IP has no geo entry.
    """
    # (key written on the proxy, key read from the geo record)
    field_map = (("country", "country"), ("country_code", "countryCode"))

    for entry in proxies:
        match = geo_data.get(entry.get("ip", ""), {})
        for proxy_key, geo_key in field_map:
            entry[proxy_key] = match.get(geo_key)

    return proxies