Airflow Scheduler liveness probe crashing (version 2.0)

I just upgraded Airflow from 1.10.13 to 2.0. I am running it in Kubernetes (AKS on Azure) with the Kubernetes Executor. Unfortunately, the scheduler gets killed every 15-20 minutes because the liveness probe fails, so the pod keeps restarting.

I had no such issues on 1.10.13.

This is my liveness probe:

import os

# Silence Airflow's own logging so the probe output stays clean.
os.environ['AIRFLOW__CORE__LOGGING_LEVEL'] = 'ERROR'
os.environ['AIRFLOW__LOGGING__LOGGING_LEVEL'] = 'ERROR'

import sys

from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname

# Look up the most recent SchedulerJob record for this host.
with create_session() as session:
    job = (
        session.query(SchedulerJob)
        .filter_by(hostname=get_hostname())
        .order_by(SchedulerJob.latest_heartbeat.desc())
        .limit(1)
        .first()
    )

# Exit 0 (healthy) only if a job record exists and its heartbeat is still considered alive.
sys.exit(0 if job is not None and job.is_alive() else 1)
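
For debugging, the same query can be used to print how stale the latest heartbeat actually is, instead of just exiting 0 or 1. This is only a rough sketch built from the same modules as the probe above (the only extra import is airflow.utils.timezone); run it inside the scheduler pod:

from airflow.jobs.scheduler_job import SchedulerJob
from airflow.utils import timezone
from airflow.utils.db import create_session
from airflow.utils.net import get_hostname

# Same query as the probe, but report the heartbeat age instead of an exit code.
with create_session() as session:
    job = (
        session.query(SchedulerJob)
        .filter_by(hostname=get_hostname())
        .order_by(SchedulerJob.latest_heartbeat.desc())
        .first()
    )

if job is None:
    print("No SchedulerJob record found for this hostname")
else:
    age = (timezone.utcnow() - job.latest_heartbeat).total_seconds()
    print(f"latest heartbeat is {age:.1f}s old, is_alive() = {job.is_alive()}")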

When I look at the scheduler logs, I see the following:

[2021-02-16 12:18:21,883] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor489-Process' pid=12812 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:22,228] {scheduler_job.py:933} DEBUG - No tasks to consider for execution.
[2021-02-16 12:18:22,232] {base_executor.py:147} DEBUG - 0 running task instances
[2021-02-16 12:18:22,232] {base_executor.py:148} DEBUG - 0 in queue
[2021-02-16 12:18:22,232] {base_executor.py:149} DEBUG - 32 open slots
[2021-02-16 12:18:22,232] {base_executor.py:158} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
[2021-02-16 12:18:22,233] {kubernetes_executor.py:337} DEBUG - Syncing KubernetesExecutor
[2021-02-16 12:18:22,233] {kubernetes_executor.py:263} DEBUG - KubeJobWatcher alive, continuing
[2021-02-16 12:18:22,234] {dag_processing.py:383} DEBUG - Received message of type DagParsingStat
[2021-02-16 12:18:22,234] {dag_processing.py:383} DEBUG - Received message of type DagParsingStat
[2021-02-16 12:18:22,236] {dag_processing.py:383} DEBUG - Received message of type DagParsingStat
[2021-02-16 12:18:22,246] {scheduler_job.py:1390} DEBUG - Next timed event is in 0.143059
[2021-02-16 12:18:22,246] {scheduler_job.py:1392} DEBUG - Ran scheduling loop in 0.05 seconds
[2021-02-16 12:18:22,422] {scheduler_job.py:933} DEBUG - No tasks to consider for execution.
[2021-02-16 12:18:22,426] {base_executor.py:147} DEBUG - 0 running task instances
[2021-02-16 12:18:22,426] {base_executor.py:148} DEBUG - 0 in queue
[2021-02-16 12:18:22,426] {base_executor.py:149} DEBUG - 32 open slots
[2021-02-16 12:18:22,427] {base_executor.py:158} DEBUG - Calling the <class 'airflow.executors.kubernetes_executor.KubernetesExecutor'> sync method
[2021-02-16 12:18:22,427] {kubernetes_executor.py:337} DEBUG - Syncing KubernetesExecutor
[2021-02-16 12:18:22,427] {kubernetes_executor.py:263} DEBUG - KubeJobWatcher alive, continuing
[2021-02-16 12:18:22,439] {scheduler_job.py:1751} INFO - Resetting orphaned tasks for active dag runs
[2021-02-16 12:18:22,452] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12819)
[2021-02-16 12:18:22,460] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor490-Process' pid=12819 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:23,009] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12826)
[2021-02-16 12:18:23,017] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor491-Process' pid=12826 parent=9286 stopped exitcode=0>
[2021-02-16 12:18:23,594] {settings.py:290} DEBUG - Disposing DB connection pool (PID 12833)

... Many of these Disposing DB connection pool entries here

[2021-02-16 12:20:08,212] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor675-Process' pid=14146 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:08,916] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14153)
[2021-02-16 12:20:08,924] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor676-Process' pid=14153 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:09,475] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14160)
[2021-02-16 12:20:09,484] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor677-Process' pid=14160 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:10,044] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14167)
[2021-02-16 12:20:10,053] {scheduler_job.py:309} DEBUG - Waiting for <ForkProcess name='DagFileProcessor678-Process' pid=14167 parent=9286 stopped exitcode=0>
[2021-02-16 12:20:10,610] {settings.py:290} DEBUG - Disposing DB connection pool (PID 14180)
[2021-02-16 12:23:42,287] {scheduler_job.py:746} INFO - Exiting gracefully upon receiving signal 15
[2021-02-16 12:23:43,290] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 9286
[2021-02-16 12:23:43,494] {process_utils.py:201} INFO - Waiting up to 5 seconds for processes to exit...
[2021-02-16 12:23:43,503] {process_utils.py:61} INFO - Process psutil.Process(pid=14180, status='terminated', started='12:20:09') (14180) terminated with exit code None
[2021-02-16 12:23:43,503] {process_utils.py:61} INFO - Process psutil.Process(pid=9286, status='terminated', exitcode=0, started='12:13:35') (9286) terminated with exit code 0
[2021-02-16 12:23:43,506] {process_utils.py:95} INFO - Sending Signals.SIGTERM to GPID 9286
[2021-02-16 12:23:43,506] {scheduler_job.py:1296} INFO - Exited execute loop
[2021-02-16 12:23:43,523] {cli_action_loggers.py:84} DEBUG - Calling callbacks: []
[2021-02-16 12:23:43,525] {settings.py:290} DEBUG - Disposing DB connection pool (PID 7)

I managed to fix the restarts by setting the following configuration:

[kubernetes]
...
delete_option_kwargs = {"grace_period_seconds": 10}
enable_tcp_keepalive = True
tcp_keep_idle = 30
tcp_keep_intvl = 30
tcp_keep_cnt = 30
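
If you configure Airflow through environment variables instead of airflow.cfg (for example with the official Docker image or Helm chart), the same [kubernetes] options should map to the usual AIRFLOW__SECTION__KEY form, something like this on the scheduler container:

AIRFLOW__KUBERNETES__DELETE_OPTION_KWARGS='{"grace_period_seconds": 10}'
AIRFLOW__KUBERNETES__ENABLE_TCP_KEEPALIVE=True
AIRFLOW__KUBERNETES__TCP_KEEP_IDLE=30
AIRFLOW__KUBERNETES__TCP_KEEP_INTVL=30
AIRFLOW__KUBERNETES__TCP_KEEP_CNT=30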

I have another Airflow instance running in Kubernetes on AWS. That one works fine on every version, which made me realize the problem is with Azure Kubernetes and the REST API calls the scheduler makes to the Kubernetes API server.

Just in case this helps someone else....

In my case, the problem was with the workers: they had database connection issues. Fixing that also resolved the scheduler problem.

Note: check the worker logs as well.