气流健康检查

airflow health check

我用的airflow,有时候pipeline等的时间比较长。也有工作 运行 时间过长的情况(大概占用了其他工作的资源)

我正在尝试研究如何以编程方式识别调度程序的健康状况,并在未来无需任何额外框架的情况下监控这些调度程序。我开始查看元数据数据库表。我现在能想到的就是从任务的dag_runduration中看到start_dateend_date。我应该关注的其他指标是什么?非常感谢您的帮助。

无需“深入”数据库。

Airflow 为您提供可用于特定目的的指标:https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html

如果向下滚动,您会看到所有有用的指标,其中一些正是您要查找的指标(尤其是计时器)。

这可以通过通常的指标集成来完成。 Airflow 通过 statsd 发布指标,Airflow Official Helm Chart (https://airflow.apache.org/docs/helm-chart/stable/index.html) 甚至通过 statsd exporter 为 Prometheus 公开这些指标。

关于 spark 作业 - 是的 - 当前实施的 spark submit hook/operator 是在“主动轮询”模式下实施的。 airflow 的“worker”进程轮询作业的状态。但是 Airlfow 可以 运行 多个并行工作。此外,如果您愿意,您可以实现自己的任务,其行为会有所不同。

在“经典”Airflow 中,您需要实现一个 Submit Operator(提交作业)和“poke_reschedule”传感器(等待作业完成)并以这种方式实现您的 DAG该传感器任务将在操作员之后触发。 “Poke 重新安排”模式的工作方式是传感器仅在“轮询”时占用工作槽,然后释放该槽一段时间(直到再次检查)。

从 Airflow 2.2 开始,您还可以编写一个 Deferrable Operator (https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html?highlight=deferrable),您可以在其中编写单个 Operator - 首先进行提交,然后延迟状态检查 - 所有操作都在一个操作员中。可延迟运算符正在有效地处理(使用 async.io)潜在的数千个 waiting/deferred 运算符,而不会占用插槽或过多的资源。

更新:如果你真的不能使用 statsd(不需要 helm,statsd 就足够了),你不应该使用 DB 来获取有关 DAGS 的信息。使用稳定气流 REST API 代替:https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html