为什么 Airflow DAG end_date 和持续时间未始终记录在松弛 on_success_callback 警报中?
Why is the Airflow DAG end_date and duration not being consistently recorded in a slack on_success_callback alert?
我正在尝试创建一个 Slack 警报,当 Airflow DAG 中的任务成功 运行 时,它 post 将一些基本信息发送到一个松弛通道中。我的代码如下:
import datetime
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = "slack_test"
def task_success_slack_alert(context):
"""
Callback task that can be used in DAG to alert of successful task completion
Args:
context (dict): Context variable passed in from Airflow
Returns:
None: Calls the SlackWebhookOperator execute method internally
"""
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
:large_blue_circle: Task completed successfully
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Start Time*: {start}
*End Time*: {end}
*Duration*: {duration} seconds
*Try* {try_number} *of max retries* {max_tries}
*Log Url*: {log_url}
""".format(
task=context.get("task_instance").task_id,
dag=context.get("task_instance").dag_id,
ti=context.get("task_instance"),
exec_date=context.get("execution_date"),
log_url=context.get("task_instance").log_url,
start=context.get("task_instance").start_date,
end=context.get("task_instance").end_date,
duration=context.get("task_instance").duration,
try_number=context.get("task_instance")._try_number,
max_tries=context.get("task_instance").max_tries + 1
)
success_alert = SlackWebhookOperator(
task_id="slack_test",
http_conn_id=SLACK_CONN_ID,
webhook_token=slack_webhook_token,
message=slack_msg,
username="airflow",
)
return success_alert.execute(context=context)
default_dag_args = {'owner': 'TEST',
'start_date': datetime.datetime(2021, 3, 29),
'retries': 1,
'use_legacy_sql': False,
'on_success_callback': task_success_slack_alert
}
with models.DAG('TESTING',
schedule_interval='0 9 * * *',
default_args=default_dag_args) as dag:
然后我有一系列将数据写入大查询表的 BigQueryOperator 任务。在每个任务为 运行 后输出松弛消息,但有时 'End Time' 和 'Duration' 为 'None',但是当我检查 DAG 的日志时,我可以看到结束时间被正确记录。有时 Duration/End 时间有效,有时无效,我看不出这是为什么的逻辑模式。我还注意到,有时它会针对同一任务 post 两次松弛警报,一次是 Duration/End 时间为 'None',然后再次填充它们。有谁知道为什么会这样?
有两点可以调用on_success_callback
- 当任务执行成功后。 (taskinstance.py)
- 当元数据数据库中的任务状态未反映进程当前正在执行的操作时 (local_task_job.py)
在某些情况下,由于任务实例执行和本地任务作业之间的竞争条件,回调会快速连续执行两次。此 Github Issue 描述了与您类似的情况。
由于不知道发送 slack 消息的时间,我不能确定这是同一个问题。如果我不得不猜测,缺少结束时间和持续时间的松弛消息来自本地任务作业,填充了元数据的松弛消息来自任务实例。
如果您在回调中添加一些调用者检查代码,您可以找出它的来源。请在 getting caller information.
上参考此 Whosebug post
您提到来自 local_task_job.py
的回调具有完整的元数据输出集。从代码的角度来看,这确实有意义。在 taskinstance.py
中调用 on_success_callback
之前 The end_date
is not set。另一方面,当本地任务作业检测到状态为 SUCCESS 时,它将 运行 on_success_callback
,此时 end_date
已设置。
这让我想起了,我以前也运行关注过这个问题,并以调用成功回调的当前时间作为结束时间。
我正在尝试创建一个 Slack 警报,当 Airflow DAG 中的任务成功 运行 时,它 post 将一些基本信息发送到一个松弛通道中。我的代码如下:
import datetime
from airflow import models
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator
SLACK_CONN_ID = "slack_test"
def task_success_slack_alert(context):
"""
Callback task that can be used in DAG to alert of successful task completion
Args:
context (dict): Context variable passed in from Airflow
Returns:
None: Calls the SlackWebhookOperator execute method internally
"""
slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
slack_msg = """
:large_blue_circle: Task completed successfully
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Start Time*: {start}
*End Time*: {end}
*Duration*: {duration} seconds
*Try* {try_number} *of max retries* {max_tries}
*Log Url*: {log_url}
""".format(
task=context.get("task_instance").task_id,
dag=context.get("task_instance").dag_id,
ti=context.get("task_instance"),
exec_date=context.get("execution_date"),
log_url=context.get("task_instance").log_url,
start=context.get("task_instance").start_date,
end=context.get("task_instance").end_date,
duration=context.get("task_instance").duration,
try_number=context.get("task_instance")._try_number,
max_tries=context.get("task_instance").max_tries + 1
)
success_alert = SlackWebhookOperator(
task_id="slack_test",
http_conn_id=SLACK_CONN_ID,
webhook_token=slack_webhook_token,
message=slack_msg,
username="airflow",
)
return success_alert.execute(context=context)
default_dag_args = {'owner': 'TEST',
'start_date': datetime.datetime(2021, 3, 29),
'retries': 1,
'use_legacy_sql': False,
'on_success_callback': task_success_slack_alert
}
with models.DAG('TESTING',
schedule_interval='0 9 * * *',
default_args=default_dag_args) as dag:
然后我有一系列将数据写入大查询表的 BigQueryOperator 任务。在每个任务为 运行 后输出松弛消息,但有时 'End Time' 和 'Duration' 为 'None',但是当我检查 DAG 的日志时,我可以看到结束时间被正确记录。有时 Duration/End 时间有效,有时无效,我看不出这是为什么的逻辑模式。我还注意到,有时它会针对同一任务 post 两次松弛警报,一次是 Duration/End 时间为 'None',然后再次填充它们。有谁知道为什么会这样?
有两点可以调用on_success_callback
- 当任务执行成功后。 (taskinstance.py)
- 当元数据数据库中的任务状态未反映进程当前正在执行的操作时 (local_task_job.py)
在某些情况下,由于任务实例执行和本地任务作业之间的竞争条件,回调会快速连续执行两次。此 Github Issue 描述了与您类似的情况。
由于不知道发送 slack 消息的时间,我不能确定这是同一个问题。如果我不得不猜测,缺少结束时间和持续时间的松弛消息来自本地任务作业,填充了元数据的松弛消息来自任务实例。
如果您在回调中添加一些调用者检查代码,您可以找出它的来源。请在 getting caller information.
上参考此 Whosebug post您提到来自 local_task_job.py
的回调具有完整的元数据输出集。从代码的角度来看,这确实有意义。在 taskinstance.py
中调用 on_success_callback
之前 The end_date
is not set。另一方面,当本地任务作业检测到状态为 SUCCESS 时,它将 运行 on_success_callback
,此时 end_date
已设置。
这让我想起了,我以前也运行关注过这个问题,并以调用成功回调的当前时间作为结束时间。