为什么 Airflow DAG end_date 和持续时间未始终记录在松弛 on_success_callback 警报中?

Why is the Airflow DAG end_date and duration not being consistently recorded in a slack on_success_callback alert?

我正在尝试创建一个 Slack 警报,当 Airflow DAG 中的任务成功 运行 时,它 post 将一些基本信息发送到一个松弛通道中。我的代码如下:

import datetime

from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

SLACK_CONN_ID = "slack_test"


def task_success_slack_alert(context):
    """
    Callback task that can be used in DAG to alert of successful task completion
    Args:
        context (dict): Context variable passed in from Airflow
    Returns:
        None: Calls the SlackWebhookOperator execute method internally
    """

    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
            :large_blue_circle: Task completed successfully
            *Task*: {task}  
            *Dag*: {dag} 
            *Execution Time*: {exec_date}  
            *Start Time*: {start}
            *End Time*: {end}
            *Duration*: {duration} seconds
            *Try* {try_number} *of max retries* {max_tries}
            *Log Url*: {log_url} 
            """.format(
        task=context.get("task_instance").task_id,
        dag=context.get("task_instance").dag_id,
        ti=context.get("task_instance"),
        exec_date=context.get("execution_date"),
        log_url=context.get("task_instance").log_url,
        start=context.get("task_instance").start_date,
        end=context.get("task_instance").end_date,
        duration=context.get("task_instance").duration,
        try_number=context.get("task_instance")._try_number,
        max_tries=context.get("task_instance").max_tries + 1
        )

    success_alert = SlackWebhookOperator(
        task_id="slack_test",
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username="airflow",
        )

    return success_alert.execute(context=context)

default_dag_args = {'owner': 'TEST',
                    'start_date': datetime.datetime(2021, 3, 29),
                    'retries': 1,
                    'use_legacy_sql': False,
                    'on_success_callback': task_success_slack_alert
                    }

with models.DAG('TESTING',
                schedule_interval='0 9 * * *',
                default_args=default_dag_args) as dag:

然后我有一系列将数据写入大查询表的 BigQueryOperator 任务。在每个任务为 运行 后输出松弛消息,但有时 'End Time' 和 'Duration' 为 'None',但是当我检查 DAG 的日志时,我可以看到结束时间被正确记录。有时 Duration/End 时间有效,有时无效,我看不出这是为什么的逻辑模式。我还注意到,有时它会针对同一任务 post 两次松弛警报,一次是 Duration/End 时间为 'None',然后再次填充它们。有谁知道为什么会这样?

有两点可以调用on_success_callback

  1. 当任务执行成功后。 (taskinstance.py)
  2. 当元数据数据库中的任务状态未反映进程当前正在执行的操作时 (local_task_job.py)

在某些情况下,由于任务实例执行和本地任务作业之间的竞争条件,回调会快速连续执行两次。此 Github Issue 描述了与您类似的情况。

由于不知道发送 slack 消息的时间,我不能确定这是同一个问题。如果我不得不猜测,缺少结束时间和持续时间的松弛消息来自本地任务作业,填充了元数据的松弛消息来自任务实例。

如果您在回调中添加一些调用者检查代码,您可以找出它的来源。请在 getting caller information.

上参考此 Whosebug post

您提到来自 local_task_job.py 的回调具有完整的元数据输出集。从代码的角度来看,这确实有意义。在 taskinstance.py 中调用 on_success_callback 之前 The end_date is not set。另一方面,当本地任务作业检测到状态为 SUCCESS 时,它将 运行 on_success_callback,此时 end_date 已设置。

这让我想起了,我以前也运行关注过这个问题,并以调用成功回调的当前时间作为结束时间。