proxywhirl.storage
==================

.. py:module:: proxywhirl.storage

.. autoapi-nested-parse::

   Storage backends for persisting proxy pools.

Classes
-------

.. autoapisummary::

   proxywhirl.storage.FileStorage
   proxywhirl.storage.ProxyIdentityTable
   proxywhirl.storage.ProxyStatusTable
   proxywhirl.storage.SQLiteStorage
   proxywhirl.storage.ValidationResultTable

Module Contents
---------------

.. py:class:: FileStorage(filepath, encryption_key = None)

   File-based storage backend using JSON.

   Stores proxies in a JSON file with atomic writes to prevent corruption.
   Supports optional encryption for sensitive credential data.

   Initialize file storage.

   :param filepath: Path to the JSON file for storage
   :param encryption_key: Optional Fernet encryption key for encrypting
       credentials. If provided, all data will be encrypted at rest.

   .. py:method:: clear()
      :async:

      Clear all proxies from storage by deleting the file.

      :raises IOError: If the clear operation fails

   .. py:method:: load()
      :async:

      Load proxies from the JSON file.

      :returns: List of proxies loaded from the file
      :raises FileNotFoundError: If the file doesn't exist
      :raises ValueError: If the JSON is invalid or the data is corrupted
      :raises cryptography.fernet.InvalidToken: If decryption fails (wrong key)

   .. py:method:: save(proxies)
      :async:

      Save proxies to the JSON file.

      :param proxies: List of proxies to save
      :raises IOError: If the save operation fails
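The "atomic writes" FileStorage advertises are conventionally achieved with the write-to-temp-then-rename pattern. A minimal standalone sketch of that pattern follows; it is illustrative only (not proxywhirl's actual implementation) and omits the optional Fernet encryption:

```python
import json
import os
import tempfile


def atomic_json_save(filepath: str, proxies: list[dict]) -> None:
    """Write JSON to a temp file, then os.replace() it into place.

    Readers never observe a partially written file: os.replace() is
    atomic on both POSIX and Windows when source and target share a
    filesystem, which is why the temp file is created in the same
    directory as the target.
    """
    directory = os.path.dirname(os.path.abspath(filepath))
    fd, tmp_path = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as fh:
            json.dump(proxies, fh)
            fh.flush()
            os.fsync(fh.fileno())  # make sure bytes hit disk before the rename
        os.replace(tmp_path, filepath)
    except BaseException:
        os.unlink(tmp_path)  # don't leave a stray temp file behind
        raise


def load_json(filepath: str) -> list[dict]:
    """Load the proxy list back, rejecting structurally corrupted data."""
    with open(filepath, encoding="utf-8") as fh:
        data = json.load(fh)
    if not isinstance(data, list):
        raise ValueError("corrupted proxy file: expected a JSON list")
    return data
```

This also shows why ``load()`` can raise both ``FileNotFoundError`` (file missing) and ``ValueError`` (file present but malformed), as documented above.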
.. py:class:: ProxyIdentityTable(**data)

   Bases: :py:obj:`sqlmodel.SQLModel`

   Immutable proxy identity table (normalized schema).

   This table stores the core identity of each proxy, with fields that
   rarely change. Geographic and source metadata are stored here.

   Primary Key:
       url: Full proxy URL (e.g., "http://1.2.3.4:8080")

   Indexes:
       - protocol: Fast filtering by protocol type
       - host_port: Unique constraint to prevent duplicates
       - country_code: Geographic filtering
       - source: Source-based queries

   Create a new model by parsing and validating input data from keyword arguments.

   Raises [`ValidationError`][pydantic_core.ValidationError] if the input data
   cannot be validated to form a valid model.

   `self` is explicitly positional-only to allow `self` as a field name.

.. py:class:: ProxyStatusTable(**data)

   Bases: :py:obj:`sqlmodel.SQLModel`

   Current computed status table (normalized schema).

   This table maintains the current state of each proxy, computed from
   validation results. It is updated after each validation.

   Primary Key:
       proxy_url: References proxy_identities.url

   Indexes:
       - health_status: Filter by current health
       - last_success_at: Find recently working proxies
       - success_rate_7d: Performance-based sorting

   Create a new model by parsing and validating input data from keyword arguments.

   Raises [`ValidationError`][pydantic_core.ValidationError] if the input data
   cannot be validated to form a valid model.

   `self` is explicitly positional-only to allow `self` as a field name.

.. py:class:: SQLiteStorage(filepath, use_async_driver = True, pool_size = 5, pool_max_overflow = 10, pool_timeout = 30.0, pool_recycle = 3600)

   SQLite-based storage backend with a normalized 3-table schema.
   Uses a normalized schema for efficient storage and querying:

   - proxy_identities: Immutable proxy identity (URL, host, port, geo)
   - proxy_statuses: Current computed status (health, response times)
   - validation_results: Append-only validation history

   Features:

   - Async operations using aiosqlite for non-blocking I/O
   - Normalized schema prevents SQL variable overflow
   - Advanced filtering by health_status, protocol, and country
   - Automatic schema creation
   - EMA-based response time tracking
   - Health status state machine (unknown → healthy/unhealthy/dead)

   Example::

       storage = SQLiteStorage("proxies.db")
       await storage.initialize()

       # Add proxies
       await storage.add_proxy(proxy)
       added, skipped = await storage.add_proxies_batch(proxies)

       # Record validation results
       await storage.record_validation(proxy_url, is_valid=True, response_time_ms=150)

       # Query proxies
       healthy = await storage.get_healthy_proxies(max_age_hours=48)
       dead = await storage.get_proxies_by_status("dead")

       # Cleanup and stats
       counts = await storage.cleanup(remove_dead=True, remove_stale_days=7)
       stats = await storage.get_stats()

       await storage.close()

   Initialize SQLite storage with connection pooling.

   :param filepath: Path to the SQLite database file. Created if it doesn't
       exist; parent directories are created automatically.
   :param use_async_driver: Whether to use the async aiosqlite driver (True,
       recommended) for true non-blocking async I/O. When False, uses a
       compatibility mode that may fall back to sync operations. Default is
       True for best async performance. Note: the current implementation
       requires aiosqlite; this option is reserved for future compatibility
       enhancements.
   :param pool_size: Size of the connection pool (max concurrent connections).
       Default: 5
   :param pool_max_overflow: Max overflow connections beyond pool_size.
       Default: 10
   :param pool_timeout: Timeout in seconds when getting a connection from the
       pool. Default: 30.0
   :param pool_recycle: Recycle connections after N seconds (-1 to disable).
       Default: 3600
   .. py:method:: add_proxies_batch(proxies, validated = False)
      :async:

      Add multiple proxies to the normalized schema.

      :param proxies: List of Proxy models to add
      :param validated: If True, mark proxies as already validated (healthy
          status, last_success_at set). Use this when proxies have passed
          validation before being saved.
      :returns: Tuple of (added_count, skipped_count)

   .. py:method:: add_proxy(proxy)
      :async:

      Add a new proxy if it does not already exist.

      :param proxy: Proxy model to add
      :returns: True if added, False if it already exists

   .. py:method:: cleanup(remove_dead = True, remove_stale_days = 7, remove_never_validated = True, vacuum = True)
      :async:

      Clean up stale and dead proxies.

      :param remove_dead: Remove proxies with health_status='dead'
      :param remove_stale_days: Remove proxies not validated in N days (0 to skip)
      :param remove_never_validated: Remove proxies that have never been validated
      :param vacuum: Run VACUUM after cleanup to reclaim space
      :returns: Counts of removed items by category (dead, stale, never_validated).
      :rtype: dict[str, int]

   .. py:method:: clear()
      :async:

      Clear all proxies from the database.

   .. py:method:: close()
      :async:

      Close the database connection and release resources.

      Should be called when done with the storage to properly clean up
      database connections. Safe to call multiple times.

   .. py:method:: delete(proxy_url)
      :async:

      Delete a proxy by URL.

      :param proxy_url: URL of the proxy to delete
      :returns: True if deleted, False if not found

   .. py:method:: get_healthy_proxies(max_age_hours = 48, protocol = None, country_code = None, limit = None)
      :async:

      Get healthy, recently validated proxies.

      :param max_age_hours: Maximum age of the last successful validation
      :param protocol: Filter by protocol (http, https, socks4, socks5)
      :param country_code: Filter by country code
      :param limit: Maximum number of proxies to return
      :returns: List of proxy dictionaries with identity and status fields
   .. py:method:: get_proxies_by_status(health_status)
      :async:

      Get proxies by health status.

      :param health_status: Filter by health status (healthy, unhealthy, dead, unknown)
      :returns: List of proxy dictionaries

   .. py:method:: get_stats()
      :async:

      Get database statistics.

      :returns: Comprehensive database statistics.
      :rtype: dict[str, Any]

   .. py:method:: initialize()
      :async:

      Create the database tables if they don't exist.

      Should be called once before any other operations. Creates the
      normalized tables (proxy_identities, proxy_statuses,
      validation_results) with all necessary columns and indexes. Safe to
      call multiple times - existing tables won't be affected.

      :raises Exception: If database initialization fails

   .. py:method:: load()
      :async:

      Load all proxies from the database.

      :returns: List of proxy dictionaries with identity and status fields

   .. py:method:: load_validated(max_age_hours = 48)
      :async:

      Load proxies validated within the given time window, excluding dead proxies.

      :param max_age_hours: Maximum age in hours for last_success_at. Proxies
          older than this will be excluded. Default: 48 hours.
      :returns: List of proxy dictionaries with recent validations

   .. py:method:: query(**filters)
      :async:

      Query proxies with filtering.

      :param \*\*filters: Filter criteria (source, health_status)
      :returns: List of proxy dictionaries matching the criteria

   .. py:method:: record_validation(proxy_url, is_valid, response_time_ms = None, error_type = None, error_message = None)
      :async:

      Record a validation result and update the proxy's status.

      :param proxy_url: URL of the proxy that was validated
      :param is_valid: Whether the validation succeeded
      :param response_time_ms: Response time in milliseconds (if successful)
      :param error_type: Type of error (if failed)
      :param error_message: Error message (if failed)

   .. py:method:: record_validations_batch(results)
      :async:

      Record multiple validation results efficiently.

      :param results: List of (proxy_url, is_valid, response_time_ms, error_type) tuples
      :returns: Number of validations recorded
   .. py:method:: save(proxies, validated = False)
      :async:

      Save proxies to the database (adds new, skips existing).

      This is a compatibility wrapper around add_proxies_batch().

      :param proxies: List of proxies to save. An empty list is allowed (no-op).
      :param validated: If True, mark proxies as already validated (healthy status).

.. py:class:: ValidationResultTable(**data)

   Bases: :py:obj:`sqlmodel.SQLModel`

   Individual validation result table (append-only).

   Stores each validation attempt as an immutable record. This enables
   historical analysis and trend tracking.

   Indexes:
       - proxy_url + validated_at: Fast lookup of recent validations
       - validated_at: Time-range queries
       - is_valid + validated_at: Finding recent valid proxies

   Create a new model by parsing and validating input data from keyword arguments.

   Raises [`ValidationError`][pydantic_core.ValidationError] if the input data
   cannot be validated to form a valid model.

   `self` is explicitly positional-only to allow `self` as a field name.