Airflow deployment stuck after using config from Vault backend

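I have an Airflow application running on Kubernetes that uses Vault as its secrets backend. I recently moved my sql_alchemy_conn config value into Vault because it contains the user's password. I can see that it fetches the value from the secrets backend, connects to the database and runs the migration job.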
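For reference, one common way to point Airflow 2.x at Vault and read [core] sql_alchemy_conn from it is through the environment variables AIRFLOW__SECRETS__BACKEND, AIRFLOW__SECRETS__BACKEND_KWARGS and AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET. The question does not say how the backend is configured, so treat this as a sketch under that assumption: the kwargs values are copied from the workaround template in this post, and the secret key name "sql_alchemy_conn" and the vault_config_env helper are hypothetical.

```python
import json

# Assumed values, copied from the VaultBackend(...) call in this post's workaround.
VAULT_BACKEND_KWARGS = {
    "config_path": "applications/secrets/airflow/config/",
    "auth_type": "kubernetes",
    "kubernetes_role": "kubernetes_role",
}


def vault_config_env(secret_key: str = "sql_alchemy_conn") -> dict:
    """Build env vars that select the Vault secrets backend and tell Airflow
    to read [core] sql_alchemy_conn from it (hypothetical helper)."""
    return {
        "AIRFLOW__SECRETS__BACKEND": "airflow.providers.hashicorp.secrets.vault.VaultBackend",
        "AIRFLOW__SECRETS__BACKEND_KWARGS": json.dumps(VAULT_BACKEND_KWARGS),
        # The *_SECRET suffix names the key to look up under config_path.
        "AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET": secret_key,
    }


if __name__ == "__main__":
    for name, value in vault_config_env().items():
        print(f"{name}={value}")
```

Every container that needs the database (including init containers) would have to receive the same variables for the lookup to happen there too.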

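But since then I can't deploy the rest of the application, because every other resource gets stuck in the wait-for-airflow-migrations init container. I'm using the official Helm chart to deploy the application, and this is the Python code it uses to check whether the migrations have run:
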
import airflow
import logging
import os
import time

from alembic.config import Config
from alembic.runtime.migration import MigrationContext
from alembic.script import ScriptDirectory

from airflow import settings

package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
directory = os.path.join(package_dir, 'migrations')
config = Config(os.path.join(package_dir, 'alembic.ini'))
config.set_main_option('script_location', directory)
config.set_main_option('sqlalchemy.url', settings.SQL_ALCHEMY_CONN.replace('%', '%%'))
script_ = ScriptDirectory.from_config(config)

timeout=60

with settings.engine.connect() as connection:
    context = MigrationContext.configure(connection)
    ticker = 0
    while True:
        source_heads = set(script_.get_heads())

        db_heads = set(context.get_current_heads())
        if source_heads == db_heads:
            break

        if ticker >= timeout:
            raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
        ticker += 1
        time.sleep(1)
        logging.info('Waiting for migrations... %s second(s)', ticker)
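The loop above is a plain poll-with-timeout: it compares the migration heads shipped with Airflow against the heads recorded in the database once per second and gives up after timeout attempts. A never-migrated database reports no heads at all, so against such a database the sets can never match and the check always ends in TimeoutError. A minimal stdlib-only sketch of the same logic, with the two head lookups passed in as callables so it can be exercised without a database (wait_for_heads and the revision id "abc123" are hypothetical):

```python
import logging
import time
from typing import Callable, Iterable


def wait_for_heads(
    get_source_heads: Callable[[], Iterable[str]],
    get_db_heads: Callable[[], Iterable[str]],
    timeout: int = 60,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until the database heads equal the script heads.

    Returns the number of one-second waits, or raises TimeoutError once
    `timeout` waits have passed without a match.
    """
    ticker = 0
    while set(get_source_heads()) != set(get_db_heads()):
        if ticker >= timeout:
            raise TimeoutError(
                "There are still unapplied migrations after {} seconds.".format(ticker)
            )
        ticker += 1
        sleep(1)
        logging.info("Waiting for migrations... %s second(s)", ticker)
    return ticker


if __name__ == "__main__":
    # An empty database reports no heads, so this never matches and times out.
    try:
        wait_for_heads(lambda: {"abc123"}, lambda: set(), timeout=3, sleep=lambda _s: None)
    except TimeoutError as exc:
        print(exc)
```

Injecting sleep keeps the function testable without actually waiting.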

Here are the logs from the wait-for-migration container:

[2021-07-29 12:01:13,650] {migration.py:164} INFO - Context impl SQLiteImpl.
[2021-07-29 12:01:13,650] {migration.py:171} INFO - Will assume non-transactional DDL.
[2021-07-29 12:01:20,592] {<string>:49} INFO - Waiting for migrations... 1 second(s)
[2021-07-29 12:01:21,593] {<string>:49} INFO - Waiting for migrations... 2 second(s)
[2021-07-29 12:01:22,595] {<string>:49} INFO - Waiting for migrations... 3 second(s)
[2021-07-29 12:01:23,596] {<string>:49} INFO - Waiting for migrations... 4 second(s)
[2021-07-29 12:01:24,597] {<string>:49} INFO - Waiting for migrations... 5 second(s)
[2021-07-29 12:01:25,598] {<string>:49} INFO - Waiting for migrations... 6 second(s)
[2021-07-29 12:01:26,600] {<string>:49} INFO - Waiting for migrations... 7 second(s)
[2021-07-29 12:01:27,601] {<string>:49} INFO - Waiting for migrations... 8 second(s)
[2021-07-29 12:01:28,602] {<string>:49} INFO - Waiting for migrations... 9 second(s)
[2021-07-29 12:01:29,603] {<string>:49} INFO - Waiting for migrations... 10 second(s)
[2021-07-29 12:01:30,604] {<string>:49} INFO - Waiting for migrations... 11 second(s)
[2021-07-29 12:01:31,606] {<string>:49} INFO - Waiting for migrations... 12 second(s)
[2021-07-29 12:01:32,607] {<string>:49} INFO - Waiting for migrations... 13 second(s)
[2021-07-29 12:01:33,608] {<string>:49} INFO - Waiting for migrations... 14 second(s)
[2021-07-29 12:01:34,610] {<string>:49} INFO - Waiting for migrations... 15 second(s)
[2021-07-29 12:01:35,611] {<string>:49} INFO - Waiting for migrations... 16 second(s)
[2021-07-29 12:01:36,612] {<string>:49} INFO - Waiting for migrations... 17 second(s)
[2021-07-29 12:01:37,613] {<string>:49} INFO - Waiting for migrations... 18 second(s)
[2021-07-29 12:01:38,614] {<string>:49} INFO - Waiting for migrations... 19 second(s)
[2021-07-29 12:01:39,615] {<string>:49} INFO - Waiting for migrations... 20 second(s)
[2021-07-29 12:01:40,616] {<string>:49} INFO - Waiting for migrations... 21 second(s)
[2021-07-29 12:01:41,617] {<string>:49} INFO - Waiting for migrations... 22 second(s)
[2021-07-29 12:01:42,618] {<string>:49} INFO - Waiting for migrations... 23 second(s)
[2021-07-29 12:01:43,619] {<string>:49} INFO - Waiting for migrations... 24 second(s)
[2021-07-29 12:01:44,621] {<string>:49} INFO - Waiting for migrations... 25 second(s)
[2021-07-29 12:01:45,622] {<string>:49} INFO - Waiting for migrations... 26 second(s)
[2021-07-29 12:01:46,623] {<string>:49} INFO - Waiting for migrations... 27 second(s)
[2021-07-29 12:01:47,625] {<string>:49} INFO - Waiting for migrations... 28 second(s)
[2021-07-29 12:01:48,626] {<string>:49} INFO - Waiting for migrations... 29 second(s)
[2021-07-29 12:01:49,628] {<string>:49} INFO - Waiting for migrations... 30 second(s)
[2021-07-29 12:01:50,628] {<string>:49} INFO - Waiting for migrations... 31 second(s)
[2021-07-29 12:01:51,630] {<string>:49} INFO - Waiting for migrations... 32 second(s)
[2021-07-29 12:01:52,631] {<string>:49} INFO - Waiting for migrations... 33 second(s)
[2021-07-29 12:01:53,632] {<string>:49} INFO - Waiting for migrations... 34 second(s)
[2021-07-29 12:01:54,634] {<string>:49} INFO - Waiting for migrations... 35 second(s)
[2021-07-29 12:01:55,635] {<string>:49} INFO - Waiting for migrations... 36 second(s)
[2021-07-29 12:01:56,636] {<string>:49} INFO - Waiting for migrations... 37 second(s)
[2021-07-29 12:01:57,637] {<string>:49} INFO - Waiting for migrations... 38 second(s)
[2021-07-29 12:01:58,638] {<string>:49} INFO - Waiting for migrations... 39 second(s)
[2021-07-29 12:01:59,639] {<string>:49} INFO - Waiting for migrations... 40 second(s)
[2021-07-29 12:02:00,641] {<string>:49} INFO - Waiting for migrations... 41 second(s)
[2021-07-29 12:02:01,641] {<string>:49} INFO - Waiting for migrations... 42 second(s)
[2021-07-29 12:02:02,642] {<string>:49} INFO - Waiting for migrations... 43 second(s)
[2021-07-29 12:02:03,644] {<string>:49} INFO - Waiting for migrations... 44 second(s)
[2021-07-29 12:02:04,644] {<string>:49} INFO - Waiting for migrations... 45 second(s)
[2021-07-29 12:02:05,646] {<string>:49} INFO - Waiting for migrations... 46 second(s)
[2021-07-29 12:02:06,647] {<string>:49} INFO - Waiting for migrations... 47 second(s)
[2021-07-29 12:02:07,648] {<string>:49} INFO - Waiting for migrations... 48 second(s)
[2021-07-29 12:02:08,649] {<string>:49} INFO - Waiting for migrations... 49 second(s)
[2021-07-29 12:02:09,651] {<string>:49} INFO - Waiting for migrations... 50 second(s)
[2021-07-29 12:02:10,651] {<string>:49} INFO - Waiting for migrations... 51 second(s)
[2021-07-29 12:02:11,652] {<string>:49} INFO - Waiting for migrations... 52 second(s)
[2021-07-29 12:02:12,654] {<string>:49} INFO - Waiting for migrations... 53 second(s)
[2021-07-29 12:02:13,655] {<string>:49} INFO - Waiting for migrations... 54 second(s)
[2021-07-29 12:02:14,656] {<string>:49} INFO - Waiting for migrations... 55 second(s)
[2021-07-29 12:02:15,657] {<string>:49} INFO - Waiting for migrations... 56 second(s)
[2021-07-29 12:02:16,659] {<string>:49} INFO - Waiting for migrations... 57 second(s)
[2021-07-29 12:02:17,659] {<string>:49} INFO - Waiting for migrations... 58 second(s)
[2021-07-29 12:02:18,660] {<string>:49} INFO - Waiting for migrations... 59 second(s)
[2021-07-29 12:02:19,662] {<string>:49} INFO - Waiting for migrations... 60 second(s)
Traceback (most recent call last):
  File "<string>", line 46, in <module>
TimeoutError: There are still unapplied migrations after 60 seconds.

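I did some debugging and found that it uses the default sql_alchemy_conn value from the config file, which points at a SQLite database (hence "Context impl SQLiteImpl" in the log).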
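Airflow's built-in default for sql_alchemy_conn is a SQLite file under AIRFLOW_HOME, so a check that silently falls back to it waits out the full timeout instead of failing. A small sketch of a fail-fast guard that reports which backend the URL resolved to without printing the password; describe_conn and assert_not_sqlite are hypothetical helpers, not Airflow APIs, and the URLs in the example are made up.

```python
from urllib.parse import urlsplit


def describe_conn(url: str) -> str:
    """Return 'dialect[+driver]://host[:port]/path' for a SQLAlchemy URL, without credentials."""
    parts = urlsplit(url)
    host = parts.hostname or ""
    if parts.port:
        host = f"{host}:{parts.port}"
    return f"{parts.scheme}://{host}{parts.path}"


def assert_not_sqlite(url: str) -> None:
    """Raise immediately if the connection URL uses the SQLite dialect."""
    dialect = urlsplit(url).scheme.split("+")[0]
    if dialect == "sqlite":
        raise RuntimeError(
            f"sql_alchemy_conn resolved to {describe_conn(url)}; "
            "the value from the secrets backend was not picked up"
        )


if __name__ == "__main__":
    url = "postgresql+psycopg2://airflow:s3cr%40t@db.example:5432/airflow"
    assert_not_sqlite(url)
    print(describe_conn(url))
```

Calling something like this before the wait loop would turn a 60-second timeout into an immediate, explicit error.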

Should I update this script to fetch the value from the Vault backend manually, or am I missing some configuration?
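One plausible explanation for the SQLite fallback, not confirmed in this thread, is that the process running the check does not see the same secrets-backend settings as the migration job. Roughly, Airflow resolves [core] sql_alchemy_conn from a plain environment variable first, then (among other sources) a *_SECRET key looked up in the configured secrets backend, and finally the built-in default. The sketch below models only those three steps; the real AirflowConfigParser also consults _CMD variants and airflow.cfg. resolve_sql_alchemy_conn, fetch_secret and the default path are hypothetical and illustrative.

```python
from typing import Callable, Mapping, Optional

# Illustrative default; Airflow's real default is a SQLite file under AIRFLOW_HOME.
DEFAULT_CONN = "sqlite:////opt/airflow/airflow.db"


def resolve_sql_alchemy_conn(
    env: Mapping[str, str],
    fetch_secret: Optional[Callable[[str], Optional[str]]] = None,
    default: str = DEFAULT_CONN,
) -> str:
    """Simplified model: env var, then *_SECRET via a secrets backend, then default."""
    direct = env.get("AIRFLOW__CORE__SQL_ALCHEMY_CONN")
    if direct:
        return direct
    secret_key = env.get("AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET")
    # Without a configured backend the *_SECRET setting cannot be resolved.
    if secret_key and fetch_secret is not None:
        value = fetch_secret(secret_key)
        if value:
            return value
    return default


if __name__ == "__main__":
    vault = {"sql_alchemy_conn": "postgresql://airflow:pw@db:5432/airflow"}
    env = {"AIRFLOW__CORE__SQL_ALCHEMY_CONN_SECRET": "sql_alchemy_conn"}
    print(resolve_sql_alchemy_conn(env, vault.get))  # backend configured
    print(resolve_sql_alchemy_conn(env))             # no backend: falls back
```

In this model, fetching the value manually (as the workaround below does) and making the backend settings visible to the init container are two ways to reach the same result.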

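I got it working (for now) by fetching the config value from the Vault backend manually and creating the engine myself instead of using settings.engine.connect(). Here is my wait-for-migrations-command template: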

{{ define "wait-for-migrations-command" }}
  {{/* From Airflow 2.0.0 this can become [airflow, db, check-migrations] */}}
  - python
  - -c
  - |
        import airflow
        import logging
        import os
        import time

        from airflow.providers.hashicorp.secrets.vault import VaultBackend
        from alembic.config import Config
        from alembic.runtime.migration import MigrationContext
        from alembic.script import ScriptDirectory
        from sqlalchemy.engine import create_engine

        # Read sql_alchemy_conn directly from Vault; any error fails the init container.
        sql_alchemy_conn = VaultBackend(
            config_path="applications/secrets/airflow/config/",
            auth_type="kubernetes",
            kubernetes_role="kubernetes_role",
        ).get_config(key="sql_alchemy_conn")
        if not sql_alchemy_conn:
            raise RuntimeError("sql_alchemy_conn not found in Vault")

        package_dir = os.path.abspath(os.path.dirname(airflow.__file__))
        directory = os.path.join(package_dir, "migrations")
        config = Config(os.path.join(package_dir, "alembic.ini"))
        config.set_main_option("script_location", directory)
        # Alembic's Config uses configparser interpolation, so a literal "%" must be doubled.
        config.set_main_option("sqlalchemy.url", sql_alchemy_conn.replace("%", "%%"))
        script_ = ScriptDirectory.from_config(config)

        engine = create_engine(sql_alchemy_conn)

        timeout = 60
        with engine.connect() as connection:
            context = MigrationContext.configure(connection)
            ticker = 0
            while True:
                source_heads = set(script_.get_heads())

                db_heads = set(context.get_current_heads())
                if source_heads == db_heads:
                    break

                if ticker >= timeout:
                    raise TimeoutError("There are still unapplied migrations after {} seconds.".format(ticker))
                ticker += 1
                time.sleep(1)
                logging.info("Waiting for migrations... %s second(s)", ticker)
{{- end }}
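One detail worth keeping from the chart's original script is the .replace('%', '%%') before set_main_option. Alembic's Config stores main options in a configparser.ConfigParser with interpolation enabled (Alembic's documentation for set_main_option says a literal % must be escaped as %%), so a URL-encoded password such as one containing %40 for "@" would otherwise be rejected. A stdlib-only demonstration using configparser directly; the URL and the store_url helper are hypothetical.

```python
import configparser

# Hypothetical URL: the URL-encoded password "p%40ss" contains a literal "%".
URL = "postgresql://airflow:p%40ss@db:5432/airflow"


def store_url(parser: configparser.ConfigParser, url: str) -> None:
    """Store a URL in an interpolating ConfigParser, doubling '%' so it survives."""
    if not parser.has_section("alembic"):
        parser.add_section("alembic")
    parser.set("alembic", "sqlalchemy.url", url.replace("%", "%%"))


if __name__ == "__main__":
    raw = configparser.ConfigParser()
    raw.add_section("alembic")
    try:
        raw.set("alembic", "sqlalchemy.url", URL)  # unescaped "%" is rejected on set
    except ValueError as exc:
        print("unescaped:", exc)

    escaped = configparser.ConfigParser()
    store_url(escaped, URL)
    # Reading it back turns "%%" into "%", giving the original URL.
    print("escaped:", escaped.get("alembic", "sqlalchemy.url"))
```

create_engine, by contrast, should receive the unescaped URL, which is why only the Alembic option gets the replacement.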

I believe there must be a better way to solve this. I'll keep looking.

If you are using Airflow 2+, you should be able to use:

  - airflow
  - db
  - check-migrations
  - --migration-wait-timeout={{ .Values.images.migrationsWaitTimeout }}

See: https://github.com/apache/airflow/blob/1728e35e00fa19a329af8d5d45a764b334020450/chart/templates/_helpers.yaml#L562

This is the chart's default migration-check command for Airflow 2+, and I believe it should work for you as well.

(See also the comment at the top of the file you pasted:

/* From Airflow 2.0.0 this can become [airflow, db, check-migrations] */)