Source code for proxywhirl.rwlock

"""
Readers-writer locks for high-concurrency scenarios.

This module provides both async and sync RWLock implementations for
allowing multiple concurrent readers but exclusive writer access.
"""

from __future__ import annotations

import asyncio
import threading
from collections.abc import Iterator
from contextlib import asynccontextmanager, contextmanager

from aiorwlock import RWLock as _AioRWLock
from readerwriterlock import rwlock as _rwlock


[docs] class AsyncRWLock: """Async readers-writer lock for high-concurrency scenarios. Allows multiple concurrent readers but exclusive writer access. Prevents read operations from blocking each other while maintaining data consistency for writes. This is a thin wrapper around aiorwlock.RWLock providing a consistent interface with method-based context manager access. Example: >>> rwlock = AsyncRWLock() >>> async with rwlock.read_lock(): ... # Multiple readers can acquire simultaneously ... value = shared_data.read() >>> async with rwlock.write_lock(): ... # Only one writer at a time ... shared_data.write(new_value) """ def __init__(self) -> None: """Initialize the RWLock.""" self._lock = _AioRWLock() self._state_lock = asyncio.Lock() self._readers = 0 self._writers = 0 self._write_waiters = 0
[docs] async def acquire_read(self) -> None: """Acquire a read lock.""" await self._lock.reader_lock.acquire() async with self._state_lock: self._readers += 1
[docs] async def release_read(self) -> None: """Release a read lock.""" try: result = self._lock.reader_lock.release() if asyncio.iscoroutine(result): await result except RuntimeError: return async with self._state_lock: if self._readers > 0: self._readers -= 1
[docs] async def acquire_write(self) -> None: """Acquire a write lock.""" async with self._state_lock: self._write_waiters += 1 await self._lock.writer_lock.acquire() async with self._state_lock: self._write_waiters = max(0, self._write_waiters - 1) self._writers += 1
[docs] async def release_write(self) -> None: """Release a write lock.""" try: result = self._lock.writer_lock.release() if asyncio.iscoroutine(result): await result except RuntimeError: return async with self._state_lock: if self._writers > 0: self._writers -= 1
@asynccontextmanager
[docs] async def read_lock(self): """Get read lock async context manager.""" await self.acquire_read() try: yield finally: await self.release_read()
@asynccontextmanager
[docs] async def write_lock(self): """Get write lock async context manager.""" await self.acquire_write() try: yield finally: await self.release_write()
[docs] class SyncRWLock: """Synchronous readers-writer lock for high-concurrency scenarios. Allows multiple concurrent readers but exclusive writer access. Prevents read operations from blocking each other while maintaining data consistency for writes. This is a thin wrapper around readerwriterlock.RWLockFair providing a consistent interface with the async version. Features: - Multiple readers can hold the lock simultaneously - Writers get exclusive access (no readers or other writers) - Fair scheduling to prevent writer starvation Example: >>> rwlock = SyncRWLock() >>> with rwlock.read_lock(): ... # Multiple readers can acquire simultaneously ... value = shared_data.read() >>> with rwlock.write_lock(): ... # Only one writer at a time ... shared_data.write(new_value) """ def __init__(self) -> None: """Initialize the SyncRWLock.""" self._lock = _rwlock.RWLockFair() self._state_lock = threading.Lock() self._readers = 0 self._writers = 0 self._write_waiters = 0 @contextmanager
[docs] def read_lock(self) -> Iterator[None]: """Context manager for read locks. Allows multiple concurrent readers. Example: >>> with rwlock.read_lock(): ... # Multiple threads can read simultaneously ... data = shared_resource.read() """ with self._lock.gen_rlock(): with self._state_lock: self._readers += 1 try: yield finally: with self._state_lock: if self._readers > 0: self._readers -= 1
@contextmanager
[docs] def write_lock(self) -> Iterator[None]: """Context manager for write locks. Provides exclusive access - no concurrent readers or writers. Example: >>> with rwlock.write_lock(): ... # Exclusive access guaranteed ... shared_resource.write(new_data) """ with self._state_lock: self._write_waiters += 1 with self._lock.gen_wlock(): with self._state_lock: self._write_waiters = max(0, self._write_waiters - 1) self._writers += 1 try: yield finally: with self._state_lock: if self._writers > 0: self._writers -= 1