Airflow Scheduler 内存不足问题

Airflow Scheduler out of memory problems

我们正在试验 Apache Airflow(版本 1.10rc2,python 2.7)并将其部署到不同 pods 的 kubernetes、网络服务器和调度程序,并且数据库也在使用云 sql,但我们一直面临调度程序 pod 内存不足的问题。

在 OOM 发生时,我们 运行 只有 4 个示例 Dag(大约 20 个任务)。 pod 的内存为 1Gib。我在其他帖子中看到,当 运行 时,任务可能会消耗大约 50Mib 的内存,并且所有任务操作都在内存中,没有任何内容被刷新到磁盘,所以已经有 1Gb 了。

我们是否可以使用任何经验法则来计算基于并行任务的调度程序需要多少内存?

除了降低并行度之外,是否可以进行任何调整以减少调度程序本身的内存使用?

我认为我们的用例不需要 Dask 或 Celery 来水平扩展 Airflow,为工人提供更多机器。

有关配置的更多详细信息:

executor = Localexecutor
parallelism = 10
dag_concurrency = 5
max_active_runs_per_dag = 2
workers = 1
worker_concurrency = 16
min_file_process_interval = 1
min_file_parsing_loop_time = 5
dag_dir_list_interval = 30

当时的dag运行有example_bash_operator、example_branch_operator、example_python_operator和我们开发的一个quickDag。

在某些情况下,它们都只是简单的任务/操作符,如 DummyOperators、BranchOperatos、BashOperators,但只做 echo 或 sleep,PythonOperators 也只做 sleep。总共大约有 40 个任务,但并非所有任务都是 运行 并行的,因为其中一些是下游、依赖关系等,我们的并行度设置为 10,如上所述只有一个 worker ,并且 dag_concurrency 设置为 5。

我在气流日志中看不到任何异常,在任务日志中也看不到。

运行 只是其中之一,似乎气流正在相应地工作。

我可以在调度程序 pod 中看到很多调度程序进程,每个进程使用 0.2% 或更多的内存:

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
461384 airflow 20 0 836700 127212 23908 S 36.5 0.4 0:01.19 /usr/bin/python /usr/bin/airflow scheduler 461397 airflow 20 0 356168 86320 5044 R 14.0 0.3 0:00.42 /usr/bin/python /usr/bin/airflow scheduler 44 airflow 20 0 335920 71700 10600 S 28.9 0.2 403:32.05 /usr/bin/python /usr/bin/airflow scheduler 56 airflow 20 0 330548 59164 3524 S 0.0 0.2 0:00.02

这是 运行 使用 0.3% 内存的任务之一:

PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND 462042 airflow 20 0 282632 91120 10544 S 1.7 0.3 0:02.66 /usr/bin/python /usr/bin/airflow run example_bash_operator runme_1 2018-08-29T07:39:48.193735+00:00 --local -sd /usr/lib/python2.7/site-packages/apache_airflow-1.10.0-py2.7.egg/airflow/example_dags/example_bash_operator.py

实际上并没有一个简明的经验法则可以遵循,因为它可能会根据您的工作流程而有很大差异。

如您所见,调度程序将创建多个派生进程。此外,每个任务(Dummy 除外)都将 运行 在其自己的进程中。根据运算符和它正在处理的数据,每个任务所需的内存量可能会有很大差异。

并行度设置将直接限制在所有 dag runs/tasks 上同时 运行 的任务数量,这对使用 LocalExecutor 的您来说效果最为显着。您也可以尝试将 [scheduler] 下的 max_threads 设置为 1.

因此,一个(非常)一般的经验法则就是善待资源:

[256 for scheduler itself] + ( [parallelism] * (100MB + [size of data you'll process]) )

根据您是加载完整数据集,还是在任务执行过程中处理数据块,数据大小需要更改的位置。

即使您认为不需要扩展集群,我仍然建议使用 CeleryExecutor,如果只是为了将调度程序和任务彼此隔离的话。这样,如果您的调度程序或 celery worker 死了,它不会同时关闭。特别是在 k8 中的 运行ning,如果你的调度程序 sigterms 它将与任何 运行ning 任务一起被杀死。如果你 运行 它们在不同的 pods 并且调度程序 pod 重新启动,你的任务可以不间断地完成。如果你有更多的工人,它会减少来自其他任务的 memory/processing 峰值的影响。