# Source code for proxywhirl.storage

"""Storage backends for persisting proxy pools."""

from __future__ import annotations

import base64
import json
import uuid
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib.parse import urlparse

import aiofiles
from cryptography.fernet import Fernet
from loguru import logger
from pydantic import SecretStr
from sqlalchemy import delete, text
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from sqlmodel import Field, SQLModel, select
from sqlmodel.ext.asyncio.session import AsyncSession

from proxywhirl.cache.crypto import CredentialEncryptor
from proxywhirl.models import Proxy

# Prefix for encrypted credentials stored as strings
_ENCRYPTED_PREFIX = "encrypted:"


class ProxyIdentityTable(SQLModel, table=True):
    """Immutable proxy identity table (normalized schema).

    This table stores the core identity of each proxy, with fields that
    rarely change. Geographic and source metadata are stored here.

    Primary Key:
        url: Full proxy URL (e.g., "http://1.2.3.4:8080")

    Indexes:
        - protocol: Fast filtering by protocol type
        - host_port: Unique constraint to prevent duplicates
        - country_code: Geographic filtering
        - source: Source-based queries
    """

    __tablename__: str = "proxy_identities"  # type: ignore[misc]

    url: str = Field(primary_key=True)
    protocol: str = Field(index=True)  # http, https, socks4, socks5
    host: str = Field(index=True)
    port: int
    username: str | None = None
    password: str | None = None  # Encrypted at app level

    # Geographic (immutable per IP)
    country_code: str | None = Field(default=None, index=True)
    country_name: str | None = None
    region: str | None = None
    city: str | None = None
    latitude: float | None = None
    longitude: float | None = None
    asn: int | None = None
    isp_name: str | None = None
    is_residential: bool = False
    is_datacenter: bool = False

    # Source metadata
    source: str = Field(default="fetched", index=True)
    source_url: str | None = None
    # Fix: use the module-level `timezone` import rather than re-importing
    # datetime via __import__ inside the lambda (same value, clearer/faster).
    discovered_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class ValidationResultTable(SQLModel, table=True):
    """Individual validation result table (append-only).

    Stores each validation attempt as an immutable record. This enables
    historical analysis and trend tracking.

    Indexes:
        - proxy_url + validated_at: Fast lookup of recent validations
        - validated_at: Time-range queries
        - is_valid + validated_at: Finding recent valid proxies
    """

    __tablename__: str = "validation_results"  # type: ignore[misc]

    id: int | None = Field(default=None, primary_key=True)
    proxy_url: str = Field(index=True)  # References proxy_identities.url
    # Fix: use the module-level `timezone` import rather than re-importing
    # datetime via __import__ inside the lambda (same value, clearer/faster).
    validated_at: datetime = Field(
        default_factory=lambda: datetime.now(timezone.utc),
        index=True,
    )
    is_valid: bool
    response_time_ms: float | None = None
    error_type: str | None = None  # timeout, connection_refused, ssl_error, etc.
    error_message: str | None = None
class ProxyStatusTable(SQLModel, table=True):
    """Current computed status table (normalized schema).

    This table maintains the current state of each proxy, computed from
    validation results. It's updated after each validation.

    Primary Key:
        proxy_url: References proxy_identities.url

    Indexes:
        - health_status: Filter by current health
        - last_success_at: Find recently working proxies
        - success_rate_7d: Performance-based sorting
    """

    __tablename__: str = "proxy_statuses"  # type: ignore[misc]

    proxy_url: str = Field(primary_key=True)  # References proxy_identities.url
    health_status: str = Field(default="unknown", index=True)  # healthy, unhealthy, dead, unknown
    last_success_at: datetime | None = Field(default=None, index=True)
    last_failure_at: datetime | None = None
    last_check_at: datetime | None = None
    consecutive_successes: int = 0
    consecutive_failures: int = 0
    total_checks: int = 0
    total_successes: int = 0
    avg_response_time_ms: float | None = None
    success_rate_7d: float | None = Field(default=None, index=True)
    # Fix: use the module-level `timezone` import rather than re-importing
    # datetime via __import__ inside the lambda (same value, clearer/faster).
    updated_at: datetime = Field(default_factory=lambda: datetime.now(timezone.utc))
class FileStorage:
    """File-based storage backend using JSON.

    Stores proxies in a JSON file with atomic writes to prevent corruption.
    Supports optional encryption for sensitive credential data.
    """

    def __init__(self, filepath: str | Path, encryption_key: bytes | None = None) -> None:
        """Initialize file storage.

        Args:
            filepath: Path to the JSON file for storage
            encryption_key: Optional Fernet encryption key for encrypting
                credentials. If provided, all data will be encrypted at rest.
        """
        self.filepath = Path(filepath)
        self.encryption_key = encryption_key
        # Build the cipher eagerly so an invalid key fails at construction time.
        self._cipher = Fernet(encryption_key) if encryption_key is not None else None
[docs] async def save(self, proxies: list[Proxy]) -> None: """Save proxies to JSON file. Args: proxies: List of proxies to save Raises: IOError: If save operation fails """ # Ensure parent directory exists self.filepath.parent.mkdir(parents=True, exist_ok=True) # Serialize proxies - use model_dump with context to reveal secrets for storage data = [] for proxy in proxies: proxy_dict = proxy.model_dump(mode="json") # Reveal SecretStr values for storage if proxy.username: proxy_dict["username"] = proxy.username.get_secret_value() if proxy.password: proxy_dict["password"] = proxy.password.get_secret_value() data.append(proxy_dict) json_str = json.dumps(data, indent=2) # Encrypt if cipher is configured content: bytes | str write_mode: str if self._cipher: content = self._cipher.encrypt(json_str.encode("utf-8")) write_mode = "wb" else: content = json_str write_mode = "w" # Atomic write: write to temp file then rename # Use unique temp file to avoid race conditions in concurrent saves import uuid temp_path = self.filepath.with_suffix(f".tmp.{uuid.uuid4().hex[:8]}") try: async with aiofiles.open(temp_path, write_mode) as f: await f.write(content) # Ensure file is flushed to disk before renaming await f.flush() # File is now closed and flushed, safe to rename temp_path.replace(self.filepath) except Exception as e: # Clean up temp file if it exists if temp_path.exists(): temp_path.unlink() raise OSError(f"Failed to save proxies: {e}") from e
[docs] async def load(self) -> list[Proxy]: """Load proxies from JSON file. Returns: List of proxies loaded from file Raises: FileNotFoundError: If file doesn't exist ValueError: If JSON is invalid or data is corrupted cryptography.fernet.InvalidToken: If decryption fails (wrong key) """ from cryptography.fernet import InvalidToken if not self.filepath.exists(): raise FileNotFoundError(f"Storage file not found: {self.filepath}") try: # Read file content if self._cipher: async with aiofiles.open(self.filepath, "rb") as f: encrypted_data = await f.read() # Decrypt data json_str = self._cipher.decrypt(encrypted_data).decode("utf-8") data = json.loads(json_str) else: async with aiofiles.open(self.filepath) as f: content = await f.read() data = json.loads(content) # Deserialize to Proxy objects proxies = [Proxy.model_validate(item) for item in data] return proxies except InvalidToken: # Re-raise InvalidToken without wrapping it raise except json.JSONDecodeError as e: raise ValueError(f"Invalid JSON in storage file: {e}") from e except Exception as e: raise ValueError(f"Failed to load proxies: {e}") from e
[docs] async def clear(self) -> None: """Clear all proxies from storage by deleting the file. Raises: IOError: If clear operation fails """ try: if self.filepath.exists(): self.filepath.unlink() except Exception as e: raise OSError(f"Failed to clear storage: {e}") from e
class SQLiteStorage:
    """SQLite-based storage backend with a normalized 3-table schema.

    Tables:
        - proxy_identities: Immutable proxy identity (URL, host, port, geo)
        - proxy_statuses: Current computed status (health, response times)
        - validation_results: Append-only validation history

    Features:
        - Async operations using aiosqlite for non-blocking I/O
        - Normalized schema prevents SQL variable overflow
        - Advanced filtering by health_status, protocol, country
        - Automatic schema creation
        - EMA-based response time tracking
        - Health status state machine (unknown -> healthy/unhealthy/dead)

    Example::

        storage = SQLiteStorage("proxies.db")
        await storage.initialize()

        await storage.add_proxy(proxy)
        added, skipped = await storage.add_proxies_batch(proxies)

        await storage.record_validation(proxy_url, is_valid=True, response_time_ms=150)

        healthy = await storage.get_healthy_proxies(max_age_hours=48)
        dead = await storage.get_proxies_by_status("dead")

        counts = await storage.cleanup(remove_dead=True, remove_stale_days=7)
        stats = await storage.get_stats()
        await storage.close()
    """

    def __init__(
        self,
        filepath: str | Path,
        use_async_driver: bool = True,
        pool_size: int = 5,
        pool_max_overflow: int = 10,
        pool_timeout: float = 30.0,
        pool_recycle: int = 3600,
    ) -> None:
        """Initialize SQLite storage.

        Args:
            filepath: Path to the SQLite database file; created on first use.
            use_async_driver: Reserved for compatibility. The current
                implementation always uses the async aiosqlite driver.
            pool_size: Pool size. Stored for API compatibility only; the
                aiosqlite StaticPool does not use it.
            pool_max_overflow: Max overflow connections (stored only).
            pool_timeout: Pool checkout timeout in seconds (stored only).
            pool_recycle: Connection recycle interval in seconds (stored only).
        """
        self.filepath = Path(filepath)
        self.use_async_driver = use_async_driver
        self.pool_size = pool_size
        self.pool_max_overflow = pool_max_overflow
        self.pool_timeout = pool_timeout
        self.pool_recycle = pool_recycle

        # SQLite with aiosqlite uses a StaticPool that ignores pool sizing
        # parameters; the attributes above are kept for other backends.
        self.engine: AsyncEngine = create_async_engine(
            f"sqlite+aiosqlite:///{self.filepath}",
            echo=False,
        )

        # Optional credential encryptor for secure credential storage
        self._encryptor: CredentialEncryptor | None = None
        self._init_encryptor()

    def _init_encryptor(self) -> None:
        """Set up the credential encryptor from environment-based keys.

        Falls back to plaintext storage (with a warning) when encryption
        cannot be configured.
        """
        try:
            self._encryptor = CredentialEncryptor()
            logger.debug("Credential encryption initialized for SQLite storage")
        except Exception as e:
            logger.warning(
                f"Credential encryption not available, storing credentials unencrypted: {e}"
            )
            self._encryptor = None

    def _encrypt_credential(self, value: str | None) -> str | None:
        """Encrypt a credential value for storage.

        Args:
            value: Plaintext credential value (or None)

        Returns:
            Prefixed base64 ciphertext, the plaintext when encryption is
            unavailable or fails, or None when the input is None.
        """
        if value is None:
            return None
        if self._encryptor is None:
            # Encryption not configured; warning already logged at init
            return value
        try:
            ciphertext = self._encryptor.encrypt(SecretStr(value))
            if not ciphertext:
                return None
            # Base64-encode with a marker prefix so it fits a TEXT column
            # and can be distinguished from legacy plaintext on read.
            return f"{_ENCRYPTED_PREFIX}{base64.b64encode(ciphertext).decode('utf-8')}"
        except Exception as e:
            logger.error(f"Failed to encrypt credential, storing unencrypted: {e}")
            return value

    def _decrypt_credential(self, value: str | None) -> str | None:
        """Decrypt a credential value from storage.

        Args:
            value: Stored credential (prefixed ciphertext or legacy plaintext)

        Returns:
            Decrypted plaintext, the value itself for legacy plaintext rows,
            or None on missing input / decryption failure.
        """
        if value is None:
            return None
        if not value.startswith(_ENCRYPTED_PREFIX):
            # Legacy unencrypted row - pass through unchanged
            logger.debug("Found legacy unencrypted credential in storage")
            return value
        if self._encryptor is None:
            logger.error(
                "Cannot decrypt credential: encryptor not configured. "
                "Set PROXYWHIRL_CACHE_ENCRYPTION_KEY environment variable."
            )
            return None
        try:
            # Strip the marker prefix, decode base64, then decrypt
            ciphertext = base64.b64decode(value[len(_ENCRYPTED_PREFIX) :])
            plaintext = self._encryptor.decrypt(ciphertext)
            return plaintext.get_secret_value() if plaintext else None
        except Exception as e:
            logger.error(f"Failed to decrypt credential: {e}")
            return None
[docs] async def initialize(self) -> None: """Create database tables if they don't exist. Should be called once before any other operations. Creates the 'proxies' table with all necessary columns and indexes. Safe to call multiple times - existing tables won't be affected. Raises: Exception: If database initialization fails """ from sqlmodel import SQLModel async with self.engine.begin() as conn: await conn.run_sync(SQLModel.metadata.create_all) # Drop legacy 'proxies' table from old schema await conn.execute(text("DROP TABLE IF EXISTS proxies"))
[docs] async def close(self) -> None: """Close database connection and release resources. Should be called when done with the storage to properly cleanup database connections. Safe to call multiple times. """ await self.engine.dispose()
[docs] async def add_proxy(self, proxy: Proxy) -> bool: """Add a new proxy if not exists. Args: proxy: Proxy model to add Returns: True if added, False if already exists """ async with AsyncSession(self.engine) as session: existing = await session.get(ProxyIdentityTable, proxy.url) if existing: return False # Parse URL for host:port parsed = urlparse(proxy.url) row = ProxyIdentityTable( url=proxy.url, protocol=proxy.protocol or "http", host=parsed.hostname or "", port=parsed.port or 80, username=self._encrypt_credential( proxy.username.get_secret_value() if proxy.username else None ), password=self._encrypt_credential( proxy.password.get_secret_value() if proxy.password else None ), country_code=proxy.country_code, source=proxy.source.value if proxy.source else "fetched", source_url=str(proxy.source_url) if proxy.source_url else None, ) session.add(row) # Initialize status status = ProxyStatusTable(proxy_url=proxy.url) session.add(status) await session.commit() return True
[docs] async def add_proxies_batch( self, proxies: list[Proxy], validated: bool = False ) -> tuple[int, int]: """Add multiple proxies to the normalized schema. Args: proxies: List of Proxy models to add validated: If True, mark proxies as already validated (healthy status, last_success_at set). Use this when proxies have passed validation before being saved. Returns: Tuple of (added_count, skipped_count) """ added = 0 skipped = 0 now = datetime.now(timezone.utc) async with AsyncSession(self.engine) as session: for proxy in proxies: existing = await session.get(ProxyIdentityTable, proxy.url) if existing: skipped += 1 continue parsed = urlparse(proxy.url) row = ProxyIdentityTable( url=proxy.url, protocol=proxy.protocol or "http", host=parsed.hostname or "", port=parsed.port or 80, username=self._encrypt_credential( proxy.username.get_secret_value() if proxy.username else None ), password=self._encrypt_credential( proxy.password.get_secret_value() if proxy.password else None ), source=proxy.source.value if proxy.source else "fetched", ) session.add(row) # Create status - mark as validated if proxies passed validation if validated: status = ProxyStatusTable( proxy_url=proxy.url, health_status="healthy", last_success_at=now, last_check_at=now, total_checks=1, total_successes=1, consecutive_successes=1, avg_response_time_ms=proxy.average_response_time_ms, ) else: status = ProxyStatusTable(proxy_url=proxy.url) session.add(status) added += 1 await session.commit() return added, skipped
[docs] async def save(self, proxies: list[Proxy], validated: bool = False) -> None: """Save proxies to database (adds new, skips existing). This is a compatibility wrapper around add_proxies_batch(). Args: proxies: List of proxies to save. Empty list is allowed (no-op). validated: If True, mark proxies as already validated (healthy status). """ if not proxies: return await self.add_proxies_batch(proxies, validated=validated)
[docs] async def delete(self, proxy_url: str) -> bool: """Delete a proxy by URL. Args: proxy_url: URL of the proxy to delete Returns: True if deleted, False if not found """ async with AsyncSession(self.engine) as session: # Delete from status table first del_status = delete(ProxyStatusTable).where(ProxyStatusTable.proxy_url == proxy_url) await session.exec(del_status) # type: ignore[arg-type] # Delete from identity table del_identity = delete(ProxyIdentityTable).where(ProxyIdentityTable.url == proxy_url) result = await session.exec(del_identity) # type: ignore[arg-type] await session.commit() return result.rowcount > 0 if hasattr(result, "rowcount") else True
[docs] async def clear(self) -> None: """Clear all proxies from database.""" async with AsyncSession(self.engine) as session: # Delete all validation results await session.exec(delete(ValidationResultTable)) # type: ignore[arg-type] # Delete all statuses await session.exec(delete(ProxyStatusTable)) # type: ignore[arg-type] # Delete all identities await session.exec(delete(ProxyIdentityTable)) # type: ignore[arg-type] await session.commit()
[docs] async def query(self, **filters: str) -> list[dict[str, Any]]: """Query proxies with filtering. Args: **filters: Filter criteria (source, health_status) Returns: List of proxy dictionaries matching criteria """ stmt = select(ProxyIdentityTable, ProxyStatusTable).join( ProxyStatusTable, ProxyIdentityTable.url == ProxyStatusTable.proxy_url ) if "source" in filters: stmt = stmt.where(ProxyIdentityTable.source == filters["source"]) if "health_status" in filters: stmt = stmt.where(ProxyStatusTable.health_status == filters["health_status"]) async with AsyncSession(self.engine) as session: result = await session.exec(stmt) # type: ignore[arg-type] rows = result.all() proxies = [] for identity, status in rows: proxies.append( { "url": identity.url, "protocol": identity.protocol, "host": identity.host, "port": identity.port, "username": self._decrypt_credential(identity.username), "password": self._decrypt_credential(identity.password), "country_code": identity.country_code, "source": identity.source, "discovered_at": identity.discovered_at, "health_status": status.health_status, "last_success_at": status.last_success_at, "last_failure_at": status.last_failure_at, "avg_response_time_ms": status.avg_response_time_ms, "total_checks": status.total_checks, "total_successes": status.total_successes, } ) return proxies
[docs] async def record_validation( self, proxy_url: str, is_valid: bool, response_time_ms: float | None = None, error_type: str | None = None, error_message: str | None = None, ) -> None: """Record a validation result and update proxy status. Args: proxy_url: URL of the proxy that was validated is_valid: Whether the validation succeeded response_time_ms: Response time in milliseconds (if successful) error_type: Type of error (if failed) error_message: Error message (if failed) """ now = datetime.now(timezone.utc) async with AsyncSession(self.engine) as session: # Insert validation record validation = ValidationResultTable( proxy_url=proxy_url, validated_at=now, is_valid=is_valid, response_time_ms=response_time_ms, error_type=error_type, error_message=error_message, ) session.add(validation) # Update status status = await session.get(ProxyStatusTable, proxy_url) if status: status.last_check_at = now status.total_checks += 1 status.updated_at = now if is_valid: status.last_success_at = now status.total_successes += 1 status.consecutive_successes += 1 status.consecutive_failures = 0 status.health_status = "healthy" # Update avg response time using EMA if response_time_ms is not None: if status.avg_response_time_ms is not None: # EMA with alpha=0.2 status.avg_response_time_ms = ( 0.2 * response_time_ms + 0.8 * status.avg_response_time_ms ) else: status.avg_response_time_ms = response_time_ms else: status.last_failure_at = now status.consecutive_failures += 1 status.consecutive_successes = 0 if status.consecutive_failures >= 3: status.health_status = "dead" elif status.consecutive_failures >= 1: status.health_status = "unhealthy" session.add(status) await session.commit()
[docs] async def record_validations_batch( self, results: list[tuple[str, bool, float | None, str | None]], ) -> int: """Record multiple validation results efficiently. Args: results: List of (proxy_url, is_valid, response_time_ms, error_type) tuples Returns: Number of validations recorded """ now = datetime.now(timezone.utc) async with AsyncSession(self.engine) as session: for proxy_url, is_valid, response_time_ms, error_type in results: # Insert validation record validation = ValidationResultTable( proxy_url=proxy_url, validated_at=now, is_valid=is_valid, response_time_ms=response_time_ms, error_type=error_type, ) session.add(validation) # Update status via raw SQL for efficiency if is_valid: await session.exec( text(""" UPDATE proxy_statuses SET last_check_at = :now, last_success_at = :now, total_checks = total_checks + 1, total_successes = total_successes + 1, consecutive_successes = consecutive_successes + 1, consecutive_failures = 0, health_status = 'healthy', avg_response_time_ms = CASE WHEN avg_response_time_ms IS NULL THEN :response_time ELSE 0.2 * :response_time + 0.8 * avg_response_time_ms END, updated_at = :now WHERE proxy_url = :url """).bindparams(now=now, response_time=response_time_ms, url=proxy_url) ) else: await session.exec( text(""" UPDATE proxy_statuses SET last_check_at = :now, last_failure_at = :now, total_checks = total_checks + 1, consecutive_failures = consecutive_failures + 1, consecutive_successes = 0, health_status = CASE WHEN consecutive_failures >= 2 THEN 'dead' ELSE 'unhealthy' END, updated_at = :now WHERE proxy_url = :url """).bindparams(now=now, url=proxy_url) ) await session.commit() return len(results)
[docs] async def get_healthy_proxies( self, max_age_hours: int = 48, protocol: str | None = None, country_code: str | None = None, limit: int | None = None, ) -> list[dict[str, Any]]: """Get healthy, recently validated proxies. Args: max_age_hours: Maximum age of last successful validation protocol: Filter by protocol (http, https, socks4, socks5) country_code: Filter by country code limit: Maximum number of proxies to return Returns: List of proxy dictionaries with identity and status fields """ cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours) stmt = ( select(ProxyIdentityTable, ProxyStatusTable) .join(ProxyStatusTable, ProxyIdentityTable.url == ProxyStatusTable.proxy_url) .where(ProxyStatusTable.health_status == "healthy") .where(ProxyStatusTable.last_success_at >= cutoff) # type: ignore[operator] ) if protocol: stmt = stmt.where(ProxyIdentityTable.protocol == protocol) if country_code: stmt = stmt.where(ProxyIdentityTable.country_code == country_code) stmt = stmt.order_by(ProxyStatusTable.avg_response_time_ms.asc().nullslast()) if limit: stmt = stmt.limit(limit) async with AsyncSession(self.engine) as session: result = await session.exec(stmt) # type: ignore[arg-type] rows = result.all() proxies = [] for identity, status in rows: proxies.append( { "url": identity.url, "protocol": identity.protocol, "host": identity.host, "port": identity.port, "username": self._decrypt_credential(identity.username), "password": self._decrypt_credential(identity.password), "country_code": identity.country_code, "source": identity.source, "health_status": status.health_status, "last_success_at": status.last_success_at, "avg_response_time_ms": status.avg_response_time_ms, "total_checks": status.total_checks, "total_successes": status.total_successes, } ) return proxies
[docs] async def get_proxies_by_status( self, health_status: str, ) -> list[dict[str, Any]]: """Get proxies by health status. Args: health_status: Filter by health status (healthy, unhealthy, dead, unknown) Returns: List of proxy dictionaries """ stmt = ( select(ProxyIdentityTable, ProxyStatusTable) .join(ProxyStatusTable, ProxyIdentityTable.url == ProxyStatusTable.proxy_url) .where(ProxyStatusTable.health_status == health_status) ) async with AsyncSession(self.engine) as session: result = await session.exec(stmt) # type: ignore[arg-type] rows = result.all() proxies = [] for identity, status in rows: proxies.append( { "url": identity.url, "protocol": identity.protocol, "host": identity.host, "port": identity.port, "username": self._decrypt_credential(identity.username), "password": self._decrypt_credential(identity.password), "health_status": status.health_status, "last_success_at": status.last_success_at, "total_checks": status.total_checks, } ) return proxies
[docs] async def load(self) -> list[dict[str, Any]]: """Load all proxies from the database. Returns: List of proxy dictionaries with identity and status fields """ stmt = select(ProxyIdentityTable, ProxyStatusTable).join( ProxyStatusTable, ProxyIdentityTable.url == ProxyStatusTable.proxy_url ) async with AsyncSession(self.engine) as session: result = await session.exec(stmt) # type: ignore[arg-type] rows = result.all() proxies = [] for identity, status in rows: proxies.append( { "url": identity.url, "protocol": identity.protocol, "host": identity.host, "port": identity.port, "username": self._decrypt_credential(identity.username), "password": self._decrypt_credential(identity.password), "country_code": identity.country_code, "source": identity.source, "discovered_at": identity.discovered_at, "health_status": status.health_status, "last_success_at": status.last_success_at, "last_failure_at": status.last_failure_at, "avg_response_time_ms": status.avg_response_time_ms, "total_checks": status.total_checks, "total_successes": status.total_successes, } ) return proxies
[docs] async def load_validated(self, max_age_hours: int = 48) -> list[dict[str, Any]]: """Load proxies validated within the given time window, excluding dead proxies. Args: max_age_hours: Maximum age in hours for last_success_at. Proxies older than this will be excluded. Default: 48 hours. Returns: List of proxy dictionaries with recent validations """ cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours) stmt = ( select(ProxyIdentityTable, ProxyStatusTable) .join(ProxyStatusTable, ProxyIdentityTable.url == ProxyStatusTable.proxy_url) .where(ProxyStatusTable.last_success_at.isnot(None)) # type: ignore[union-attr] .where(ProxyStatusTable.last_success_at >= cutoff) # type: ignore[operator] .where(ProxyStatusTable.health_status == "healthy") ) async with AsyncSession(self.engine) as session: result = await session.exec(stmt) # type: ignore[arg-type] rows = result.all() proxies = [] for identity, status in rows: proxies.append( { "url": identity.url, "protocol": identity.protocol, "host": identity.host, "port": identity.port, "username": self._decrypt_credential(identity.username), "password": self._decrypt_credential(identity.password), "country_code": identity.country_code, "source": identity.source, "discovered_at": identity.discovered_at, "health_status": status.health_status, "last_success_at": status.last_success_at, "last_failure_at": status.last_failure_at, "avg_response_time_ms": status.avg_response_time_ms, "total_checks": status.total_checks, "total_successes": status.total_successes, } ) return proxies
[docs] async def cleanup( self, remove_dead: bool = True, remove_stale_days: int = 7, remove_never_validated: bool = True, vacuum: bool = True, ) -> dict[str, int]: """Clean up stale and dead proxies. Args: remove_dead: Remove proxies with health_status='dead' remove_stale_days: Remove proxies not validated in N days (0 to skip) remove_never_validated: Remove proxies that have never been validated vacuum: Run VACUUM after cleanup to reclaim space Returns: dict[str, int]: Counts of removed items by category (dead, stale, never_validated). """ counts: dict[str, int] = {} async with AsyncSession(self.engine) as session: # Remove dead proxies if remove_dead: # Get URLs of dead proxies dead_stmt = select(ProxyStatusTable.proxy_url).where( ProxyStatusTable.health_status == "dead" ) dead_result = await session.exec(dead_stmt) dead_urls = list(dead_result.all()) if dead_urls: # Delete from proxy_identities del_stmt = delete(ProxyIdentityTable).where( ProxyIdentityTable.url.in_(dead_urls) # type: ignore[attr-defined] ) await session.exec(del_stmt) # type: ignore[arg-type] counts["dead"] = len(dead_urls) # Delete from status table del_status = delete(ProxyStatusTable).where( ProxyStatusTable.proxy_url.in_(dead_urls) # type: ignore[attr-defined] ) await session.exec(del_status) # type: ignore[arg-type] else: counts["dead"] = 0 # Remove stale proxies (not validated recently) if remove_stale_days > 0: cutoff = datetime.now(timezone.utc) - timedelta(days=remove_stale_days) stale_stmt = select(ProxyStatusTable.proxy_url).where( (ProxyStatusTable.last_check_at < cutoff) # type: ignore[operator] | ProxyStatusTable.last_check_at.is_(None) # type: ignore[union-attr] ) stale_result = await session.exec(stale_stmt) stale_urls = list(stale_result.all()) if stale_urls: del_stmt = delete(ProxyIdentityTable).where( ProxyIdentityTable.url.in_(stale_urls) # type: ignore[attr-defined] ) await session.exec(del_stmt) # type: ignore[arg-type] del_status = delete(ProxyStatusTable).where( 
ProxyStatusTable.proxy_url.in_(stale_urls) # type: ignore[attr-defined] ) await session.exec(del_status) # type: ignore[arg-type] counts["stale"] = len(stale_urls) else: counts["stale"] = 0 # Remove never validated proxies if remove_never_validated: never_stmt = select(ProxyStatusTable.proxy_url).where( ProxyStatusTable.total_checks == 0 ) never_result = await session.exec(never_stmt) never_urls = list(never_result.all()) if never_urls: del_stmt = delete(ProxyIdentityTable).where( ProxyIdentityTable.url.in_(never_urls) # type: ignore[attr-defined] ) await session.exec(del_stmt) # type: ignore[arg-type] del_status = delete(ProxyStatusTable).where( ProxyStatusTable.proxy_url.in_(never_urls) # type: ignore[attr-defined] ) await session.exec(del_status) # type: ignore[arg-type] counts["never_validated"] = len(never_urls) else: counts["never_validated"] = 0 # Clean old validation history (keep 1 day) history_cutoff = datetime.now(timezone.utc) - timedelta(days=1) del_history = delete(ValidationResultTable).where( ValidationResultTable.validated_at < history_cutoff ) result = await session.exec(del_history) # type: ignore[arg-type] counts["old_validations"] = result.rowcount if hasattr(result, "rowcount") else 0 await session.commit() # Vacuum to reclaim space if vacuum: async with self.engine.begin() as conn: await conn.execute(text("VACUUM")) return counts
[docs] async def get_stats(self) -> dict[str, Any]: """Get database statistics. Returns: dict[str, Any]: Comprehensive database statistics. """ stats: dict[str, Any] = {} async with AsyncSession(self.engine) as session: # Total proxies result = await session.exec(text("SELECT COUNT(*) FROM proxy_identities")) stats["total_proxies"] = result.scalar() or 0 # By health status result = await session.exec( text(""" SELECT health_status, COUNT(*) FROM proxy_statuses GROUP BY health_status """) ) stats["by_health"] = dict(result.all()) # By protocol result = await session.exec( text(""" SELECT protocol, COUNT(*) FROM proxy_identities GROUP BY protocol """) ) stats["by_protocol"] = dict(result.all()) # Validation stats (last 24 hours) result = await session.exec( text(""" SELECT COUNT(*) as total_validations, SUM(CASE WHEN is_valid THEN 1 ELSE 0 END) as successful, AVG(CASE WHEN is_valid THEN response_time_ms END) as avg_response_time FROM validation_results WHERE validated_at > datetime('now', '-24 hours') """) ) row = result.one() stats["validations_24h"] = { "total": row[0] or 0, "successful": row[1] or 0, "avg_response_time_ms": row[2], } # Database file size if self.filepath.exists(): stats["db_size_bytes"] = self.filepath.stat().st_size return stats