气流健康检查
airflow health check
我用的airflow,有时候pipeline等的时间比较长。也有工作 运行 时间过长的情况(大概占用了其他工作的资源)
我正在尝试研究如何以编程方式识别调度程序的健康状况,并在未来无需任何额外框架的情况下监控这些调度程序。我开始查看元数据数据库表。我现在能想到的就是从任务的dag_run
和duration
中看到start_date
和end_date
。我应该关注的其他指标是什么?非常感谢您的帮助。
无需“深入”数据库。
Airflow 为您提供可用于特定目的的指标:https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html
如果向下滚动,您会看到所有有用的指标,其中一些正是您要查找的指标(尤其是计时器)。
这可以通过通常的指标集成来完成。 Airflow 通过 statsd 发布指标,Airflow Official Helm Chart (https://airflow.apache.org/docs/helm-chart/stable/index.html) 甚至通过 statsd exporter 为 Prometheus 公开这些指标。
关于 spark 作业 - 是的 - 当前实施的 spark submit hook/operator 是在“主动轮询”模式下实施的。 airflow 的“worker”进程轮询作业的状态。但是 Airlfow 可以 运行 多个并行工作。此外,如果您愿意,您可以实现自己的任务,其行为会有所不同。
在“经典”Airflow 中,您需要实现一个 Submit Operator(提交作业)和“poke_reschedule”传感器(等待作业完成)并以这种方式实现您的 DAG该传感器任务将在操作员之后触发。 “Poke 重新安排”模式的工作方式是传感器仅在“轮询”时占用工作槽,然后释放该槽一段时间(直到再次检查)。
从 Airflow 2.2 开始,您还可以编写一个 Deferrable Operator (https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html?highlight=deferrable),您可以在其中编写单个 Operator - 首先进行提交,然后延迟状态检查 - 所有操作都在一个操作员中。可延迟运算符正在有效地处理(使用 async.io)潜在的数千个 waiting/deferred 运算符,而不会占用插槽或过多的资源。
更新:如果你真的不能使用 statsd(不需要 helm,statsd 就足够了),你不应该使用 DB 来获取有关 DAGS 的信息。使用稳定气流 REST API 代替:https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html
我用的airflow,有时候pipeline等的时间比较长。也有工作 运行 时间过长的情况(大概占用了其他工作的资源)
我正在尝试研究如何以编程方式识别调度程序的健康状况,并在未来无需任何额外框架的情况下监控这些调度程序。我开始查看元数据数据库表。我现在能想到的就是从任务的dag_run
和duration
中看到start_date
和end_date
。我应该关注的其他指标是什么?非常感谢您的帮助。
无需“深入”数据库。
Airflow 为您提供可用于特定目的的指标:https://airflow.apache.org/docs/apache-airflow/stable/logging-monitoring/metrics.html
如果向下滚动,您会看到所有有用的指标,其中一些正是您要查找的指标(尤其是计时器)。
这可以通过通常的指标集成来完成。 Airflow 通过 statsd 发布指标,Airflow Official Helm Chart (https://airflow.apache.org/docs/helm-chart/stable/index.html) 甚至通过 statsd exporter 为 Prometheus 公开这些指标。
关于 spark 作业 - 是的 - 当前实施的 spark submit hook/operator 是在“主动轮询”模式下实施的。 airflow 的“worker”进程轮询作业的状态。但是 Airlfow 可以 运行 多个并行工作。此外,如果您愿意,您可以实现自己的任务,其行为会有所不同。
在“经典”Airflow 中,您需要实现一个 Submit Operator(提交作业)和“poke_reschedule”传感器(等待作业完成)并以这种方式实现您的 DAG该传感器任务将在操作员之后触发。 “Poke 重新安排”模式的工作方式是传感器仅在“轮询”时占用工作槽,然后释放该槽一段时间(直到再次检查)。
从 Airflow 2.2 开始,您还可以编写一个 Deferrable Operator (https://airflow.apache.org/docs/apache-airflow/stable/concepts/deferring.html?highlight=deferrable),您可以在其中编写单个 Operator - 首先进行提交,然后延迟状态检查 - 所有操作都在一个操作员中。可延迟运算符正在有效地处理(使用 async.io)潜在的数千个 waiting/deferred 运算符,而不会占用插槽或过多的资源。
更新:如果你真的不能使用 statsd(不需要 helm,statsd 就足够了),你不应该使用 DB 来获取有关 DAGS 的信息。使用稳定气流 REST API 代替:https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html