如何将 Airflow 中的 dbt 日志发送给相关所有者?

How do I dispatch dbt logs from Airflow to the relevant owner?

我正在 运行宁 dbt (v0.19.0) on Apache Airflow (v2.0.2) using KuberneterPodOperator 并在失败时向 Slack 发送警报。嵌套在 dbt 中的是来自多个所有者的多个模型,它们之间存在交叉依赖关系,它们都是 运行 以及 run 命令。

例如:

KubernetesPodOperator(
  dbt_command="run",
  task_id="run_models",
  on_failure_callback=send_slack_alert,
)

我正在努力确保每个模型所有者都能在他们的相关频道中收到属于他们的警报。

为了更好地解释我的问题,假设 dbt 中有两个模型; 型号-A & 型号-BModel-A 归 A 队所有,Model-B 归 B 队所有。使用这种方法(因为有一个 dbt run 命令)如果 Model-A 失败,该失败将出现在 Model 的共享日志中-AModel-B。我们还假设 A 团队和 B 团队都有自己的警报渠道。但是,因为 dbt 运行 使用单个命令,所以所有警报都发送到公共通道。

现在想象一下有很多模型(模型-A、模型-B.....模型-Z)。我如何改进现有流程以确保 Model-A 中的失败被发送到 A 团队警报通道,Model-B[=35= 中的失败] 被发送到 B-team 警报频道...等等。

如何将来自 dbt(运行ning 在 Airflow 中)的错误发送给相关所有者以使警报可操作?

我建议您最终可能会得到 m 个团队拥有的 n 个模型。

您最简单的更改是标记 每个 dbt 模型都有一个所属团队。然后调用该模型回拨给该团队,例如

KubernetesPodOperator(
  dbt_command="run -m tags:team1",
  task_id="run_models",
  on_failure_callback=send_slack_alert_team1,
)

您可以考虑将参数传递给您的警报而不是自定义回调 (Pass other arguments to on_failure_callback)。

只要您想 运行 所有者组中的模型,这就会起作用,但如果存在内部所有者依赖性,则可能会出现问题。

您可以分解您的 Airflow 模型以从您的模型中组成一个动态 dag,运行一次创建一个模型,例如这里 https://www.astronomer.io/blog/airflow-dbt-1.

然后您可以在同一个动态循环中分配松弛运算符。