Airflow 自定义指标 and/or 带有自定义字段的结果对象
Airflow Custom Metrics and/or Result Object with custom fields
虽然 运行 pySpark SQL 通过 Airflow 进行管道传输,但我有兴趣获取一些业务统计信息,例如:
- 源读取计数
- 目标写入计数
- 处理过程中 DF 的大小
- 错误记录数
一个想法是将其直接推送到指标,这样它将自动被 Prometheus 等监控工具使用。另一个想法是通过一些 DAG 结果对象获取这些值,但我无法在文档中找到任何相关信息。
如果您有解决方案,请post至少提供一些伪代码。
我希望在 airflow.stats.Stats
class 中重用 Airflow 的统计数据和监控支持。也许是这样的:
import logging
from airflow.stats import Stats
PYSPARK_LOG_PREFIX = "airflow_pyspark"
def your_python_operator(**context):
[...]
try:
Stats.incr(f"{PYSPARK_LOG_PREFIX}_read_count", src_read_count)
Stats.incr(f"{PYSPARK_LOG_PREFIX}_write_count", tgt_write_count)
# So on and so forth
except:
logging.exception("Caught exception during statistics logging")
[...]
虽然 运行 pySpark SQL 通过 Airflow 进行管道传输,但我有兴趣获取一些业务统计信息,例如:
- 源读取计数
- 目标写入计数
- 处理过程中 DF 的大小
- 错误记录数
一个想法是将其直接推送到指标,这样它将自动被 Prometheus 等监控工具使用。另一个想法是通过一些 DAG 结果对象获取这些值,但我无法在文档中找到任何相关信息。
如果您有解决方案,请post至少提供一些伪代码。
我希望在 airflow.stats.Stats
class 中重用 Airflow 的统计数据和监控支持。也许是这样的:
import logging
from airflow.stats import Stats
PYSPARK_LOG_PREFIX = "airflow_pyspark"
def your_python_operator(**context):
[...]
try:
Stats.incr(f"{PYSPARK_LOG_PREFIX}_read_count", src_read_count)
Stats.incr(f"{PYSPARK_LOG_PREFIX}_write_count", tgt_write_count)
# So on and so forth
except:
logging.exception("Caught exception during statistics logging")
[...]