Airflow failed Slack message
How can I configure Airflow so that any failure in the DAG will (immediately) result in a Slack message?
At the moment I manage it by creating a slack_failed_task:
slack_failed_task = SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',
    token="...",
    text=':red_circle: DAG Failed',
    icon_url='http://airbnb.io/img/projects/airflow3.png',
    dag=dag)
and setting this task (one_failed) downstream of every other task in the DAG:
slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e
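It works, but it is error prone: forgetting to add a task means its failure skips the Slack notification, and it seems like a lot of work.
Is there perhaps a way to expand on the email_on_failure property in the DAG?
Bonus ;-) for including a way to pass the name of the failed task to the message.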
BaseOperator supports an 'on_failure_callback' parameter:
on_failure_callback (callable) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.
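I have not tested this, but you should be able to define a function that posts to Slack on failure and pass it to each task definition. To get the name of the current task, you can use the {{ task_id }} template.
Maybe this example will be helpful: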
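# Airflow 1.x import paths
from airflow.operators.python_operator import PythonOperator
from airflow.operators.slack_operator import SlackAPIPostOperator

def slack_failed_task(contextDictionary, **kwargs):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#datalabs",
        token="...",
        text=':red_circle: DAG Failed',
        owner='_owner',)
    # Call execute() so the message is actually sent; returning the bound
    # method without calling it would post nothing.
    return failed_alert.execute(context=contextDictionary)

task_with_failed_slack_alerts = PythonOperator(
    task_id='task0',
    python_callable=<file to execute>,
    on_failure_callback=slack_failed_task,
    provide_context=True,
    dag=dag)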
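For the bonus part, the failed task's name can be read from the context dictionary that Airflow hands to the callback. Below is a minimal sketch, assuming the context exposes the task instance under the 'task_instance' key; build_failure_text is a hypothetical helper name, and the fake context only stands in for what Airflow would pass.

```python
from types import SimpleNamespace


def build_failure_text(context):
    """Build a Slack message that names the failed task and its DAG."""
    ti = context["task_instance"]
    return ":red_circle: Task {} in DAG {} failed".format(ti.task_id, ti.dag_id)


# Stand-in for the context Airflow passes to on_failure_callback
# (hypothetical values, no Airflow installation needed to try it).
fake_context = {
    "task_instance": SimpleNamespace(task_id="download_task_a", dag_id="example_dag")
}
print(build_failure_text(fake_context))
```

The returned string could then be used as the text argument of the Slack operator created inside the callback.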
Try the new SlackWebhookOperator, which is available in Airflow version >= 1.10.0
import os

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_msg = "Hi Wssup?"
slack_test = SlackWebhookOperator(
    task_id='slack_test',
    http_conn_id='slack_connection',
    webhook_token='/1234/abcd',
    message=slack_msg,
    channel='#airflow_updates',
    username='airflow_' + os.environ['ENVIRONMENT'],
    icon_emoji=None,
    link_names=False,
    dag=dag)
Note: make sure you have added slack_connection to your Airflow connections with
host=https://hooks.slack.com/services/
I would prefer to add the callback to the DAG and have it inherited by all its tasks:
import json
import os
from datetime import datetime

import requests
from airflow import DAG

def on_failure_callback(context):
    # The environment variable is expected to hold the full webhook URL.
    webhook_url = os.getenv('SLACK_WEBHOOK_TOKEN')
    slack_data = {
        'text': "@here DAG {} Failed".format(context['dag'].dag_id)
    }
    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

dag = DAG(
    dag_id='dag_with_templated_dir',
    start_date=datetime(2020, 1, 1),
    on_failure_callback=on_failure_callback
)
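One caveat worth checking against your Airflow version: as far as I know, a callback passed directly to DAG(...) runs when the DAG run as a whole fails, while a callback placed in default_args is handed to every task and runs per failed task. Here is a rough sketch of the second variant using only the standard library. notify_slack and build_payload are hypothetical names, the opener parameter exists only so the function can be tried without a network call, and SLACK_WEBHOOK_TOKEN is assumed to hold the full webhook URL as in the snippet above.

```python
import json
import os
import urllib.request


def build_payload(context):
    """Build the JSON body for the Slack webhook from the callback context."""
    ti = context["task_instance"]
    return {"text": "Task {} in DAG {} failed".format(ti.task_id, ti.dag_id)}


def notify_slack(context, webhook_url=None, opener=urllib.request.urlopen):
    """Post the failure message; Airflow calls this with the context only."""
    # Assumption: the environment variable holds the full webhook URL.
    url = webhook_url or os.environ["SLACK_WEBHOOK_TOKEN"]
    request = urllib.request.Request(
        url,
        data=json.dumps(build_payload(context)).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    return opener(request)


# Passing this dict as DAG(..., default_args=default_args) gives every task
# in the DAG the same on_failure_callback.
default_args = {"on_failure_callback": notify_slack}
```

With this in place a newly added task gets the notification without any extra wiring, which is what the question was after.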
How can I configure Airflow so that any failure in the DAG will
(immediately) result in a slack message?
You can do this by passing an on_failure_callback function at the DAG level, using airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook.
Bonus ;-) for including a way to pass the name of the failed task to
the message.
import uuid
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.utils.state import State

def fail():
    raise Exception("Task failed intentionally for testing purpose")

def success():
    print("success")

def task_fail_slack_alert(context):
    # Collect every failed task instance of this DAG run.
    tis_dagrun = context['ti'].get_dagrun().get_task_instances()
    failed_tasks = []
    for ti in tis_dagrun:
        if ti.state == State.FAILED:
            # Adding log url
            failed_tasks.append(f"<{ti.log_url}|{ti.task_id}>")
    dag = context.get('task_instance').dag_id
    exec_date = context.get('execution_date')
    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": ":red_circle: Dag Failed."
            }
        },
        {
            "type": "section",
            "block_id": f"section{uuid.uuid4()}",
            "text": {
                "type": "mrkdwn",
                "text": f"*Dag*: {dag} \n *Execution Time*: {exec_date}"
            },
            "accessory": {
                "type": "image",
                "image_url": "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
                "alt_text": "Airflow"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"Failed Tasks: {', '.join(failed_tasks)}"
            }
        }
    ]
    failed_alert = SlackWebhookHook(
        http_conn_id='slack-airflow',
        channel="#airflow-notifications",
        blocks=blocks,
        username='airflow'
    )
    failed_alert.execute()

default_args = {
    'owner': 'airflow'
}

with DAG(
    dag_id="slack-test",
    default_args=default_args,
    start_date=datetime(2021, 8, 19),
    schedule_interval=None,
    on_failure_callback=task_fail_slack_alert
) as dag:
    task_1 = PythonOperator(
        task_id="slack_notification_test",
        python_callable=fail
    )
    task_2 = PythonOperator(
        task_id="slack_notification_test2",
        python_callable=success
    )
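If you want to check the "failed tasks" part of the message without a scheduler or a Slack workspace, the filtering step can be pulled into a small function. This is only a sketch: failed_task_links is a hypothetical helper, the task instances are faked with SimpleNamespace, and the state is compared against the literal "failed", which I believe is the value behind State.FAILED.

```python
from types import SimpleNamespace


def failed_task_links(task_instances):
    """Return Slack-style <url|label> links for the failed task instances."""
    return [
        "<{}|{}>".format(ti.log_url, ti.task_id)
        for ti in task_instances
        if ti.state == "failed"
    ]


# Fake task instances with made-up log URLs, for illustration only.
fake_tis = [
    SimpleNamespace(task_id="slack_notification_test", state="failed",
                    log_url="http://localhost:8080/log?task_id=slack_notification_test"),
    SimpleNamespace(task_id="slack_notification_test2", state="success",
                    log_url="http://localhost:8080/log?task_id=slack_notification_test2"),
]
print(", ".join(failed_task_links(fake_tis)))
```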