proxywhirl.storage¶
Storage backends for persisting proxy pools.
Classes¶
FileStorage – File-based storage backend using JSON.
ProxyIdentityTable – Immutable proxy identity table (normalized schema).
ProxyStatusTable – Current computed status table (normalized schema).
SQLiteStorage – SQLite-based storage backend with normalized 3-table schema.
ValidationResultTable – Individual validation result table (append-only).
Module Contents¶
- class proxywhirl.storage.FileStorage(filepath, encryption_key=None)[source]¶
File-based storage backend using JSON.
Stores proxies in a JSON file with atomic writes to prevent corruption. Supports optional encryption for sensitive credential data.
Initialize file storage.
- Parameters:
filepath (str | pathlib.Path) – Path to the JSON file for storage
encryption_key (bytes | None) – Optional Fernet encryption key for encrypting credentials. If provided, all data will be encrypted at rest.
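The "atomic writes to prevent corruption" behavior can be sketched with the standard temp-file-plus-rename pattern. This is an illustrative stdlib sketch, not proxywhirl's actual implementation; the function name and layout are assumptions:

```python
import json
import os
import tempfile
from pathlib import Path

def atomic_write_json(filepath, data):
    """Write JSON via a temp file + os.replace so readers never see a partial file."""
    path = Path(filepath)
    # Create the temp file in the same directory so the rename stays on one filesystem.
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.flush()
            os.fsync(f.fileno())  # ensure bytes hit disk before the rename
        os.replace(tmp, path)  # atomic on both POSIX and Windows
    except BaseException:
        os.unlink(tmp)  # clean up the temp file on any failure
        raise
```

A crash mid-write leaves only the temp file behind; the original JSON file is either fully the old version or fully the new one.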
- async clear()[source]¶
Clear all proxies from storage by deleting the file.
- Raises:
IOError – If clear operation fails
- Return type:
None
- async load()[source]¶
Load proxies from JSON file.
- Returns:
List of proxies loaded from file
- Raises:
FileNotFoundError – If file doesn’t exist
ValueError – If JSON is invalid or data is corrupted
cryptography.fernet.InvalidToken – If decryption fails (wrong key)
- Return type:
list[proxywhirl.models.Proxy]
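The error contract documented for load() (missing file, invalid JSON, corrupted data) can be sketched as follows. This is a minimal stdlib illustration of the documented behavior, not proxywhirl's code; the helper name is hypothetical and the encryption path is omitted:

```python
import json
from pathlib import Path

def load_json_strict(filepath):
    """Load a list of records from a JSON file, mirroring the documented errors."""
    path = Path(filepath)
    if not path.exists():
        # FileNotFoundError if file doesn't exist
        raise FileNotFoundError(f"No such file: {path}")
    try:
        data = json.loads(path.read_text(encoding="utf-8"))
    except json.JSONDecodeError as exc:
        # ValueError if JSON is invalid
        raise ValueError(f"Invalid JSON in {path}: {exc}") from exc
    if not isinstance(data, list):
        # ValueError if data is structurally corrupted
        raise ValueError("Corrupted data: expected a JSON array of proxies")
    return data
```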
- class proxywhirl.storage.ProxyIdentityTable(**data)[source]¶
Bases: sqlmodel.SQLModel
Immutable proxy identity table (normalized schema).
This table stores the core identity of each proxy, with fields that rarely change. Geographic and source metadata are stored here.
- Primary Key:
url: Full proxy URL (e.g., “http://1.2.3.4:8080”)
- Indexes:
protocol: Fast filtering by protocol type
host_port: Unique constraint to prevent duplicates
country_code: Geographic filtering
source: Source-based queries
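The primary key, unique constraint, and indexes above correspond to DDL along these lines. Column types and index names are illustrative assumptions; only the column names and constraints come from the documentation:

```python
import sqlite3

DDL = """
CREATE TABLE IF NOT EXISTS proxy_identities (
    url TEXT PRIMARY KEY,            -- full proxy URL, e.g. "http://1.2.3.4:8080"
    protocol TEXT NOT NULL,
    host_port TEXT NOT NULL UNIQUE,  -- unique constraint prevents duplicates
    country_code TEXT,
    source TEXT
);
CREATE INDEX IF NOT EXISTS ix_identities_protocol ON proxy_identities (protocol);
CREATE INDEX IF NOT EXISTS ix_identities_country ON proxy_identities (country_code);
CREATE INDEX IF NOT EXISTS ix_identities_source ON proxy_identities (source);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(DDL)
```

With this layout, inserting a second row for the same host and port fails at the database level rather than silently duplicating a proxy.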
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
data (Any)
- class proxywhirl.storage.ProxyStatusTable(**data)[source]¶
Bases: sqlmodel.SQLModel
Current computed status table (normalized schema).
This table maintains the current state of each proxy, computed from validation results. It’s updated after each validation.
- Primary Key:
proxy_url: References proxy_identities.url
- Indexes:
health_status: Filter by current health
last_success_at: Find recently working proxies
success_rate_7d: Performance-based sorting
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
data (Any)
- class proxywhirl.storage.SQLiteStorage(filepath, use_async_driver=True, pool_size=5, pool_max_overflow=10, pool_timeout=30.0, pool_recycle=3600)[source]¶
SQLite-based storage backend with normalized 3-table schema.
- Uses a normalized schema for efficient storage and querying:
proxy_identities: Immutable proxy identity (URL, host, port, geo)
proxy_statuses: Current computed status (health, response times)
validation_results: Append-only validation history
- Features:
Async operations using aiosqlite for non-blocking I/O
Normalized schema prevents SQL variable overflow
Advanced filtering by health_status, protocol, country
Automatic schema creation
EMA-based response time tracking
Health status state machine (unknown → healthy/unhealthy/dead)
Example:
storage = SQLiteStorage("proxies.db")
await storage.initialize()

# Add proxies
await storage.add_proxy(proxy)
added, skipped = await storage.add_proxies_batch(proxies)

# Record validation results
await storage.record_validation(proxy_url, is_valid=True, response_time_ms=150)

# Query proxies
healthy = await storage.get_healthy_proxies(max_age_hours=48)
dead = await storage.get_proxies_by_status("dead")

# Cleanup and stats
counts = await storage.cleanup(remove_dead=True, remove_stale_days=7)
stats = await storage.get_stats()
await storage.close()
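The "EMA-based response time tracking" and the health status state machine (unknown → healthy/unhealthy/dead) listed under Features might be sketched as below. The smoothing factor and dead-threshold are hypothetical values, not proxywhirl's actual constants:

```python
def update_ema(previous_ms, sample_ms, alpha=0.3):
    """Exponential moving average of response time; alpha is an assumed smoothing factor."""
    if previous_ms is None:
        return sample_ms  # first sample seeds the average
    return alpha * sample_ms + (1 - alpha) * previous_ms

def next_health_status(is_valid, consecutive_failures, dead_threshold=5):
    """One step of the health state machine; returns (new_status, new_failure_count)."""
    if is_valid:
        return "healthy", 0  # any success resets the failure streak
    failures = consecutive_failures + 1
    if failures >= dead_threshold:
        return "dead", failures  # too many failures in a row
    return "unhealthy", failures
```

The EMA keeps the stored response time responsive to recent samples without letting one slow request dominate; the state machine makes "dead" a terminal verdict reached only after repeated failures.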
Initialize SQLite storage with connection pooling.
- Parameters:
filepath (str | pathlib.Path) – Path to the SQLite database file. Will be created if it doesn’t exist. Parent directories will be created automatically.
use_async_driver (bool) – Whether to use the async aiosqlite driver for true non-blocking I/O (recommended). When False, uses a compatibility mode that may fall back to sync operations. Default: True. Note: the current implementation requires aiosqlite; this option is reserved for future compatibility enhancements.
pool_size (int) – Size of the connection pool (max concurrent connections). Default: 5
pool_max_overflow (int) – Max overflow connections beyond pool_size. Default: 10
pool_timeout (float) – Timeout in seconds when getting connection from pool. Default: 30.0
pool_recycle (int) – Recycle connections after N seconds (-1 to disable). Default: 3600
- async add_proxies_batch(proxies, validated=False)[source]¶
Add multiple proxies to the normalized schema.
- Parameters:
proxies (list[proxywhirl.models.Proxy]) – Proxies to add
validated (bool) – Mark the proxies as already validated. Default: False
- Returns:
Tuple of (added_count, skipped_count)
- Return type:
tuple[int, int]
- async add_proxy(proxy)[source]¶
Add a new proxy if not exists.
- Parameters:
proxy (proxywhirl.models.Proxy) – Proxy model to add
- Returns:
True if added, False if already exists
- Return type:
bool
- async cleanup(remove_dead=True, remove_stale_days=7, remove_never_validated=True, vacuum=True)[source]¶
Clean up stale and dead proxies.
- Parameters:
remove_dead (bool) – Remove proxies whose health status is dead. Default: True
remove_stale_days (int) – Remove proxies not validated within this many days. Default: 7
remove_never_validated (bool) – Remove proxies that were never validated. Default: True
vacuum (bool) – Run VACUUM after cleanup to reclaim disk space. Default: True
- Returns:
Counts of removed items by category (dead, stale, never_validated).
- Return type:
dict[str, int]
- async close()[source]¶
Close database connection and release resources.
Should be called when done with the storage to properly cleanup database connections. Safe to call multiple times.
- Return type:
None
- async get_healthy_proxies(max_age_hours=48, protocol=None, country_code=None, limit=None)[source]¶
Get healthy, recently validated proxies.
- Parameters:
max_age_hours (int) – Only include proxies validated within this window. Default: 48
protocol (str | None) – Optional protocol filter (e.g. “http”)
country_code (str | None) – Optional country-code filter
limit (int | None) – Optional maximum number of results
- Returns:
List of proxy dictionaries with identity and status fields
- Return type:
list[dict]
- async initialize()[source]¶
Create database tables if they don’t exist.
Should be called once before any other operations. Creates the three normalized tables (proxy_identities, proxy_statuses, validation_results) with all necessary columns and indexes. Safe to call multiple times - existing tables won’t be affected.
- Raises:
Exception – If database initialization fails
- Return type:
None
- async load_validated(max_age_hours=48)[source]¶
Load proxies validated within the given time window, excluding dead proxies.
- async record_validation(proxy_url, is_valid, response_time_ms=None, error_type=None, error_message=None)[source]¶
Record a validation result and update proxy status.
- Parameters:
proxy_url (str) – URL of the proxy that was validated
is_valid (bool) – Whether the validation succeeded
response_time_ms (float | None) – Measured response time in milliseconds, if available
error_type (str | None) – Error category, if validation failed
error_message (str | None) – Error detail, if validation failed
- Return type:
None
- class proxywhirl.storage.ValidationResultTable(**data)[source]¶
Bases: sqlmodel.SQLModel
Individual validation result table (append-only).
Stores each validation attempt as an immutable record. This enables historical analysis and trend tracking.
- Indexes:
proxy_url + validated_at: Fast lookup of recent validations
validated_at: Time-range queries
is_valid + validated_at: Finding recent valid proxies
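The composite indexes above map to SQLite DDL along these lines, and EXPLAIN QUERY PLAN confirms that the documented “fast lookup of recent validations” pattern actually hits the (proxy_url, validated_at) index. Column types and index names are illustrative assumptions:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
CREATE TABLE validation_results (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    proxy_url TEXT NOT NULL,
    is_valid INTEGER NOT NULL,
    validated_at TEXT NOT NULL   -- append-only: rows are inserted, never updated
);
-- Composite indexes matching the documented lookup patterns
CREATE INDEX ix_results_proxy_time ON validation_results (proxy_url, validated_at);
CREATE INDEX ix_results_time ON validation_results (validated_at);
CREATE INDEX ix_results_valid_time ON validation_results (is_valid, validated_at);
""")

# Recent validations for one proxy: should use ix_results_proxy_time
plan = conn.execute(
    "EXPLAIN QUERY PLAN SELECT * FROM validation_results "
    "WHERE proxy_url = ? ORDER BY validated_at DESC",
    ("http://1.2.3.4:8080",),
).fetchall()
```

Because the table is append-only, these indexes only grow; the cleanup() method on the storage backend is what keeps the history bounded.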
Create a new model by parsing and validating input data from keyword arguments.
Raises pydantic_core.ValidationError if the input data cannot be validated to form a valid model.
self is explicitly positional-only to allow self as a field name.
- Parameters:
data (Any)