SQLAlchemy + alembic:创建模式迁移

SQLAlchemy + alembic: create schema migration

我不确定如何定义 create schema foo 迁移?我的模型看起来像这样(我正在使用 Flask-Migrate):

class MyTable(db.Model):
    __tablename__ = "my_table"
    __table_args__ = {"schema": "foo"}

    id = ColumnPrimKey()
    name = Column(Text, unique=True, nullable=False)

当我执行 mange db upgrade 时失败,因为架构 "foo" 不存在。如何使用 SQLAlchemy 和 Alembic 为架构添加迁移?

我通过将迁移 upgrade 命令修改为第一个 运行:

来完成此操作
op.execute("create schema foo")

并且在downgrade函数中

op.execute("drop schema foo")

所以整个迁移文件看起来像这样:

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '6c82c972c61e'
down_revision = '553260b3e828'
branch_labels = None
depends_on = None


def upgrade():
    op.execute("create schema foo")
    ...

def downgrade():
    ...
    op.execute("drop schema foo")

另一种选择是添加以下函数来修改 env.py 中的 MigrationScript 指令:

from alembic import operations

def process_revision_directives(context, revision, directives):
    """Modify the MigrationScript directives to create schemata as required.
    """
    script = directives[0]
    for schema in frozenset(i.schema for i in target_metadata.tables.values()):
        script.upgrade_ops.ops.insert(
            0, operations.ops.ExecuteSQLOp(f"CREATE SCHEMA IF NOT EXISTS {schema}"))
        script.downgrade_ops.ops.append(
            operations.ops.ExecuteSQLOp(f"DROP SCHEMA IF EXISTS {schema} RESTRICT"))

然后在context.configure中添加process_revision_directives=process_revision_directives

已接受答案的一个潜在问题是,对于初始迁移,Alembic 可能无法找到创建 alembic_version 的位置。这是因为 op.execute("create schema foo") 仅在 之后执行 Alembic 尝试找到它的 alembic_version table。错误弹出为:

sqlalchemy.exc.ProgrammingError: (psycopg2.errors.InvalidSchemaName) schema "foo" does not exist

最简单的方法是让 alembic_version table 存在于另一个模式中,将 version_table_schema 传递给 context.configure() (docs)。

但是,在许多情况下,人们可能希望同时 (i) 作为初始迁移的一部分从头开始创建模式(例如,设置测试环境)和 (ii) alembic_version table 在目标架构中。在这些情况下,另一种方法是将模式创建委托给 env.py。示例:

# env.py
from sqlalchemy import create_engine

from bar import create_database_with_schema_if_not_exists

SQLALCHEMY_DATABASE_URI = ...
schema = "foo"
engine = create_engine(SQLALCHEMY_DATABASE_URI)
create_database_with_schema_if_not_exists(engine, schema)
...
# bar.py
import sqlalchemy
from sqlalchemy_utils import create_database, database_exists

def create_database_with_schema_if_not_exists(engine, schema):
    if not database_exists(engine.url):
        create_database(engine.url)
    if not engine.dialect.has_schema(engine, schema):
        engine.execute(sqlalchemy.schema.CreateSchema(schema))