Airflow deployment stuck after using config from Vault backend
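I have an Airflow application running on Kubernetes that uses Vault as its secrets backend. Recently I managed to move my config value sql_alchemy_conn to Vault, since it contains the user's password.
I can see that it fetches the value from the secrets backend, connects to the database, and runs the migration job.
But since then I cannot deploy the rest of the application, because all other resources are stuck in the init container wait-for-airflow-migrations.
I am using the official Helm chart to deploy the application, and this is the Python code it uses to check whether the migrations have run: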
import airflow
import logging
import os
import time
from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory
from airflow import settings
package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
directory = os.path.join(package_dir, 'migrations')
config = Config(os.path.join(package_dir, 'alembic.ini'))
config.set_main_option('script_location', directory)
config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
script_ = ScriptDirectory.from_config(config)
timeout = 60
with settings.engine.connect() as connection:
    context = MigrationContext.configure(connection)
    ticker = 0
    while True:
        source_heads = set(script_.get_heads())
        db_heads = set(context.get_current_heads())
        if source_heads == db_heads:
            break
        if ticker >= timeout:
            raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
        ticker += 1
        time.sleep(1)
        logging.info('Waiting for migrations... %s second(s)', ticker)
These are the logs of the wait-for-migration container:
[2021-07-29 12:01:13,650] {migration.py:164} INFO - Context impl SQLiteImpl.
[2021-07-29 12:01:13,650] {migration.py:171} INFO - Will assume non-transactional DDL.
[2021-07-29 12:01:20,592] {<string>:49} INFO - Waiting for migrations... 1 second(s)
[2021-07-29 12:01:21,593] {<string>:49} INFO - Waiting for migrations... 2 second(s)
[2021-07-29 12:01:22,595] {<string>:49} INFO - Waiting for migrations... 3 second(s)
[2021-07-29 12:01:23,596] {<string>:49} INFO - Waiting for migrations... 4 second(s)
[2021-07-29 12:01:24,597] {<string>:49} INFO - Waiting for migrations... 5 second(s)
[2021-07-29 12:01:25,598] {<string>:49} INFO - Waiting for migrations... 6 second(s)
[2021-07-29 12:01:26,600] {<string>:49} INFO - Waiting for migrations... 7 second(s)
[2021-07-29 12:01:27,601] {<string>:49} INFO - Waiting for migrations... 8 second(s)
[2021-07-29 12:01:28,602] {<string>:49} INFO - Waiting for migrations... 9 second(s)
[2021-07-29 12:01:29,603] {<string>:49} INFO - Waiting for migrations... 10 second(s)
[2021-07-29 12:01:30,604] {<string>:49} INFO - Waiting for migrations... 11 second(s)
[2021-07-29 12:01:31,606] {<string>:49} INFO - Waiting for migrations... 12 second(s)
[2021-07-29 12:01:32,607] {<string>:49} INFO - Waiting for migrations... 13 second(s)
[2021-07-29 12:01:33,608] {<string>:49} INFO - Waiting for migrations... 14 second(s)
[2021-07-29 12:01:34,610] {<string>:49} INFO - Waiting for migrations... 15 second(s)
[2021-07-29 12:01:35,611] {<string>:49} INFO - Waiting for migrations... 16 second(s)
[2021-07-29 12:01:36,612] {<string>:49} INFO - Waiting for migrations... 17 second(s)
[2021-07-29 12:01:37,613] {<string>:49} INFO - Waiting for migrations... 18 second(s)
[2021-07-29 12:01:38,614] {<string>:49} INFO - Waiting for migrations... 19 second(s)
[2021-07-29 12:01:39,615] {<string>:49} INFO - Waiting for migrations... 20 second(s)
[2021-07-29 12:01:40,616] {<string>:49} INFO - Waiting for migrations... 21 second(s)
[2021-07-29 12:01:41,617] {<string>:49} INFO - Waiting for migrations... 22 second(s)
[2021-07-29 12:01:42,618] {<string>:49} INFO - Waiting for migrations... 23 second(s)
[2021-07-29 12:01:43,619] {<string>:49} INFO - Waiting for migrations... 24 second(s)
[2021-07-29 12:01:44,621] {<string>:49} INFO - Waiting for migrations... 25 second(s)
[2021-07-29 12:01:45,622] {<string>:49} INFO - Waiting for migrations... 26 second(s)
[2021-07-29 12:01:46,623] {<string>:49} INFO - Waiting for migrations... 27 second(s)
[2021-07-29 12:01:47,625] {<string>:49} INFO - Waiting for migrations... 28 second(s)
[2021-07-29 12:01:48,626] {<string>:49} INFO - Waiting for migrations... 29 second(s)
[2021-07-29 12:01:49,628] {<string>:49} INFO - Waiting for migrations... 30 second(s)
[2021-07-29 12:01:50,628] {<string>:49} INFO - Waiting for migrations... 31 second(s)
[2021-07-29 12:01:51,630] {<string>:49} INFO - Waiting for migrations... 32 second(s)
[2021-07-29 12:01:52,631] {<string>:49} INFO - Waiting for migrations... 33 second(s)
[2021-07-29 12:01:53,632] {<string>:49} INFO - Waiting for migrations... 34 second(s)
[2021-07-29 12:01:54,634] {<string>:49} INFO - Waiting for migrations... 35 second(s)
[2021-07-29 12:01:55,635] {<string>:49} INFO - Waiting for migrations... 36 second(s)
[2021-07-29 12:01:56,636] {<string>:49} INFO - Waiting for migrations... 37 second(s)
[2021-07-29 12:01:57,637] {<string>:49} INFO - Waiting for migrations... 38 second(s)
[2021-07-29 12:01:58,638] {<string>:49} INFO - Waiting for migrations... 39 second(s)
[2021-07-29 12:01:59,639] {<string>:49} INFO - Waiting for migrations... 40 second(s)
[2021-07-29 12:02:00,641] {<string>:49} INFO - Waiting for migrations... 41 second(s)
[2021-07-29 12:02:01,641] {<string>:49} INFO - Waiting for migrations... 42 second(s)
[2021-07-29 12:02:02,642] {<string>:49} INFO - Waiting for migrations... 43 second(s)
[2021-07-29 12:02:03,644] {<string>:49} INFO - Waiting for migrations... 44 second(s)
[2021-07-29 12:02:04,644] {<string>:49} INFO - Waiting for migrations... 45 second(s)
[2021-07-29 12:02:05,646] {<string>:49} INFO - Waiting for migrations... 46 second(s)
[2021-07-29 12:02:06,647] {<string>:49} INFO - Waiting for migrations... 47 second(s)
[2021-07-29 12:02:07,648] {<string>:49} INFO - Waiting for migrations... 48 second(s)
[2021-07-29 12:02:08,649] {<string>:49} INFO - Waiting for migrations... 49 second(s)
[2021-07-29 12:02:09,651] {<string>:49} INFO - Waiting for migrations... 50 second(s)
[2021-07-29 12:02:10,651] {<string>:49} INFO - Waiting for migrations... 51 second(s)
[2021-07-29 12:02:11,652] {<string>:49} INFO - Waiting for migrations... 52 second(s)
[2021-07-29 12:02:12,654] {<string>:49} INFO - Waiting for migrations... 53 second(s)
[2021-07-29 12:02:13,655] {<string>:49} INFO - Waiting for migrations... 54 second(s)
[2021-07-29 12:02:14,656] {<string>:49} INFO - Waiting for migrations... 55 second(s)
[2021-07-29 12:02:15,657] {<string>:49} INFO - Waiting for migrations... 56 second(s)
[2021-07-29 12:02:16,659] {<string>:49} INFO - Waiting for migrations... 57 second(s)
[2021-07-29 12:02:17,659] {<string>:49} INFO - Waiting for migrations... 58 second(s)
[2021-07-29 12:02:18,660] {<string>:49} INFO - Waiting for migrations... 59 second(s)
[2021-07-29 12:02:19,662] {<string>:49} INFO - Waiting for migrations... 60 second(s)
Traceback (most recent call last):
  File "<string>", line 46, in <module>
TimeoutError: There are still unapplied migrations after 60 seconds.
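I did some debugging and found that it is using the default sql_alchemy_conn value from the config file, which points to the SQLite engine (note the "Context impl SQLiteImpl" line in the logs).
Should I update this script and fetch the value from the Vault backend manually, or am I missing some configuration?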
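One way to narrow this down is to check which configuration sources are actually visible inside the init container. The sketch below uses only the standard library and assumes Airflow's documented environment variable convention (AIRFLOW__{SECTION}__{KEY}, optionally with a _CMD or _SECRET suffix). The helper name visible_config_sources is hypothetical, and it does not cover options set in airflow.cfg. If AIRFLOW__SECRETS__BACKEND is not set in that container, Airflow there may not know about Vault at all and would fall back to the default connection string.

```python
import os


def visible_config_sources(section, key, env=None):
    """Return the env var names, following Airflow's naming convention,
    that are set for the given config option.

    Only names are returned, never values, so secrets are not printed.
    """
    env = os.environ if env is None else env
    base = "AIRFLOW__{}__{}".format(section.upper(), key.upper())
    candidates = [base, base + "_CMD", base + "_SECRET"]
    return [name for name in candidates if name in env]


if __name__ == "__main__":
    options = [
        ("core", "sql_alchemy_conn"),
        ("secrets", "backend"),
        ("secrets", "backend_kwargs"),
    ]
    for section, key in options:
        found = visible_config_sources(section, key)
        print(section, key, found or "not set via environment")
```

Running this with python -c (or kubectl exec) in both the migration job and the init container and comparing the output should show whether the two containers see the same settings.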
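I got it working (for now) by manually fetching the config value from the Vault backend and creating the engine myself instead of using settings.engine.connect(). This is my wait-for-migrations-command template: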
{{ define "wait-for-migrations-command" }}
  {{/* From Airflow 2.0.0 this can become [airflow, db, check-migrations] */}}
  - python
  - -c
  - |
        import airflow
        import logging
        import os
        import time
        from airflow.providers.hashicorp.secrets.vault import VaultBackend
        from alembic.config import Config
        from alembic.runtime.migration import MigrationContext
        from alembic.script import ScriptDirectory
        from sqlalchemy.engine import create_engine
        try:
            sql_alchemy_conn = VaultBackend(
                config_path="applications/secrets/airflow/config/",
                auth_type="kubernetes",
                kubernetes_role="kubernetes_role",
            ).get_config(key="sql_alchemy_conn")
        except Exception:
            raise
        package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
        directory = os.path.join(package_dir, "migrations")
        config = Config(os.path.join(package_dir, "alembic.ini"))
        config.set_main_option("script_location", directory)
        config.set_main_option("sqlalchemy.url", sql_alchemy_conn)
        script_ = ScriptDirectory.from_config(config)
        engine = create_engine(sql_alchemy_conn)
        timeout = 60
        with engine.connect() as connection:
            context = MigrationContext.configure(connection)
            ticker = 0
            while True:
                source_heads = set(script_.get_heads())
                db_heads = set(context.get_current_heads())
                if source_heads == db_heads:
                    break
                if ticker >= timeout:
                    raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
                ticker += 1
                time.sleep(1)
                logging.info("Waiting for migrations... %s second(s)", ticker)
{{- end }}
I'm sure there must be another way to solve this. I'll keep looking.
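One detail worth noting about the workaround: the chart's original script doubles every % in the URL before calling set_main_option, while the template above passes the Vault value through unchanged. Alembic's Config stores options in a ConfigParser, which treats % as interpolation syntax, so a password containing a URL-encoded character could fail there. The following is a minimal standard-library sketch of that behavior, not Alembic itself; the round_trip_url helper and the example URL are hypothetical.

```python
from configparser import ConfigParser


def round_trip_url(url, escape=True):
    """Store a URL in a ConfigParser section and read it back.

    With escape=True every "%" is doubled first, mirroring the
    .replace('%', '%%') call in the chart's original script.
    """
    parser = ConfigParser()
    parser.add_section("alembic")
    value = url.replace("%", "%%") if escape else url
    parser.set("alembic", "sqlalchemy.url", value)
    return parser.get("alembic", "sqlalchemy.url")


# Hypothetical URL whose password contains a URL-encoded "@" (%40).
example_url = "postgresql://airflow:p%40ss@db:5432/airflow"

# Escaped: the value survives the round trip unchanged.
print(round_trip_url(example_url) == example_url)

# Unescaped: ConfigParser rejects the lone "%" with a ValueError.
try:
    round_trip_url(example_url, escape=False)
except ValueError as exc:
    print("unescaped value rejected:", type(exc).__name__)
```

If the connection string in Vault can contain %, applying the same replace call to sql_alchemy_conn before set_main_option would keep the workaround consistent with the original script, while create_engine should still receive the unescaped value.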
If you are using Airflow 2+, you should be able to use:
- airflow
- db
- check-migrations
- --migration-wait-timeout={{ .Values.images.migrationsWaitTimeout }}
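This is the chart's default command for checking migrations on Airflow 2+, and I believe it should work in your case as well, since it goes through Airflow's own configuration handling.
(See also the comment at the top of the file you pasted:
/* From Airflow 2.0.0 this can become [airflow, db, check-migrations] */)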