Source code for proxywhirl.cache.tiers

"""Cache tier implementations for multi-tier caching strategy.

Defines:
- CacheTier: Abstract base class for cache tier implementations
- MemoryCacheTier: L1 in-memory cache using OrderedDict
- FileCacheTier: L2 JSONL file cache with encryption and file locking
- SQLiteCacheTier: L3 database cache with full persistence
"""

from __future__ import annotations

import hashlib
import json
import sqlite3
from abc import ABC, abstractmethod
from collections import OrderedDict
from datetime import datetime, timezone
from enum import Enum
from pathlib import Path
from typing import Callable

import portalocker
from loguru import logger

from .crypto import CredentialEncryptor
from .models import CacheEntry, CacheTierConfig

__all__ = [
    "TierType",
    "CacheTier",
    "MemoryCacheTier",
    "JsonlCacheTier",
    "DiskCacheTier",
    "SQLiteCacheTier",
]


[docs] class TierType(str, Enum): """Cache tier types.""" L1_MEMORY = "l1_memory" L2_FILE = "l2_file" L3_SQLITE = "l3_sqlite"
[docs] class CacheTier(ABC): """Abstract base class for cache tier implementations. Defines the interface that all cache tiers (L1, L2, L3) must implement, including graceful degradation on repeated failures. Attributes: config: Configuration for this tier tier_type: Type of tier (L1/L2/L3) enabled: Whether tier is operational failure_count: Consecutive failures for degradation tracking failure_threshold: Failures before auto-disabling tier """ def __init__(self, config: CacheTierConfig, tier_type: TierType) -> None: """Initialize cache tier with configuration. Args: config: Configuration for this tier tier_type: Type of tier (L1/L2/L3) """
[docs] self.config = config
[docs] self.tier_type = tier_type
[docs] self.enabled = config.enabled
[docs] self.failure_count = 0
[docs] self.failure_threshold = 3
@abstractmethod
[docs] def get(self, key: str) -> CacheEntry | None: """Retrieve entry by key, None if not found or expired. Args: key: Cache key to lookup Returns: CacheEntry if found and valid, None otherwise """ pass
@abstractmethod
[docs] def put(self, key: str, entry: CacheEntry) -> bool: """Store entry, return True if successful. Args: key: Cache key for entry entry: CacheEntry to store Returns: True if stored successfully, False otherwise """ pass
@abstractmethod
[docs] def delete(self, key: str) -> bool: """Remove entry by key, return True if existed. Args: key: Cache key to delete Returns: True if entry existed and was deleted, False if not found """ pass
@abstractmethod
[docs] def clear(self) -> int: """Clear all entries, return count of removed entries. Returns: Number of entries removed """ pass
@abstractmethod
[docs] def size(self) -> int: """Return current number of entries. Returns: Number of entries in tier """ pass
@abstractmethod
[docs] def keys(self) -> list[str]: """Return list of all keys. Returns: List of cache keys """ pass
@abstractmethod
[docs] def cleanup_expired(self) -> int: """Remove all expired entries in bulk. Returns: Number of entries removed """ pass
def __contains__(self, key: str) -> bool: """Check if key exists in this tier. Args: key: Cache key to check Returns: True if key exists, False otherwise """ return key in self.keys()
[docs] def handle_failure(self, error: Exception) -> None: """Handle tier operation failure for graceful degradation. Increments failure count and disables tier if threshold exceeded. Called by implementations when operations fail. Args: error: Exception that occurred """ self.failure_count += 1 if self.failure_count >= self.failure_threshold: self.enabled = False
# Log degradation (implementations should log specific details)
[docs] def reset_failures(self) -> None: """Reset failure count on successful operation. Re-enables tier if previously disabled and resets failure counter. Implementations should call this after successful operations. """ self.failure_count = 0 if not self.enabled: self.enabled = True
# Log recovery (implementations should log specific details)
[docs] class MemoryCacheTier(CacheTier): """L1 in-memory cache using OrderedDict for LRU tracking. Provides O(1) lookups with automatic LRU eviction when max_entries exceeded. """ def __init__( self, config: CacheTierConfig, tier_type: TierType, on_evict: Callable[[str, CacheEntry], None] | None = None, ) -> None: """Initialize memory cache with LRU tracking. Args: config: Tier configuration tier_type: Type of tier (L1/L2/L3) on_evict: Optional callback when entry is evicted (key, entry) """ super().__init__(config, tier_type) self._cache: OrderedDict[str, CacheEntry] = OrderedDict() self._on_evict = on_evict
[docs] def get(self, key: str) -> CacheEntry | None: """Retrieve entry from memory cache, updating LRU order. Args: key: Cache key to lookup. Returns: CacheEntry if found, None otherwise. Updates LRU order on hit by moving the accessed entry to the end of the OrderedDict. Side Effects: Moves accessed entry to end of LRU queue (most recently used position). """ if key in self._cache: # Move to end (most recently used) self._cache.move_to_end(key) return self._cache[key] return None
[docs] def put(self, key: str, entry: CacheEntry) -> bool: """Store entry in memory cache with automatic LRU eviction. Args: key: Cache key for the entry. entry: CacheEntry object to store. Returns: True if stored successfully, False on error. Side Effects: - Removes existing entry if key already exists (update operation). - Evicts least recently used entry if max_entries exceeded. - Calls on_evict callback if provided when eviction occurs. - Resets failure counter on success. Raises: Exception: Caught and handled via handle_failure(), returns False. """ try: # Update existing or add new if key in self._cache: del self._cache[key] self._cache[key] = entry # Evict LRU if over capacity if self.config.max_entries and len(self._cache) > self.config.max_entries: evicted_key, evicted_entry = self._cache.popitem( last=False ) # Remove oldest (FIFO end) # Notify callback of eviction if self._on_evict: self._on_evict(evicted_key, evicted_entry) self.reset_failures() return True except Exception as e: self.handle_failure(e) return False
[docs] def delete(self, key: str) -> bool: """Remove entry from memory cache by key. Args: key: Cache key to delete. Returns: True if entry existed and was deleted, False if not found. Side Effects: Removes entry from OrderedDict if present. """ if key in self._cache: del self._cache[key] return True return False
[docs] def clear(self) -> int: """Clear all entries from memory cache. Returns: Number of entries that were removed. Side Effects: Empties the OrderedDict, releasing all cached entries. """ count = len(self._cache) self._cache.clear() return count
[docs] def size(self) -> int: """Return current number of entries in memory cache. Returns: Count of entries currently stored in the OrderedDict. """ return len(self._cache)
[docs] def keys(self) -> list[str]: """Return list of all cache keys in memory tier. Returns: List of cache keys in LRU order (oldest to newest). """ return list(self._cache.keys())
[docs] def cleanup_expired(self) -> int: """Remove all expired entries from memory cache in bulk. Returns: Number of expired entries that were removed. Side Effects: Deletes all entries where is_expired property is True. """ removed = 0 expired_keys = [key for key, entry in self._cache.items() if entry.is_expired] for key in expired_keys: del self._cache[key] removed += 1 return removed
[docs] class JsonlCacheTier(CacheTier): """L2 JSONL file-based cache with sharding and encryption. Provides persistent caching using JSONL (JSON Lines) files with: - Sharded storage for better I/O performance - File locking for concurrent access safety - Encrypted credentials at rest - Simple text format for debugging and portability Best suited for: - Smaller cache sizes (<10K entries) - Environments where SQLite is unavailable - Cases requiring human-readable cache files - Simple deployment without database dependencies For larger caches (>10K entries), consider DiskCacheTier (SQLite-based) which provides O(log n) lookups vs O(n) for JSONL. """ def __init__( self, config: CacheTierConfig, tier_type: TierType, cache_dir: Path, encryptor: CredentialEncryptor | None = None, num_shards: int = 16, ) -> None: """Initialize JSONL-based L2 cache. Args: config: Tier configuration tier_type: Type of tier (should be L2_FILE) cache_dir: Directory for JSONL shard files encryptor: Credential encryptor for username/password num_shards: Number of shard files (default: 16) """ super().__init__(config, tier_type) self.cache_dir = cache_dir self.cache_dir.mkdir(parents=True, exist_ok=True) self.encryptor = encryptor or CredentialEncryptor() self.num_shards = num_shards # In-memory index for fast lookups (key -> shard_id) self._index: dict[str, int] = {} # OrderedDict for O(1) LRU eviction tracking (key -> last_accessed timestamp) self._access_order: OrderedDict[str, float] = OrderedDict() self._rebuild_index() def _get_shard_path(self, shard_id: int) -> Path: """Get path to a specific shard file.""" return self.cache_dir / f"shard_{shard_id:02d}.jsonl" def _get_shard_id(self, key: str) -> int: """Compute shard ID for a cache key using deterministic hashing. Uses MD5 (not for security, just for deterministic distribution) instead of Python's hash() which is randomized per process. """ return ( int(hashlib.md5(key.encode(), usedforsecurity=False).hexdigest(), 16) % self.num_shards ) def _evict_oldest(self) -> bool: """Evict oldest entry based on last_accessed time for LRU behavior. Uses OrderedDict for O(1) eviction instead of O(n*m) shard scanning. Returns: True if an entry was evicted, False otherwise """ if not self._access_order: return False # Pop oldest entry (first item in OrderedDict) oldest_key, _ = self._access_order.popitem(last=False) # Delete from storage if self.delete(oldest_key): logger.debug(f"Evicted oldest entry: {oldest_key}") return True return False def _rebuild_index(self) -> None: """Rebuild in-memory index and access order from all shard files.""" self._index.clear() self._access_order.clear() # Collect all entries with their last_accessed times entries_with_times: list[tuple[str, int, float]] = [] for shard_id in range(self.num_shards): shard_path = self._get_shard_path(shard_id) if not shard_path.exists(): continue try: with portalocker.Lock(shard_path, "r", timeout=5) as f: for line in f: if not line.strip(): continue try: data = json.loads(line) if "key" in data: key = data["key"] self._index[key] = shard_id # Extract last_accessed time for ordering if "last_accessed" in data: try: last_accessed = datetime.fromisoformat( data["last_accessed"] ) entries_with_times.append( (key, shard_id, last_accessed.timestamp()) ) except (ValueError, AttributeError): # Default to current time if parse fails entries_with_times.append( (key, shard_id, datetime.now(timezone.utc).timestamp()) ) else: # No timestamp, use current time entries_with_times.append( (key, shard_id, datetime.now(timezone.utc).timestamp()) ) except json.JSONDecodeError as e: logger.debug(f"Skipping corrupted line during index rebuild: {e}") continue except (OSError, portalocker.LockException) as e: logger.warning(f"Failed to read shard {shard_id} during index rebuild: {e}") continue # Sort by timestamp (oldest first) and populate OrderedDict entries_with_times.sort(key=lambda x: x[2]) for key, _, timestamp in entries_with_times: self._access_order[key] = timestamp def _read_shard(self, shard_id: int) -> dict[str, dict]: """Read all entries from a shard file. Returns: dict[str, dict]: Mapping of key to entry data dict. """ shard_path = self._get_shard_path(shard_id) entries: dict[str, dict] = {} if not shard_path.exists(): return entries try: with portalocker.Lock(shard_path, "r", timeout=5) as f: for line in f: if not line.strip(): continue try: data = json.loads(line) if "key" in data: entries[data["key"]] = data except json.JSONDecodeError as e: logger.debug(f"Skipping corrupted line in shard {shard_id}: {e}") continue except (OSError, portalocker.LockException) as e: logger.warning(f"Failed to read shard {shard_id}: {e}") return entries def _write_shard(self, shard_id: int, entries: dict[str, dict]) -> bool: """Write all entries to a shard file atomically. Args: shard_id: Shard file ID entries: Mapping of key to entry data dict. Returns: True if write succeeded """ shard_path = self._get_shard_path(shard_id) try: # Write to temp file first, then rename for atomicity temp_path = shard_path.with_suffix(".tmp") with portalocker.Lock(temp_path, "w", timeout=5) as f: for data in entries.values(): json.dump(data, f) f.write("\n") # Atomic rename temp_path.replace(shard_path) return True except (OSError, portalocker.LockException) as e: logger.error(f"Failed to write shard {shard_id}: {e}") return False
[docs] def get(self, key: str) -> CacheEntry | None: """Retrieve entry from JSONL shard. Args: key: Cache key to lookup Returns: CacheEntry if found and valid, None otherwise """ try: shard_id = self._index.get(key) if shard_id is None: return None entries = self._read_shard(shard_id) data = entries.get(key) if not data: # Key was in index but not in file - remove from both index and access order self._index.pop(key, None) self._access_order.pop(key, None) return None # Decrypt credentials if data.get("username_encrypted"): username_bytes = bytes.fromhex(data["username_encrypted"]) data["username"] = self.encryptor.decrypt(username_bytes) del data["username_encrypted"] if data.get("password_encrypted"): password_bytes = bytes.fromhex(data["password_encrypted"]) data["password"] = self.encryptor.decrypt(password_bytes) del data["password_encrypted"] # Convert timestamps (including optional health monitoring timestamps) for field in [ "fetch_time", "last_accessed", "expires_at", "last_health_check", "next_check_time", ]: if field in data and data[field] is not None and isinstance(data[field], str): data[field] = datetime.fromisoformat(data[field]) # Ensure health monitoring fields have defaults for backward compatibility data.setdefault("consecutive_health_failures", 0) data.setdefault("consecutive_health_successes", 0) data.setdefault("recovery_attempt", 0) data.setdefault("total_health_checks", 0) data.setdefault("total_health_check_failures", 0) # Update access order (move to end for LRU) if key in self._access_order: self._access_order.move_to_end(key) self._access_order[key] = datetime.now(timezone.utc).timestamp() self.reset_failures() return CacheEntry(**data) except Exception as e: self.handle_failure(e) return None
[docs] def put(self, key: str, entry: CacheEntry) -> bool: """Store entry in JSONL shard with encrypted credentials. Args: key: Cache key for entry (should match entry.key) entry: CacheEntry to store Returns: True if stored successfully, False otherwise """ try: # Validate key matches entry.key for consistency if key != entry.key: logger.warning( f"Key mismatch in put(): parameter '{key}' vs entry.key '{entry.key}', using entry.key" ) # Check if we need to evict before adding (LRU eviction) # Only evict if this is a new key and we're at capacity if ( self.config.max_entries and entry.key not in self._index and len(self._index) >= self.config.max_entries ): self._evict_oldest() # Always use entry.key for consistency shard_id = self._get_shard_id(entry.key) # Read existing shard entries = self._read_shard(shard_id) # Prepare entry data with all fields including health monitoring data = { "key": entry.key, "proxy_url": entry.proxy_url, "source": entry.source, "fetch_time": entry.fetch_time.isoformat(), "last_accessed": entry.last_accessed.isoformat(), "access_count": entry.access_count, "ttl_seconds": entry.ttl_seconds, "expires_at": entry.expires_at.isoformat(), "health_status": entry.health_status.value, "failure_count": entry.failure_count, "evicted_from_l1": entry.evicted_from_l1, # Health monitoring fields (Feature 006) "last_health_check": entry.last_health_check.isoformat() if entry.last_health_check else None, "consecutive_health_failures": entry.consecutive_health_failures, "consecutive_health_successes": entry.consecutive_health_successes, "recovery_attempt": entry.recovery_attempt, "next_check_time": entry.next_check_time.isoformat() if entry.next_check_time else None, "last_health_error": entry.last_health_error, "total_health_checks": entry.total_health_checks, "total_health_check_failures": entry.total_health_check_failures, } # Encrypt credentials if entry.username: encrypted = self.encryptor.encrypt(entry.username) data["username_encrypted"] = encrypted.hex() if entry.password: encrypted = self.encryptor.encrypt(entry.password) data["password_encrypted"] = encrypted.hex() # Add/update entry using entry.key for consistency entries[entry.key] = data # Write shard if self._write_shard(shard_id, entries): self._index[entry.key] = shard_id # Update access order for LRU tracking if entry.key in self._access_order: self._access_order.move_to_end(entry.key) self._access_order[entry.key] = entry.last_accessed.timestamp() self.reset_failures() return True return False except Exception as e: self.handle_failure(e) return False
[docs] def delete(self, key: str) -> bool: """Remove entry from JSONL shard. Args: key: Cache key to delete Returns: True if entry existed and was deleted, False if not found """ try: shard_id = self._index.get(key) if shard_id is None: return False entries = self._read_shard(shard_id) if key not in entries: self._index.pop(key, None) self._access_order.pop(key, None) return False del entries[key] if self._write_shard(shard_id, entries): self._index.pop(key, None) self._access_order.pop(key, None) return True return False except Exception as e: self.handle_failure(e) return False
[docs] def clear(self) -> int: """Clear all JSONL shard files. Returns: Number of entries cleared """ try: count = len(self._index) # Delete all shard files for shard_id in range(self.num_shards): shard_path = self._get_shard_path(shard_id) if shard_path.exists(): shard_path.unlink() self._index.clear() self._access_order.clear() self.reset_failures() return count except Exception as e: self.handle_failure(e) return 0
[docs] def size(self) -> int: """Return current number of entries. Returns: Number of entries in index """ return len(self._index)
[docs] def keys(self) -> list[str]: """Return list of all keys. Returns: List of cache keys """ return list(self._index.keys())
[docs] def cleanup_expired(self) -> int: """Remove all expired entries from all shards. Returns: Number of entries removed """ try: removed = 0 now = datetime.now(timezone.utc) for shard_id in range(self.num_shards): entries = self._read_shard(shard_id) original_count = len(entries) # Filter out expired entries valid_entries = {} for key, data in entries.items(): expires_at_str = data.get("expires_at") if expires_at_str: try: expires_at = datetime.fromisoformat(expires_at_str) if expires_at >= now: valid_entries[key] = data else: self._index.pop(key, None) self._access_order.pop(key, None) removed += 1 except ValueError: # Invalid timestamp - keep entry valid_entries[key] = data else: valid_entries[key] = data # Write back if entries were removed if len(valid_entries) < original_count: self._write_shard(shard_id, valid_entries) self.reset_failures() return removed except Exception as e: self.handle_failure(e) return 0
[docs] class DiskCacheTier(CacheTier): """L2 SQLite-based cache with encryption and indexed lookups. Optimized for >10K entries using SQLite with B-tree indexes instead of JSONL. Provides O(log n) lookups vs O(n) for JSONL, achieving <10ms reads for 10K+ entries. Uses a lightweight SQLite database with: - Primary key index on cache key for fast lookups - Encrypted credentials stored as BLOB - Efficient bulk operations (cleanup, size, keys) - File-based persistence without complex sharding - Persistent connection pooling for performance (RES-005) Thread Safety: Uses a threading.Lock to protect connection access. The connection is created with check_same_thread=False to allow multi-threaded access. """ def __init__( self, config: CacheTierConfig, tier_type: TierType, cache_dir: Path, encryptor: CredentialEncryptor | None = None, ) -> None: """Initialize SQLite-based L2 cache. Args: config: Tier configuration tier_type: Type of tier (should be L2_FILE) cache_dir: Directory for cache database encryptor: Credential encryptor for username/password """ super().__init__(config, tier_type) self.cache_dir = cache_dir self.cache_dir.mkdir(parents=True, exist_ok=True) self.encryptor = encryptor or CredentialEncryptor() # Use SQLite database in cache directory self.db_path = self.cache_dir / "l2_cache.db" # Connection pool: persistent connection with thread-safe access (RES-005) import threading self._conn: sqlite3.Connection | None = None self._conn_lock = threading.Lock() self._init_db() def _get_connection(self) -> sqlite3.Connection: """Get or create persistent SQLite connection (RES-005 fix). Thread-safe access to a single persistent connection. Creates the connection on first access with settings optimized for concurrent use. Returns: SQLite connection instance """ with self._conn_lock: if self._conn is None: self._conn = sqlite3.connect( str(self.db_path), check_same_thread=False, # Allow multi-threaded access timeout=30.0, # Wait up to 30s for locks isolation_level="DEFERRED", # Better concurrency ) # Enable write-ahead logging for better concurrency self._conn.execute("PRAGMA journal_mode=WAL") self._conn.execute("PRAGMA synchronous=NORMAL") return self._conn
[docs] def close(self) -> None: """Close the persistent SQLite connection and release database resources. Should be called when the cache tier is no longer needed to properly release database resources and file locks. Safe to call multiple times. Side Effects: - Acquires connection lock to ensure thread safety. - Closes active SQLite connection if present. - Sets internal connection to None to prevent reuse. - Suppresses any exceptions during close to ensure cleanup completes. Thread Safety: Thread-safe via internal lock. Multiple threads can safely call this method concurrently. Example: >>> tier = DiskCacheTier(config, TierType.L2_FILE, cache_dir) >>> # ... use tier ... >>> tier.close() # Clean up resources >>> tier.close() # Safe to call again """ with self._conn_lock: if self._conn is not None: try: self._conn.close() except Exception: pass # Ignore errors on close self._conn = None
def __del__(self) -> None: """Destructor to ensure SQLite connection is closed during garbage collection. Automatically called when the DiskCacheTier object is garbage collected. Ensures database resources are released even if close() was not explicitly called. Side Effects: - Calls close() to release database connection and locks. - Prevents resource leaks if object is destroyed without explicit cleanup. Note: Destructors in Python are not guaranteed to be called immediately. For predictable cleanup, explicitly call close() instead of relying on __del__. """ self.close() def _init_db(self) -> None: """Initialize L2 cache database schema. Creates a lightweight table optimized for L2 tier operations: - Simpler schema than L3 (no health monitoring fields) - Primary key index for fast lookups - Expires_at index for efficient cleanup """ try: conn = self._get_connection() conn.execute(""" CREATE TABLE IF NOT EXISTS l2_cache ( key TEXT PRIMARY KEY, proxy_url TEXT NOT NULL, username_encrypted BLOB, password_encrypted BLOB, source TEXT NOT NULL, fetch_time REAL NOT NULL, last_accessed REAL NOT NULL, access_count INTEGER DEFAULT 0, ttl_seconds INTEGER NOT NULL, expires_at REAL NOT NULL, health_status TEXT DEFAULT 'unknown', failure_count INTEGER DEFAULT 0, evicted_from_l1 INTEGER DEFAULT 0 ) """) # Create indexes for common queries conn.execute("CREATE INDEX IF NOT EXISTS idx_l2_expires_at ON l2_cache(expires_at)") conn.execute("CREATE INDEX IF NOT EXISTS idx_l2_source ON l2_cache(source)") conn.commit() except Exception as e: self.handle_failure(e)
[docs] def get(self, key: str) -> CacheEntry | None: """Retrieve entry from SQLite database with O(log n) indexed lookup. Args: key: Cache key to lookup Returns: CacheEntry if found and valid, None otherwise """ try: conn = self._get_connection() cursor = conn.execute("SELECT * FROM l2_cache WHERE key = ?", (key,)) row = cursor.fetchone() if not row: return None # Map row to dict columns = [ "key", "proxy_url", "username_encrypted", "password_encrypted", "source", "fetch_time", "last_accessed", "access_count", "ttl_seconds", "expires_at", "health_status", "failure_count", "evicted_from_l1", ] data = dict(zip(columns, row)) # Decrypt credentials if data["username_encrypted"]: data["username"] = self.encryptor.decrypt(data["username_encrypted"]) if data["password_encrypted"]: data["password"] = self.encryptor.decrypt(data["password_encrypted"]) # Convert timestamps for field in ["fetch_time", "last_accessed", "expires_at"]: if data.get(field) is not None: data[field] = datetime.fromtimestamp(data[field], tz=timezone.utc) # Convert boolean data["evicted_from_l1"] = bool(data["evicted_from_l1"]) # Remove encrypted fields data.pop("username_encrypted") data.pop("password_encrypted") self.reset_failures() return CacheEntry(**data) except Exception as e: self.handle_failure(e) return None
[docs] def put(self, key: str, entry: CacheEntry) -> bool: """Store entry in SQLite database with INSERT OR REPLACE. Args: key: Cache key for entry entry: CacheEntry to store Returns: True if stored successfully, False otherwise """ try: # Encrypt credentials (encryptor.encrypt expects SecretStr, not string) username_encrypted = None if entry.username: username_encrypted = self.encryptor.encrypt(entry.username) password_encrypted = None if entry.password: password_encrypted = self.encryptor.encrypt(entry.password) conn = self._get_connection() conn.execute( """ INSERT OR REPLACE INTO l2_cache ( key, proxy_url, username_encrypted, password_encrypted, source, fetch_time, last_accessed, access_count, ttl_seconds, expires_at, health_status, failure_count, evicted_from_l1 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entry.key, entry.proxy_url, username_encrypted, password_encrypted, entry.source, entry.fetch_time.timestamp(), entry.last_accessed.timestamp(), entry.access_count, entry.ttl_seconds, entry.expires_at.timestamp(), entry.health_status.value, entry.failure_count, int(entry.evicted_from_l1), ), ) conn.commit() self.reset_failures() return True except Exception as e: self.handle_failure(e) return False
[docs] def delete(self, key: str) -> bool: """Remove entry from SQLite database by cache key. Uses SQL DELETE with primary key lookup for O(log n) performance. Args: key: Cache key to delete. Returns: True if entry existed and was deleted, False if not found or on error. Side Effects: - Acquires persistent connection via thread-safe _get_connection(). - Executes DELETE query with parameterized key (SQL injection safe). - Commits transaction before returning. - Does NOT reset failure counter (only success paths do). Thread Safety: Thread-safe via connection lock in _get_connection(). Raises: Exception: Caught and handled via handle_failure(), returns False. """ try: conn = self._get_connection() cursor = conn.execute("DELETE FROM l2_cache WHERE key = ?", (key,)) deleted = cursor.rowcount > 0 conn.commit() return deleted except Exception as e: self.handle_failure(e) return False
[docs] def clear(self) -> int: """Clear all entries from SQLite L2 cache database. Performs bulk deletion of all cache entries. More efficient than iterating through individual deletes. Returns: Number of entries that were cleared, 0 on error. Side Effects: - Acquires persistent connection via thread-safe _get_connection(). - Counts total entries before deletion. - Executes DELETE FROM without WHERE clause (removes all rows). - Commits transaction before returning. - Resets failure counter on success. Thread Safety: Thread-safe via connection lock in _get_connection(). Performance: O(n) where n is the number of entries, but executes as a single SQL operation with automatic index cleanup. Raises: Exception: Caught and handled via handle_failure(), returns 0. Example: >>> tier = DiskCacheTier(config, TierType.L2_FILE, cache_dir) >>> tier.size() 1000 >>> cleared = tier.clear() >>> print(f"Cleared {cleared} entries") Cleared 1000 entries >>> tier.size() 0 """ try: conn = self._get_connection() cursor = conn.execute("SELECT COUNT(*) FROM l2_cache") count = cursor.fetchone()[0] conn.execute("DELETE FROM l2_cache") conn.commit() self.reset_failures() return count except Exception as e: self.handle_failure(e) return 0
[docs] def size(self) -> int: """Return current number of entries in SQLite L2 cache database. Uses SQL COUNT(*) for efficient O(1) size calculation via table metadata. Returns: Number of cache entries currently stored, 0 on error. Side Effects: - Acquires persistent connection via thread-safe _get_connection(). - Executes SELECT COUNT(*) query (reads table metadata, not rows). - Resets failure counter on success. Thread Safety: Thread-safe via connection lock in _get_connection(). Performance: O(1) - SQLite maintains row count in table metadata. Raises: Exception: Caught and handled via handle_failure(), returns 0. """ try: conn = self._get_connection() cursor = conn.execute("SELECT COUNT(*) FROM l2_cache") count = cursor.fetchone()[0] self.reset_failures() return count except Exception as e: self.handle_failure(e) return 0
[docs] def keys(self) -> list[str]: """Return list of all cache keys from SQLite L2 database. Retrieves all cache keys without loading full entry data. Useful for cache inspection, debugging, and bulk operations. Returns: List of all cache keys in database, empty list on error. Side Effects: - Acquires persistent connection via thread-safe _get_connection(). - Executes SELECT key query (fetches only key column, not full rows). - Loads all keys into memory as a list. - Resets failure counter on success. Thread Safety: Thread-safe via connection lock in _get_connection(). Performance: O(n) where n is number of entries. For large caches (>10K entries), consider using size() to check count before calling. Raises: Exception: Caught and handled via handle_failure(), returns []. Warning: For very large caches (>100K entries), this may consume significant memory. Consider pagination or streaming approaches if needed. """ try: conn = self._get_connection() cursor = conn.execute("SELECT key FROM l2_cache") keys = [row[0] for row in cursor.fetchall()] self.reset_failures() return keys except Exception as e: self.handle_failure(e) return []
[docs] def cleanup_expired(self) -> int: """Remove all expired entries from SQLite L2 database using indexed SQL DELETE. Performs bulk deletion of expired entries in a single SQL operation. Uses the expires_at index for efficient identification. Returns: Number of expired entries that were removed, 0 on error. Side Effects: - Acquires persistent connection via thread-safe _get_connection(). - Calculates current timestamp in UTC. - Executes DELETE with WHERE clause using expires_at index. - Commits transaction before returning. - Resets failure counter on success. Thread Safety: Thread-safe via connection lock in _get_connection(). Performance: O(m log n) where m is number of expired entries and n is total entries. The idx_l2_expires_at index makes this significantly faster than full table scan. Raises: Exception: Caught and handled via handle_failure(), returns 0. Example: >>> tier = DiskCacheTier(config, TierType.L2_FILE, cache_dir) >>> removed = tier.cleanup_expired() >>> print(f"Removed {removed} expired entries") Removed 42 expired entries """ try: now = datetime.now(timezone.utc).timestamp() conn = self._get_connection() cursor = conn.execute("DELETE FROM l2_cache WHERE expires_at < ?", (now,)) removed = cursor.rowcount conn.commit() self.reset_failures() return removed except Exception as e: self.handle_failure(e) return 0
[docs] def migrate_from_jsonl(self, jsonl_dir: Path | None = None) -> int: """Migrate existing JSONL shard files to SQLite L2 cache. This method provides a migration path from the old JSONL-based L2 cache to the new SQLite-based implementation. It reads all shard_*.jsonl files from the specified directory and imports them into the SQLite database. Args: jsonl_dir: Directory containing shard_*.jsonl files. Defaults to self.cache_dir if not specified. Returns: Number of entries successfully migrated Example: >>> tier = DiskCacheTier(config, TierType.L2_FILE, cache_dir) >>> migrated = tier.migrate_from_jsonl() >>> print(f"Migrated {migrated} entries from JSONL to SQLite") """ if jsonl_dir is None: jsonl_dir = self.cache_dir migrated = 0 errors = 0 # Find all JSONL shard files shard_files = list(jsonl_dir.glob("shard_*.jsonl")) if not shard_files: return 0 for shard_file in shard_files: try: with portalocker.Lock(shard_file, "r", timeout=5) as f: for line in f: if not line.strip(): continue try: data = json.loads(line) # Create CacheEntry from JSONL data # Handle both old and new timestamp formats for field in ["fetch_time", "last_accessed", "expires_at"]: if field in data and isinstance(data[field], str): data[field] = datetime.fromisoformat(data[field]) # Handle encrypted credentials if present if "username_encrypted" in data: username_hex = data.pop("username_encrypted") data["username"] = self.encryptor.decrypt( bytes.fromhex(username_hex) ) if "password_encrypted" in data: password_hex = data.pop("password_encrypted") data["password"] = self.encryptor.decrypt( bytes.fromhex(password_hex) ) # Create entry and store in SQLite entry = CacheEntry(**data) if self.put(entry.key, entry): migrated += 1 else: errors += 1 except (json.JSONDecodeError, ValueError, KeyError): # Skip corrupted entries errors += 1 continue except Exception: # Skip files that can't be read errors += 1 continue return migrated
[docs] class SQLiteCacheTier(CacheTier): """L3 SQLite database cache with encrypted credentials. Provides durable persistence with SQL indexing for fast lookups. """ def __init__( self, config: CacheTierConfig, tier_type: TierType, db_path: Path, encryptor: CredentialEncryptor | None = None, ) -> None: """Initialize SQLite-based L3 cache with health monitoring. Args: config: Tier configuration settings. tier_type: Type of tier (should be L3_SQLITE). db_path: Path to SQLite database file. encryptor: Optional credential encryptor for username/password. Creates default CredentialEncryptor if not provided. Side Effects: - Creates parent directories for database if they don't exist. - Initializes database schema with cache_entries and health_history tables. - Creates indexes for expires_at, source, health_status, and last_accessed. """ super().__init__(config, tier_type) self.db_path = db_path self.db_path.parent.mkdir(parents=True, exist_ok=True) self.encryptor = encryptor or CredentialEncryptor() self._init_db() def _init_db(self) -> None: """Initialize database schema with health monitoring fields. Creates two tables: 1. cache_entries: Main cache storage with encrypted credentials and health fields. 2. health_history: Historical record of health check results. Creates indexes for efficient queries on expires_at, source, health_status, last_accessed, and health_history lookups. Side Effects: - Creates cache_entries table if not exists. - Creates health_history table if not exists. - Migrates existing tables to add health monitoring columns. - Creates performance indexes. - Commits schema changes. """ with sqlite3.connect(str(self.db_path)) as conn: # Create cache_entries table conn.execute(""" CREATE TABLE IF NOT EXISTS cache_entries ( key TEXT PRIMARY KEY, proxy_url TEXT NOT NULL, username_encrypted BLOB, password_encrypted BLOB, source TEXT NOT NULL, fetch_time REAL NOT NULL, last_accessed REAL NOT NULL, access_count INTEGER DEFAULT 0, ttl_seconds INTEGER NOT NULL, expires_at REAL NOT NULL, health_status TEXT DEFAULT 'unknown', failure_count INTEGER DEFAULT 0, created_at REAL NOT NULL, updated_at REAL NOT NULL, -- Health monitoring fields (Feature 006) last_health_check REAL, consecutive_health_failures INTEGER DEFAULT 0, consecutive_health_successes INTEGER DEFAULT 0, recovery_attempt INTEGER DEFAULT 0, next_check_time REAL, last_health_error TEXT, total_health_checks INTEGER DEFAULT 0, total_health_check_failures INTEGER DEFAULT 0, evicted_from_l1 INTEGER DEFAULT 0 ) """) # Migrate existing tables to add health columns (T008) self._migrate_health_columns(conn) # Create health_history table (T007) conn.execute(""" CREATE TABLE IF NOT EXISTS health_history ( id INTEGER PRIMARY KEY AUTOINCREMENT, proxy_key TEXT NOT NULL, check_time REAL NOT NULL, status TEXT NOT NULL, response_time_ms REAL, error_message TEXT, check_url TEXT NOT NULL, FOREIGN KEY (proxy_key) REFERENCES cache_entries(key) ON DELETE CASCADE ) """) # Create indexes conn.execute("CREATE INDEX IF NOT EXISTS idx_expires_at ON cache_entries(expires_at)") conn.execute("CREATE INDEX IF NOT EXISTS idx_source ON cache_entries(source)") conn.execute( "CREATE INDEX IF NOT EXISTS idx_health_status ON cache_entries(health_status)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_last_accessed ON cache_entries(last_accessed)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_health_history_proxy ON health_history(proxy_key)" ) conn.execute( "CREATE INDEX IF NOT EXISTS idx_health_history_time ON health_history(check_time)" ) conn.commit() def _migrate_health_columns(self, conn: sqlite3.Connection) -> None: """Add health monitoring columns to existing cache_entries table if they don't exist (T008). Provides backward compatibility by adding new health monitoring columns to existing databases without data loss. Args: conn: Active SQLite connection to the cache database. Side Effects: - Adds missing health monitoring columns to cache_entries table. - Uses whitelisted column names to prevent SQL injection. - Silently ignores OperationalError if columns already exist (race condition safety). Note: Column names are from a hardcoded whitelist, not user input, making f-string SQL safe in this specific context. """ # Get existing columns cursor = conn.execute("PRAGMA table_info(cache_entries)") existing_columns = {row[1] for row in cursor.fetchall()} # Define health columns to add (whitelist to prevent injection) health_columns = [ ("last_health_check", "REAL"), ("consecutive_health_failures", "INTEGER DEFAULT 0"), ("consecutive_health_successes", "INTEGER DEFAULT 0"), ("recovery_attempt", "INTEGER DEFAULT 0"), ("next_check_time", "REAL"), ("last_health_error", "TEXT"), ("total_health_checks", "INTEGER DEFAULT 0"), ("total_health_check_failures", "INTEGER DEFAULT 0"), ("evicted_from_l1", "INTEGER DEFAULT 0"), ] # Add missing columns - safe because column names are from whitelist for col_name, col_type in health_columns: if col_name not in existing_columns: try: # Column names from whitelist, not user input conn.execute(f"ALTER TABLE cache_entries ADD COLUMN {col_name} {col_type}") except sqlite3.OperationalError: # Column may already exist in a concurrent process pass
[docs] def get(self, key: str) -> CacheEntry | None: """Retrieve entry from SQLite database with decrypted credentials. Args: key: Cache key to lookup. Returns: CacheEntry if found with decrypted username/password, None otherwise. Side Effects: - Opens new SQLite connection for query. - Decrypts username_encrypted and password_encrypted BLOBs. - Converts UNIX timestamps to datetime objects. - Resets failure counter on success. Raises: Exception: Caught and handled via handle_failure(), returns None. """ try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.execute("SELECT * FROM cache_entries WHERE key = ?", (key,)) row = cursor.fetchone() if not row: return None # Map row to dict columns = [ "key", "proxy_url", "username_encrypted", "password_encrypted", "source", "fetch_time", "last_accessed", "access_count", "ttl_seconds", "expires_at", "health_status", "failure_count", "created_at", "updated_at", # Health monitoring fields "last_health_check", "consecutive_health_failures", "consecutive_health_successes", "recovery_attempt", "next_check_time", "last_health_error", "total_health_checks", "total_health_check_failures", "evicted_from_l1", ] data = dict(zip(columns, row)) # Decrypt credentials if data["username_encrypted"]: data["username"] = self.encryptor.decrypt(data["username_encrypted"]) if data["password_encrypted"]: data["password"] = self.encryptor.decrypt(data["password_encrypted"]) # Convert timestamps for field in [ "fetch_time", "last_accessed", "expires_at", "created_at", "updated_at", "last_health_check", "next_check_time", ]: if data.get(field) is not None: data[field] = datetime.fromtimestamp(data[field], tz=timezone.utc) # Convert boolean if data.get("evicted_from_l1") is not None: data["evicted_from_l1"] = bool(data["evicted_from_l1"]) # Remove encrypted fields data.pop("username_encrypted") data.pop("password_encrypted") self.reset_failures() return CacheEntry(**data) except Exception as e: self.handle_failure(e) return None
[docs] def put(self, key: str, entry: CacheEntry) -> bool: """Store entry in SQLite database with encrypted credentials. Args: key: Cache key for the entry. entry: CacheEntry object to store with all fields including health monitoring data. Returns: True if stored successfully, False on error. Side Effects: - Opens new SQLite connection for write. - Encrypts username and password fields as BLOBs. - Uses INSERT OR REPLACE (upsert) to handle updates. - Sets created_at and updated_at to current timestamp. - Commits transaction before returning. - Resets failure counter on success. Raises: Exception: Caught and handled via handle_failure(), returns False. """ try: # Encrypt credentials username_encrypted = None if entry.username: username_encrypted = self.encryptor.encrypt(entry.username) password_encrypted = None if entry.password: password_encrypted = self.encryptor.encrypt(entry.password) # Convert timestamps to UNIX epoch now = datetime.now(timezone.utc).timestamp() # Helper to convert optional datetime to timestamp def to_timestamp(dt: datetime | None) -> float | None: return dt.timestamp() if dt is not None else None with sqlite3.connect(str(self.db_path)) as conn: conn.execute( """ INSERT OR REPLACE INTO cache_entries ( key, proxy_url, username_encrypted, password_encrypted, source, fetch_time, last_accessed, access_count, ttl_seconds, expires_at, health_status, failure_count, created_at, updated_at, last_health_check, consecutive_health_failures, consecutive_health_successes, recovery_attempt, next_check_time, last_health_error, total_health_checks, total_health_check_failures, evicted_from_l1 ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( entry.key, entry.proxy_url, username_encrypted, password_encrypted, entry.source, entry.fetch_time.timestamp(), entry.last_accessed.timestamp(), entry.access_count, entry.ttl_seconds, entry.expires_at.timestamp(), entry.health_status.value, entry.failure_count, now, now, # Health monitoring fields to_timestamp(entry.last_health_check), entry.consecutive_health_failures, entry.consecutive_health_successes, entry.recovery_attempt, to_timestamp(entry.next_check_time), entry.last_health_error, entry.total_health_checks, entry.total_health_check_failures, int(entry.evicted_from_l1), ), ) conn.commit() self.reset_failures() return True except Exception as e: self.handle_failure(e) return False
[docs] def delete(self, key: str) -> bool: """Remove entry from SQLite database by key. Args: key: Cache key to delete. Returns: True if entry existed and was deleted, False if not found. Side Effects: - Opens new SQLite connection for deletion. - Cascades to delete related health_history records via FOREIGN KEY. - Commits transaction before returning. Raises: Exception: Caught and handled via handle_failure(), returns False. """ try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.execute("DELETE FROM cache_entries WHERE key = ?", (key,)) deleted = cursor.rowcount > 0 conn.commit() return deleted except Exception as e: self.handle_failure(e) return False
[docs] def clear(self) -> int: """Clear all entries from SQLite database. Returns: Number of entries that were deleted. Side Effects: - Opens new SQLite connection for operation. - Deletes all rows from cache_entries table. - Cascades to delete all health_history records via FOREIGN KEY. - Commits transaction before returning. - Resets failure counter on success. Raises: Exception: Caught and handled via handle_failure(), returns 0. """ try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.execute("SELECT COUNT(*) FROM cache_entries") count: int = int(cursor.fetchone()[0]) conn.execute("DELETE FROM cache_entries") conn.commit() return count except Exception as e: self.handle_failure(e) return 0
[docs] def size(self) -> int: """Return current number of entries in SQLite database. Returns: Count of entries in cache_entries table, 0 on error. Side Effects: - Opens new SQLite connection for query. - Resets failure counter on success. Raises: Exception: Caught and handled via handle_failure(), returns 0. """ try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.execute("SELECT COUNT(*) FROM cache_entries") result = cursor.fetchone() return int(result[0]) if result else 0 except Exception as e: self.handle_failure(e) return 0
[docs] def keys(self) -> list[str]: """Return all cache keys from SQLite database. Returns: List of all cache keys, empty list on error. Side Effects: - Opens new SQLite connection for query. - Resets failure counter on success. Raises: Exception: Caught and handled via handle_failure(), returns []. """ try: with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.execute("SELECT key FROM cache_entries") result = [str(row[0]) for row in cursor.fetchall()] return result except Exception as e: self.handle_failure(e) return []
[docs] def cleanup_expired(self) -> int: """Remove all expired entries in bulk using SQL DELETE. This is significantly more efficient than iterating through all entries, reducing cleanup from O(n) to O(1) for expired entries. Returns: Number of entries removed """ try: now = datetime.now(timezone.utc).timestamp() with sqlite3.connect(str(self.db_path)) as conn: cursor = conn.execute( "DELETE FROM cache_entries WHERE expires_at < ?", (now,), ) removed = cursor.rowcount conn.commit() self.reset_failures() return removed except Exception as e: self.handle_failure(e) return 0