"""Database migration utilities for ProxyWhirl.
This module provides a programmatic API for running Alembic migrations,
following ProxyWhirl's library-first architecture. It allows applications
to manage database schema versions without requiring command-line tools.
Key Features:
- Programmatic migration execution
- Current revision checking
- Pending migrations detection
- Database initialization
- Async SQLite support
Best Practices:
- Always backup databases before migrations
- Test migrations in development first
- Use check_pending_migrations() in CI/CD
- Call run_migrations() on application startup
Example::
from proxywhirl.migrations import run_migrations, get_current_revision
# Run all pending migrations
await run_migrations("sqlite+aiosqlite:///./mydb.db")
# Check current version
revision = await get_current_revision("sqlite+aiosqlite:///./mydb.db")
print(f"Database at revision: {revision}")
"""
from __future__ import annotations
import asyncio
from pathlib import Path
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from sqlalchemy.ext.asyncio import AsyncEngine, create_async_engine
from alembic import command
def _get_alembic_config(database_url: str | None = None) -> Config:
"""Get Alembic configuration object.
Args:
database_url: SQLAlchemy database URL. If None, uses default from alembic.ini
Returns:
Configured Alembic Config object
"""
# Find alembic.ini in project root
ini_path = Path(__file__).parent.parent / "alembic.ini"
if not ini_path.exists():
raise FileNotFoundError(
f"alembic.ini not found at {ini_path}. Run 'alembic init alembic' to initialize."
)
config = Config(str(ini_path))
# Override database URL if provided
if database_url:
config.set_main_option("sqlalchemy.url", database_url)
return config
[docs]
async def run_migrations(database_url: str | None = None, target_revision: str = "head") -> None:
"""Run all pending migrations up to target revision.
This function executes Alembic migrations programmatically, allowing
applications to manage schema versions without CLI tools. It uses
async SQLite support for non-blocking execution.
Args:
database_url: SQLAlchemy async database URL (e.g., "sqlite+aiosqlite:///./db.sqlite").
If None, uses default from alembic.ini
target_revision: Target revision to migrate to. Defaults to "head" (latest).
Can be specific revision ID, relative revision (e.g., "+1", "-2"),
or branch label.
Raises:
FileNotFoundError: If alembic.ini or migration scripts not found
ValueError: If target revision is invalid
Exception: If migration fails
Example::
# Migrate to latest
await run_migrations("sqlite+aiosqlite:///./proxywhirl.db")
# Migrate to specific revision
await run_migrations("sqlite+aiosqlite:///./db.sqlite", "58c0fadfa0ca")
# Rollback one revision
await run_migrations("sqlite+aiosqlite:///./db.sqlite", "-1")
"""
config = _get_alembic_config(database_url)
# Run migration in async context
# Note: Alembic command API is synchronous, but the migrations themselves
# use async engine via env.py configuration
await asyncio.get_event_loop().run_in_executor(None, command.upgrade, config, target_revision)
[docs]
async def downgrade_migrations(
database_url: str | None = None, target_revision: str = "-1"
) -> None:
"""Downgrade database to a previous revision.
Use with caution in production. Always backup data before downgrading.
Args:
database_url: SQLAlchemy async database URL
target_revision: Target revision to downgrade to. Defaults to "-1" (previous).
Can be specific revision ID, relative revision, or "base" for complete
rollback.
Raises:
FileNotFoundError: If alembic.ini or migration scripts not found
ValueError: If target revision is invalid
Exception: If downgrade fails
Example::
# Downgrade one revision
await downgrade_migrations("sqlite+aiosqlite:///./db.sqlite")
# Downgrade to specific revision
await downgrade_migrations("sqlite+aiosqlite:///./db.sqlite", "58c0fadfa0ca")
# Rollback all migrations
await downgrade_migrations("sqlite+aiosqlite:///./db.sqlite", "base")
"""
config = _get_alembic_config(database_url)
await asyncio.get_event_loop().run_in_executor(None, command.downgrade, config, target_revision)
[docs]
async def get_current_revision(database_url: str | None = None) -> str | None:
"""Get the current migration revision of the database.
Args:
database_url: SQLAlchemy async database URL
Returns:
Current revision ID, or None if database is uninitialized
Example::
revision = await get_current_revision("sqlite+aiosqlite:///./db.sqlite")
if revision:
print(f"Database at revision: {revision}")
else:
print("Database not initialized")
"""
config = _get_alembic_config(database_url)
url = config.get_main_option("sqlalchemy.url")
if not url:
raise ValueError("No database URL configured")
# Create async engine
engine: AsyncEngine = create_async_engine(url, echo=False)
try:
async with engine.connect() as connection:
def get_revision(conn):
context = MigrationContext.configure(conn)
return context.get_current_revision()
revision = await connection.run_sync(get_revision)
return revision
finally:
await engine.dispose()
[docs]
async def get_head_revision(database_url: str | None = None) -> str:
"""Get the latest (head) migration revision available.
Args:
database_url: SQLAlchemy async database URL (used to get config)
Returns:
Head revision ID
Example::
head = await get_head_revision()
current = await get_current_revision()
if head != current:
print(f"Migrations pending: {current} -> {head}")
"""
config = _get_alembic_config(database_url)
script_dir = ScriptDirectory.from_config(config)
head = script_dir.get_current_head()
if not head:
raise ValueError("No migrations found in alembic/versions/")
return head
[docs]
async def check_pending_migrations(database_url: str | None = None) -> bool:
"""Check if there are pending migrations that need to be applied.
This is useful for CI/CD pipelines and application health checks.
Args:
database_url: SQLAlchemy async database URL
Returns:
True if pending migrations exist, False otherwise
Example::
if await check_pending_migrations("sqlite+aiosqlite:///./db.sqlite"):
print("WARNING: Pending migrations detected!")
await run_migrations()
"""
current = await get_current_revision(database_url)
head = await get_head_revision(database_url)
# If current is None, database is uninitialized (pending migrations)
if current is None:
return True
return current != head
[docs]
async def initialize_database(database_url: str | None = None) -> None:
"""Initialize a new database with the latest schema.
This is a convenience function that creates all tables at the current
head revision without running individual migrations. Equivalent to
running all migrations from scratch.
Args:
database_url: SQLAlchemy async database URL
Raises:
Exception: If database already has schema or initialization fails
Example::
# Initialize new database
await initialize_database("sqlite+aiosqlite:///./newdb.sqlite")
"""
current = await get_current_revision(database_url)
if current is not None:
raise ValueError(
f"Database already initialized at revision {current}. Use run_migrations() to upgrade."
)
await run_migrations(database_url, "head")
[docs]
async def get_migration_history(
database_url: str | None = None,
) -> list[dict[str, str | None]]:
"""Get the migration history showing all applied migrations.
Args:
database_url: SQLAlchemy async database URL
Returns:
list[dict[str, str | None]]: Migration records with keys revision, down_revision, description.
Example::
history = await get_migration_history()
for migration in history:
print(f"{migration['revision']}: {migration['description']}")
"""
config = _get_alembic_config(database_url)
script_dir = ScriptDirectory.from_config(config)
history = []
for revision in script_dir.walk_revisions("base", "head"):
history.append(
{
"revision": revision.revision,
"down_revision": revision.down_revision,
"description": revision.doc,
}
)
# Reverse to show oldest first
return list(reversed(history))
[docs]
async def stamp_revision(database_url: str | None = None, revision: str = "head") -> None:
"""Stamp the database with a specific revision without running migrations.
This is useful when importing an existing database that matches a specific
schema version, or for recovering from migration issues.
**Use with extreme caution!** This bypasses actual schema changes.
Args:
database_url: SQLAlchemy async database URL
revision: Revision to stamp database with
Example::
# Stamp database as being at head revision
await stamp_revision("sqlite+aiosqlite:///./db.sqlite", "head")
"""
config = _get_alembic_config(database_url)
await asyncio.get_event_loop().run_in_executor(None, command.stamp, config, revision)