How can Airflow be configured to use Snowflake as a backend?
I am trying to configure Apache Airflow to use Snowflake as its backend database. In theory it should work out of the box, since Airflow uses SQLAlchemy as its ORM and SQLAlchemy supports Snowflake.
I have successfully connected to our Snowflake account, confirming that SQLAlchemy works with Snowflake.
I installed Airflow with the alldbs option:
sudo pip install apache-airflow[alldbs]
In the airflow.cfg file, I have set sql_alchemy_conn to a SQLAlchemy connection string that works in a manual test with SQLAlchemy's create_engine().
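The manual test is roughly the following sketch (the account, credentials, database and warehouse names are placeholders, and it assumes the snowflake-sqlalchemy dialect is installed); the same string goes into sql_alchemy_conn under [core] in airflow.cfg:
from sqlalchemy import create_engine, text

# Placeholder connection string; mirrors the sql_alchemy_conn value in airflow.cfg.
conn_str = (
    "snowflake://MYUSER:MYPASSWORD@myaccount.us-east-1/"
    "MYDATABASE/AIRFLOW?warehouse=LOAD_WH"
)

engine = create_engine(conn_str)
with engine.connect() as conn:
    # Any trivial round trip confirms the dialect, credentials and warehouse work.
    print(conn.execute(text("SELECT CURRENT_VERSION()")).fetchone())
engine.dispose()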
When I run airflow initdb, I get the following error:
[2019-06-20 14:08:28,268] {__init__.py:51} INFO - Using executor LocalExecutor
DB: snowflake://MYUSER:***@myaccount.us-east-1/MYDATABASE/AIRFLOW?warehouse=LOAD_WH
[2019-06-20 14:08:28,756] {db.py:350} INFO - Creating tables
Traceback (most recent call last):
  File "/usr/local/bin/airflow", line 32, in <module>
    args.func(args)
  File "/usr/local/lib/python2.7/dist-packages/airflow/bin/cli.py", line 1096, in initdb
    db.initdb(settings.RBAC)
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 91, in initdb
    upgradedb()
  File "/usr/local/lib/python2.7/dist-packages/airflow/utils/db.py", line 358, in upgradedb
    command.upgrade(config, 'heads')
  File "/usr/local/lib/python2.7/dist-packages/alembic/command.py", line 254, in upgrade
    script.run_env()
  File "/usr/local/lib/python2.7/dist-packages/alembic/script/base.py", line 427, in run_env
    util.load_python_file(self.dir, 'env.py')
  File "/usr/local/lib/python2.7/dist-packages/alembic/util/pyfiles.py", line 81, in load_python_file
    module = load_module_py(module_id, path)
  File "/usr/local/lib/python2.7/dist-packages/alembic/util/compat.py", line 141, in load_module_py
    mod = imp.load_source(module_id, path, fp)
  File "/usr/local/lib/python2.7/dist-packages/airflow/migrations/env.py", line 92, in <module>
    run_migrations_online()
  File "/usr/local/lib/python2.7/dist-packages/airflow/migrations/env.py", line 82, in run_migrations_online
    compare_type=COMPARE_TYPE,
  File "<string>", line 8, in configure
  File "/usr/local/lib/python2.7/dist-packages/alembic/runtime/environment.py", line 812, in configure
    opts=opts
  File "/usr/local/lib/python2.7/dist-packages/alembic/runtime/migration.py", line 172, in configure
    return MigrationContext(dialect, connection, opts, environment_context)
  File "/usr/local/lib/python2.7/dist-packages/alembic/runtime/migration.py", line 111, in __init__
    self.impl = ddl.DefaultImpl.get_by_dialect(dialect)(
  File "/usr/local/lib/python2.7/dist-packages/alembic/ddl/impl.py", line 65, in get_by_dialect
    return _impls[dialect.name]
KeyError: 'snowflake'
In my case, I fixed it by adding this code to env.py so that alembic recognizes the Snowflake dialect:
# Registering a DDL implementation for the Snowflake dialect lets
# alembic's DefaultImpl.get_by_dialect() find it during migrations.
from alembic.ddl.impl import DefaultImpl

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'
Reference: https://docs.snowflake.net/manuals/user-guide/sqlalchemy.html#alembic-support
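If you want to confirm the registration took effect, a quick sanity check (relying on the internal _impls registry that the traceback above indexes into, so illustrative rather than a public API) looks like:
from alembic.ddl.impl import DefaultImpl, _impls

class SnowflakeImpl(DefaultImpl):
    __dialect__ = 'snowflake'

# Subclassing DefaultImpl registers the class under its __dialect__ key,
# which is exactly the lookup that failed with KeyError: 'snowflake'.
assert _impls['snowflake'] is SnowflakeImpl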