"""
Pydantic models for cache data structures.
Defines data models for cache entries, configurations, and statistics
used across the three-tier caching system (L1/L2/L3).
"""
from __future__ import annotations

from datetime import datetime, timezone
from enum import Enum

from pydantic import BaseModel, ConfigDict, Field, SecretStr, computed_field, field_validator

# Import canonical HealthStatus from main models
from proxywhirl.models import HealthStatus
__all__ = [
"HealthStatus",
"CacheEntry",
"CacheTierConfig",
"CacheConfig",
"TierStatistics",
"CacheStatistics",
"CacheTierType",
"L2BackendType",
]
[docs]
class CacheTierType(str, Enum):
    """Identifier for one of the three cache tiers.

    Members are ``str`` subclasses, so each compares equal to its
    lowercase string value (for example ``CacheTierType.L1 == "l1"``).
    """

    L1 = "l1"
    L2 = "l2"
    L3 = "l3"
[docs]
class L2BackendType(str, Enum):
    """Storage backend choices for the L2 disk cache tier.

    Selects how the L2 tier persists its entries:

    - JSONL: file-based storage in JSON Lines format. Human-readable and
      portable, best for <10K entries. Uses sharded files with file locking.
    - SQLITE: database-backed storage using SQLite. Faster for >10K entries
      with O(log n) indexed lookups. Better concurrency and atomic operations.

    Members are ``str`` subclasses and compare equal to their string values.
    """

    JSONL = "jsonl"
    SQLITE = "sqlite"
[docs]
class CacheEntry(BaseModel):
    """Container for a single cached proxy with metadata.

    Stores proxy information with TTL, health status, and access tracking.
    Credentials are SecretStr in memory, encrypted at rest in L2/L3.

    ``last_accessed`` and ``expires_at`` are required fields: this model does
    not derive them from ``fetch_time`` / ``ttl_seconds``, so the caller must
    supply both when constructing an entry.

    Example:
        >>> from datetime import datetime, timedelta, timezone
        >>> now = datetime.now(timezone.utc)
        >>> entry = CacheEntry(
        ...     key="abc123",
        ...     proxy_url="http://proxy.com:8080",
        ...     source="fetched",
        ...     fetch_time=now,
        ...     last_accessed=now,
        ...     ttl_seconds=3600,
        ...     expires_at=now + timedelta(seconds=3600),
        ... )
        >>> entry.is_expired
        False
    """

    # Pydantic v2 configuration (replaces the deprecated class-based ``Config``).
    model_config = ConfigDict(
        json_encoders={
            SecretStr: lambda v: "***",  # Never expose credentials in JSON
            datetime: lambda v: v.isoformat(),
        },
    )

    # Identity
    key: str = Field(..., description="Unique cache key (proxy URL hash)")
    proxy_url: str = Field(..., description="Full proxy URL (scheme://host:port)")

    # Credentials (encrypted at rest in L2/L3, SecretStr in L1)
    username: SecretStr | None = Field(None, description="Proxy username")
    password: SecretStr | None = Field(None, description="Proxy password")

    # Metadata
    source: str = Field(..., description="Proxy source identifier")
    fetch_time: datetime = Field(..., description="When proxy was fetched")
    last_accessed: datetime = Field(..., description="Last cache access time")
    access_count: int = Field(default=0, description="Number of cache hits")

    # TTL & Health
    ttl_seconds: int = Field(..., ge=0, description="Time-to-live in seconds")
    expires_at: datetime = Field(..., description="Absolute expiration time")
    health_status: HealthStatus = Field(default=HealthStatus.UNKNOWN)
    failure_count: int = Field(default=0, ge=0, description="Consecutive failures")
    evicted_from_l1: bool = Field(
        default=False, description="Whether entry was evicted from L1 cache"
    )

    # Health monitoring fields (Feature 006)
    last_health_check: datetime | None = Field(None, description="Last health check timestamp")
    consecutive_health_failures: int = Field(
        default=0, ge=0, description="Consecutive health check failures"
    )
    consecutive_health_successes: int = Field(
        default=0, ge=0, description="Consecutive successful health checks"
    )
    recovery_attempt: int = Field(default=0, ge=0, description="Current recovery attempt count")
    next_check_time: datetime | None = Field(None, description="Scheduled next health check")
    last_health_error: str | None = Field(None, description="Last health check error message")
    total_health_checks: int = Field(default=0, ge=0, description="Total health checks performed")
    total_health_check_failures: int = Field(
        default=0, ge=0, description="Total health check failures"
    )

    @property
    def is_expired(self) -> bool:
        """Return True once the current UTC time has reached ``expires_at``.

        The comparison uses a timezone-aware "now", so ``expires_at`` must be
        timezone-aware as well; Python raises ``TypeError`` when comparing an
        aware datetime with a naive one.
        """
        return datetime.now(timezone.utc) >= self.expires_at

    @property
    def is_healthy(self) -> bool:
        """Return True only when ``health_status`` is ``HealthStatus.HEALTHY``."""
        return self.health_status == HealthStatus.HEALTHY
[docs]
class CacheTierConfig(BaseModel):
    """Settings for one cache tier (L1, L2, or L3).

    Holds the tier's enable flag, its optional entry cap, and the name of
    its eviction policy.

    Example:
        >>> config = CacheTierConfig(max_entries=1000, eviction_policy="lru")
    """

    enabled: bool = Field(default=True, description="Enable this tier")
    max_entries: int | None = Field(default=None, description="Max entries (None=unlimited)")
    eviction_policy: str = Field(default="lru", description="Eviction policy")

    @field_validator("eviction_policy")
    @classmethod
    def validate_policy(cls, v: str) -> str:
        """Accept only the supported eviction policy names.

        Args:
            v: Candidate policy name.

        Returns:
            ``v`` unchanged when it is one of "lru", "lfu", or "fifo".

        Raises:
            ValueError: If ``v`` is any other value.
        """
        supported = ["lru", "lfu", "fifo"]
        if v in supported:
            return v
        raise ValueError(f"Invalid eviction policy: {v}. Must be one of {supported}")
[docs]
class CacheConfig(BaseModel):
    """Configuration for cache behavior and tier settings.

    Aggregates configuration for all three tiers plus global settings
    like TTL, cleanup intervals, and storage paths.

    Example:
        >>> config = CacheConfig(
        ...     default_ttl_seconds=3600,
        ...     l1_config=CacheTierConfig(max_entries=1000),
        ... )
    """

    # Pydantic v2 configuration (replaces the deprecated class-based ``Config``).
    # The encoder keeps the encryption key masked in JSON output.
    model_config = ConfigDict(
        json_encoders={
            SecretStr: lambda v: "***",
        },
    )

    # Tier Configuration
    l1_config: CacheTierConfig = Field(
        default_factory=lambda: CacheTierConfig(max_entries=1000),
        description="L1 (Memory) configuration",
    )
    l2_config: CacheTierConfig = Field(
        default_factory=lambda: CacheTierConfig(max_entries=5000),
        description="L2 (Disk) configuration - uses l2_backend to select JSONL or SQLite",
    )
    l2_backend: L2BackendType = Field(
        default=L2BackendType.JSONL,
        description="L2 storage backend: 'jsonl' (file-based, portable) or 'sqlite' (faster for >10K entries)",
    )
    l3_config: CacheTierConfig = Field(
        default_factory=lambda: CacheTierConfig(max_entries=None),
        description="L3 (SQLite) configuration",
    )

    # TTL Configuration
    default_ttl_seconds: int = Field(
        default=3600,
        ge=60,
        description="Default TTL for cached proxies (seconds)",
    )
    # NOTE(review): ttl_cleanup_interval and cleanup_interval_seconds both
    # describe a background cleanup interval; which one consumers actually
    # read is not visible from this module - confirm before relying on either.
    ttl_cleanup_interval: int = Field(
        default=60,
        ge=10,
        description="Background cleanup interval (seconds)",
    )
    enable_background_cleanup: bool = Field(
        default=False,
        description="Enable background TTL cleanup thread",
    )
    cleanup_interval_seconds: int = Field(
        default=60,
        ge=5,
        description="Interval between background cleanup runs (seconds)",
    )
    per_source_ttl: dict[str, int] = Field(
        default_factory=dict,
        description="Per-source TTL overrides (source_name -> ttl_seconds)",
    )

    # Storage Paths
    l2_cache_dir: str = Field(
        default=".cache/proxies",
        description="Directory for L2 cache (JSONL shards or SQLite database)",
    )
    l3_database_path: str = Field(
        default=".cache/db/proxywhirl.db",
        description="SQLite database path for L3",
    )

    # Encryption
    encryption_key: SecretStr | None = Field(
        None,
        description="Fernet encryption key (from env: PROXYWHIRL_CACHE_ENCRYPTION_KEY)",
    )

    # Health Integration
    health_check_invalidation: bool = Field(
        default=True,
        description="Auto-invalidate on health check failure",
    )
    failure_threshold: int = Field(
        default=3,
        ge=1,
        description="Failures before health invalidation",
    )

    # Performance Tuning
    enable_statistics: bool = Field(default=True, description="Track cache statistics")
    statistics_interval: int = Field(
        default=5, ge=1, description="Stats aggregation interval (seconds)"
    )
[docs]
class TierStatistics(BaseModel):
    """Counters for one cache tier.

    Records hits, misses, the current entry count, and evictions broken
    down by reason; exposes the derived hit rate and eviction total as
    computed fields.

    Example:
        >>> stats = TierStatistics(hits=100, misses=20)
        >>> stats.hit_rate
        0.8333...
    """

    hits: int = Field(default=0, ge=0)
    misses: int = Field(default=0, ge=0)
    current_size: int = Field(default=0, ge=0)
    evictions_lru: int = Field(default=0, ge=0)
    evictions_ttl: int = Field(default=0, ge=0)
    evictions_health: int = Field(default=0, ge=0)
    evictions_corruption: int = Field(default=0, ge=0)

    @computed_field  # type: ignore[misc]
    @property
    def hit_rate(self) -> float:
        """Fraction of lookups that were hits; 0.0 when nothing was recorded."""
        lookups = self.hits + self.misses
        if lookups <= 0:
            # No recorded lookups: report 0.0 instead of dividing by zero.
            return 0.0
        return self.hits / lookups

    @computed_field  # type: ignore[misc]
    @property
    def total_evictions(self) -> int:
        """Sum of the four per-reason eviction counters."""
        per_reason = (
            self.evictions_lru,
            self.evictions_ttl,
            self.evictions_health,
            self.evictions_corruption,
        )
        return sum(per_reason)
[docs]
class CacheStatistics(BaseModel):
    """Cache-wide statistics built from the three per-tier counters.

    Bundles one ``TierStatistics`` per tier with counters for cross-tier
    promotions/demotions and per-tier degradation flags.

    Example:
        >>> stats = CacheStatistics()
        >>> stats.l1_stats.hits = 100
        >>> stats.overall_hit_rate
        1.0
    """

    l1_stats: TierStatistics = Field(default_factory=TierStatistics)
    l2_stats: TierStatistics = Field(default_factory=TierStatistics)
    l3_stats: TierStatistics = Field(default_factory=TierStatistics)

    # Cross-tier operations
    promotions: int = Field(default=0, ge=0, description="L3→L2→L1 promotions")
    demotions: int = Field(default=0, ge=0, description="L1→L2→L3 demotions")

    # Degradation tracking
    l1_degraded: bool = Field(default=False, description="L1 tier unavailable")
    l2_degraded: bool = Field(default=False, description="L2 tier unavailable")
    l3_degraded: bool = Field(default=False, description="L3 tier unavailable")

    @computed_field  # type: ignore[misc]
    @property
    def overall_hit_rate(self) -> float:
        """Combined hit rate over all tiers; 0.0 when nothing was recorded.

        Hits are summed across tiers, while misses are taken as the largest
        per-tier miss count so that one request is not counted three times.
        """
        tiers = (self.l1_stats, self.l2_stats, self.l3_stats)
        hit_total = sum(tier.hits for tier in tiers)
        # NOTE(review): assumes a miss is recorded once per tier it falls
        # through - the recording code is outside this module; confirm.
        miss_total = max(tier.misses for tier in tiers)
        request_total = hit_total + miss_total
        if request_total <= 0:
            return 0.0
        return hit_total / request_total

    @computed_field  # type: ignore[misc]
    @property
    def total_size(self) -> int:
        """Number of entries currently held, summed over the three tiers."""
        tiers = (self.l1_stats, self.l2_stats, self.l3_stats)
        return sum(tier.current_size for tier in tiers)

    def to_metrics_dict(self) -> dict[str, float]:
        """Flatten the statistics into a ``name -> float`` mapping.

        Returns:
            A dict with per-tier and overall hit rates, per-tier and total
            sizes, and the promotion/demotion counters, all as floats.
        """
        l1, l2, l3 = self.l1_stats, self.l2_stats, self.l3_stats
        metrics: dict[str, float] = {
            "cache.l1.hit_rate": l1.hit_rate,
            "cache.l2.hit_rate": l2.hit_rate,
            "cache.l3.hit_rate": l3.hit_rate,
            "cache.overall.hit_rate": self.overall_hit_rate,
            "cache.total_size": float(self.total_size),
            "cache.promotions": float(self.promotions),
            "cache.demotions": float(self.demotions),
            "cache.l1.size": float(l1.current_size),
            "cache.l2.size": float(l2.current_size),
            "cache.l3.size": float(l3.current_size),
        }
        return metrics