proxywhirl.storage

Storage backends for persisting proxy pools.

Classes

FileStorage

File-based storage backend using JSON.

ProxyIdentityTable

Immutable proxy identity table (normalized schema).

ProxyStatusTable

Current computed status table (normalized schema).

SQLiteStorage

SQLite-based storage backend with normalized 3-table schema.

ValidationResultTable

Individual validation result table (append-only).

Module Contents

class proxywhirl.storage.FileStorage(filepath, encryption_key=None)[source]

File-based storage backend using JSON.

Stores proxies in a JSON file with atomic writes to prevent corruption. Supports optional encryption for sensitive credential data.
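The atomic-write approach mentioned above can be sketched as a write-to-temp-then-rename pattern. This is a generic illustration of the technique, not the library's exact implementation:

```python
import json
import os
import tempfile

def atomic_write_json(path: str, data: object) -> None:
    # Write into a temporary file in the same directory, then atomically
    # replace the target; os.replace is atomic on both POSIX and Windows,
    # so readers never observe a partially written file.
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp = tempfile.mkstemp(dir=directory, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as f:
            json.dump(data, f)
        os.replace(tmp, path)
    except BaseException:
        os.unlink(tmp)
        raise
```

Writing to a temp file in the same directory (rather than the system temp dir) matters: `os.replace` is only atomic within a single filesystem.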

Initialize file storage.

Parameters:
  • filepath (str | pathlib.Path) – Path to the JSON file for storage

  • encryption_key (bytes | None) – Optional Fernet encryption key for encrypting credentials. If provided, all data will be encrypted at rest.

async clear()[source]

Clear all proxies from storage by deleting the file.

Raises:

IOError – If clear operation fails

Return type:

None

async load()[source]

Load proxies from JSON file.

Returns:

List of proxies loaded from file

Raises:
  • FileNotFoundError – If file doesn’t exist

  • ValueError – If JSON is invalid or data is corrupted

  • cryptography.fernet.InvalidToken – If decryption fails (wrong key)

Return type:

list[proxywhirl.models.Proxy]

async save(proxies)[source]

Save proxies to JSON file.

Parameters:

proxies (list[proxywhirl.models.Proxy]) – List of proxies to save

Raises:

IOError – If save operation fails

Return type:

None

class proxywhirl.storage.ProxyIdentityTable(**data)[source]

Bases: sqlmodel.SQLModel

Immutable proxy identity table (normalized schema).

This table stores the core identity of each proxy, with fields that rarely change. Geographic and source metadata are stored here.

Primary Key:

url: Full proxy URL (e.g., “http://1.2.3.4:8080”)

Indexes:
  • protocol: Fast filtering by protocol type

  • host_port: Unique constraint to prevent duplicates

  • country_code: Geographic filtering

  • source: Source-based queries

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:

data (Any)

class proxywhirl.storage.ProxyStatusTable(**data)[source]

Bases: sqlmodel.SQLModel

Current computed status table (normalized schema).

This table maintains the current state of each proxy, computed from validation results. It’s updated after each validation.

Primary Key:

proxy_url: References proxy_identities.url

Indexes:
  • health_status: Filter by current health

  • last_success_at: Find recently working proxies

  • success_rate_7d: Performance-based sorting

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:

data (Any)

class proxywhirl.storage.SQLiteStorage(filepath, use_async_driver=True, pool_size=5, pool_max_overflow=10, pool_timeout=30.0, pool_recycle=3600)[source]

SQLite-based storage backend with normalized 3-table schema.

Uses a normalized schema for efficient storage and querying:
  • proxy_identities: Immutable proxy identity (URL, host, port, geo)

  • proxy_statuses: Current computed status (health, response times)

  • validation_results: Append-only validation history

Features:
  • Async operations using aiosqlite for non-blocking I/O

  • Normalized schema avoids exceeding SQLite’s bound-variable limit

  • Advanced filtering by health_status, protocol, country

  • Automatic schema creation

  • EMA-based response time tracking

  • Health status state machine (unknown → healthy/unhealthy/dead)

Example:

storage = SQLiteStorage("proxies.db")
await storage.initialize()

# Add proxies
await storage.add_proxy(proxy)
added, skipped = await storage.add_proxies_batch(proxies)

# Record validation results
await storage.record_validation(proxy_url, is_valid=True, response_time_ms=150)

# Query proxies
healthy = await storage.get_healthy_proxies(max_age_hours=48)
dead = await storage.get_proxies_by_status("dead")

# Cleanup and stats
counts = await storage.cleanup(remove_dead=True, remove_stale_days=7)
stats = await storage.get_stats()

await storage.close()
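The health status state machine listed under Features can be sketched as follows. The transition rules and the dead_threshold value here are hypothetical, not the library's actual thresholds:

```python
def next_status(is_valid: bool, consecutive_failures: int,
                dead_threshold: int = 5) -> str:
    # Hypothetical transitions: any success yields "healthy"; a failure
    # yields "unhealthy" until dead_threshold consecutive failures,
    # at which point the proxy is marked "dead".
    if is_valid:
        return "healthy"
    if consecutive_failures >= dead_threshold:
        return "dead"
    return "unhealthy"
```

A proxy starts in "unknown" until its first validation result moves it into one of the computed states.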

Initialize SQLite storage with connection pooling.

Parameters:
  • filepath (str | pathlib.Path) – Path to the SQLite database file. Will be created if it doesn’t exist. Parent directories will be created automatically.

  • use_async_driver (bool) – Whether to use async aiosqlite driver (True, recommended) for true non-blocking async I/O. When False, uses a compatibility mode that may fall back to sync operations. Default is True for best async performance. Note: Current implementation requires aiosqlite; this option is reserved for future compatibility enhancements.

  • pool_size (int) – Size of the connection pool (max concurrent connections). Default: 5

  • pool_max_overflow (int) – Max overflow connections beyond pool_size. Default: 10

  • pool_timeout (float) – Timeout in seconds when getting connection from pool. Default: 30.0

  • pool_recycle (int) – Recycle connections after N seconds (-1 to disable). Default: 3600

async add_proxies_batch(proxies, validated=False)[source]

Add multiple proxies to the normalized schema.

Parameters:
  • proxies (list[proxywhirl.models.Proxy]) – List of Proxy models to add

  • validated (bool) – If True, mark proxies as already validated (healthy status, last_success_at set). Use this when proxies have passed validation before being saved.

Returns:

Tuple of (added_count, skipped_count)

Return type:

tuple[int, int]

async add_proxy(proxy)[source]

Add a new proxy if it does not already exist.

Parameters:

proxy (proxywhirl.models.Proxy) – Proxy model to add

Returns:

True if added, False if already exists

Return type:

bool

async cleanup(remove_dead=True, remove_stale_days=7, remove_never_validated=True, vacuum=True)[source]

Clean up stale and dead proxies.

Parameters:
  • remove_dead (bool) – Remove proxies with health_status=’dead’

  • remove_stale_days (int) – Remove proxies not validated in N days (0 to skip)

  • remove_never_validated (bool) – Remove proxies that have never been validated

  • vacuum (bool) – Run VACUUM after cleanup to reclaim space

Returns:

Counts of removed items by category (dead, stale, never_validated).

Return type:

dict[str, int]

async clear()[source]

Clear all proxies from database.

Return type:

None

async close()[source]

Close database connection and release resources.

Should be called when done with the storage to properly clean up database connections. Safe to call multiple times.

Return type:

None

async delete(proxy_url)[source]

Delete a proxy by URL.

Parameters:

proxy_url (str) – URL of the proxy to delete

Returns:

True if deleted, False if not found

Return type:

bool

async get_healthy_proxies(max_age_hours=48, protocol=None, country_code=None, limit=None)[source]

Get healthy, recently validated proxies.

Parameters:
  • max_age_hours (int) – Maximum age of last successful validation

  • protocol (str | None) – Filter by protocol (http, https, socks4, socks5)

  • country_code (str | None) – Filter by country code

  • limit (int | None) – Maximum number of proxies to return

Returns:

List of proxy dictionaries with identity and status fields

Return type:

list[dict[str, Any]]
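The max_age_hours window can be pictured as a simple cutoff check; this is an illustrative sketch of the filter, not the storage backend's actual SQL:

```python
from datetime import datetime, timedelta, timezone

def is_recent(last_success_at: datetime, max_age_hours: int = 48) -> bool:
    # A proxy qualifies as "recently validated" when its last successful
    # validation falls within the max_age_hours window ending now.
    cutoff = datetime.now(timezone.utc) - timedelta(hours=max_age_hours)
    return last_success_at >= cutoff
```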

async get_proxies_by_status(health_status)[source]

Get proxies by health status.

Parameters:

health_status (str) – Filter by health status (healthy, unhealthy, dead, unknown)

Returns:

List of proxy dictionaries

Return type:

list[dict[str, Any]]

async get_stats()[source]

Get database statistics.

Returns:

Comprehensive database statistics.

Return type:

dict[str, Any]

async initialize()[source]

Create database tables if they don’t exist.

Should be called once before any other operations. Creates the normalized tables (proxy_identities, proxy_statuses, validation_results) with all necessary columns and indexes. Safe to call multiple times - existing tables won’t be affected.

Raises:

Exception – If database initialization fails

Return type:

None

async load()[source]

Load all proxies from the database.

Returns:

List of proxy dictionaries with identity and status fields

Return type:

list[dict[str, Any]]

async load_validated(max_age_hours=48)[source]

Load proxies validated within the given time window, excluding dead proxies.

Parameters:

max_age_hours (int) – Maximum age in hours for last_success_at. Proxies older than this will be excluded. Default: 48 hours.

Returns:

List of proxy dictionaries with recent validations

Return type:

list[dict[str, Any]]

async query(**filters)[source]

Query proxies with filtering.

Parameters:

**filters (str) – Filter criteria (source, health_status)

Returns:

List of proxy dictionaries matching criteria

Return type:

list[dict[str, Any]]
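Keyword filters of this kind typically map onto a parameterized WHERE clause. A hypothetical sketch of that translation, restricted to the documented filter columns:

```python
def build_where(filters: dict) -> tuple[str, list]:
    # Hypothetical translation of keyword filters into a parameterized
    # WHERE clause; only the documented filter columns are accepted,
    # and values are bound as parameters to avoid SQL injection.
    allowed = {"source", "health_status"}
    clauses, params = [], []
    for key, value in filters.items():
        if key not in allowed:
            raise ValueError(f"unsupported filter: {key}")
        clauses.append(f"{key} = ?")
        params.append(value)
    return (" AND ".join(clauses) or "1=1", params)
```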

async record_validation(proxy_url, is_valid, response_time_ms=None, error_type=None, error_message=None)[source]

Record a validation result and update proxy status.

Parameters:
  • proxy_url (str) – URL of the proxy that was validated

  • is_valid (bool) – Whether the validation succeeded

  • response_time_ms (float | None) – Response time in milliseconds (if successful)

  • error_type (str | None) – Type of error (if failed)

  • error_message (str | None) – Error message (if failed)

Return type:

None

async record_validations_batch(results)[source]

Record multiple validation results efficiently.

Parameters:

results (list[tuple[str, bool, float | None, str | None]]) – List of (proxy_url, is_valid, response_time_ms, error_type) tuples

Returns:

Number of validations recorded

Return type:

int
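The shape of the results argument can be illustrated with plain tuples (the URLs below are placeholders):

```python
# Each entry: (proxy_url, is_valid, response_time_ms, error_type)
results = [
    ("http://1.2.3.4:8080", True, 150.0, None),       # success with timing
    ("http://5.6.7.8:3128", False, None, "timeout"),  # failure with error type
]
# Typical call (requires an initialized SQLiteStorage instance):
# recorded = await storage.record_validations_batch(results)
```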

async save(proxies, validated=False)[source]

Save proxies to database (adds new, skips existing).

This is a compatibility wrapper around add_proxies_batch().

Parameters:
  • proxies (list[proxywhirl.models.Proxy]) – List of proxies to save. Empty list is allowed (no-op).

  • validated (bool) – If True, mark proxies as already validated (healthy status).

Return type:

None

class proxywhirl.storage.ValidationResultTable(**data)[source]

Bases: sqlmodel.SQLModel

Individual validation result table (append-only).

Stores each validation attempt as an immutable record. This enables historical analysis and trend tracking.

Indexes:
  • proxy_url + validated_at: Fast lookup of recent validations

  • validated_at: Time-range queries

  • is_valid + validated_at: Finding recent valid proxies

Create a new model by parsing and validating input data from keyword arguments.

Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.

self is explicitly positional-only to allow self as a field name.

Parameters:

data (Any)