Airflow failed Slack message

How can I configure Airflow so that any failure in the DAG will (immediately) result in a Slack message?

At the moment I manage it by creating a slack_failed_task:
from airflow.operators.slack_operator import SlackAPIPostOperator

slack_failed_task = SlackAPIPostOperator(
    task_id='slack_failed',
    channel="#datalabs",
    trigger_rule='one_failed',  # run as soon as any upstream task has failed
    token="...",
    text=':red_circle: DAG Failed',
    icon_url='http://airbnb.io/img/projects/airflow3.png',
    dag=dag)

and making this task (with trigger_rule='one_failed') downstream of every other task in the DAG:

slack_failed_task << download_task_a
slack_failed_task << download_task_b
slack_failed_task << process_task_c
slack_failed_task << process_task_d
slack_failed_task << other_task_e

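It works, but it is error-prone: forgetting to wire up a single task means its failure never triggers the Slack notification, and it is a lot of boilerplate.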

Is there a way to extend the email_on_failure property of the DAG?

Bonus ;-) for including a way to pass the name of the failed task to the message.

BaseOperator supports an 'on_failure_callback' parameter:

on_failure_callback (callable) – a function to be called when a task instance of this task fails. a context dictionary is passed as a single parameter to this function. Context contains references to related objects to the task instance and is documented under the macros section of the API.

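I haven't tested this, but you should be able to define a function that posts to Slack on failure and pass it to each task definition. To get the name of the failed task, read it from the context dictionary passed to the callback, e.g. context['task_instance'].task_id (the callback is plain Python, so Jinja templates such as {{ task_id }} are not rendered there).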
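The bonus part does not depend on how the message is posted: Airflow calls the callback with a single context dict, and the failing TaskInstance sits under the "task_instance" key. A minimal sketch that runs without Airflow; the function name failure_message and the SimpleNamespace stand-in for a real TaskInstance are hypothetical, used only to fake the context:

```python
from types import SimpleNamespace


def failure_message(context):
    """Build Slack text for a failed task from an Airflow-style context dict."""
    # Airflow stores the failing TaskInstance under "task_instance" (alias "ti")
    ti = context["task_instance"]
    return f":red_circle: Task {ti.task_id} in DAG {ti.dag_id} failed"


# Hypothetical stand-in for the context Airflow passes to on_failure_callback
fake_context = {
    "task_instance": SimpleNamespace(task_id="download_task_a", dag_id="my_dag")
}
print(failure_message(fake_context))
```

Inside a real callback, the returned string would become the text argument of whatever posts to Slack.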

Maybe this example will help:

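from airflow.operators.slack_operator import SlackAPIPostOperator


def slack_failed_task(context):
    # Airflow calls this with the task-instance context when the task fails
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#datalabs",
        token="...",
        text=':red_circle: Task {} failed'.format(context['task_instance'].task_id),
        owner='_owner')
    # execute() has to be called; returning the bound method posts nothing
    return failed_alert.execute(context=context)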


task_with_failed_slack_alerts = PythonOperator(
    task_id='task0',
    python_callable=<file to execute>,
    on_failure_callback=slack_failed_task,
    provide_context=True,
    dag=dag)
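To avoid the "forgot to wire up a task" problem from the question, the callback can be built once and attached everywhere. A sketch of a callback factory, assuming a hypothetical send(text) function that does the actual posting (an operator's execute(), a webhook hook, requests.post, ...); injecting it keeps the callback testable without Slack:

```python
from types import SimpleNamespace


def make_failure_callback(send):
    """Return an on_failure_callback that reports the failed task via send(text)."""
    def on_failure(context):
        # Same context dict Airflow passes to any on_failure_callback
        ti = context["task_instance"]
        send(f":red_circle: {ti.dag_id}.{ti.task_id} failed")
    return on_failure


# Collect messages in a list instead of posting them
sent = []
callback = make_failure_callback(sent.append)
# Airflow would invoke the callback itself; here it is called with a fake context
callback({"task_instance": SimpleNamespace(dag_id="my_dag", task_id="task0")})
print(sent)
```

In a DAG file, the returned function would be passed once via default_args={'on_failure_callback': callback}, so every task created in that DAG inherits it instead of repeating on_failure_callback=... on each operator.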

Try the new SlackWebhookOperator, available in Airflow >= 1.10.0:

import os

from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

slack_msg="Hi Wssup?"

slack_test =  SlackWebhookOperator(
    task_id='slack_test',
    http_conn_id='slack_connection',
    webhook_token='/1234/abcd',
    message=slack_msg,
    channel='#airflow_updates',
    username='airflow_'+os.environ['ENVIRONMENT'],
    icon_emoji=None,
    link_names=False,
    dag=dag)

Note: make sure you have added slack_connection as an Airflow connection with

host=https://hooks.slack.com/services/

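I prefer to set the callback once on the DAG instead of on every task. Note that a DAG-level on_failure_callback runs once, when the DAG run is marked failed; if you want each failing task to trigger it, pass it through default_args instead so every task inherits it: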

import json
import os
from datetime import datetime

import requests
from airflow import DAG


def on_failure_callback(context):
    # Full incoming-webhook URL, read from an environment variable
    webhook_url = os.getenv('SLACK_WEBHOOK_TOKEN')
    slack_data = {
        'text': "@here DAG {} Failed".format(context['dag'].dag_id)
    }

    response = requests.post(
        webhook_url, data=json.dumps(slack_data),
        headers={'Content-Type': 'application/json'}
    )

dag = DAG(
    dag_id='dag_with_templated_dir',
    start_date=datetime(2020, 1, 1),
    on_failure_callback=on_failure_callback
)
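If the requests package is not installed on the workers, the same JSON POST can be made with the standard library. A sketch that only builds the request so it can be inspected without network access; build_webhook_request is a hypothetical name and the URL is a placeholder, not a real webhook:

```python
import json
import urllib.request


def build_webhook_request(webhook_url, dag_id):
    """Build (but do not send) a JSON POST to a Slack incoming webhook."""
    payload = {"text": f"DAG {dag_id} Failed"}
    return urllib.request.Request(
        webhook_url,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Placeholder URL; a real one comes from Slack's Incoming Webhooks setup
req = build_webhook_request("https://hooks.slack.com/services/XXX", "my_dag")
print(req.get_method(), json.loads(req.data))
# To actually send it: urllib.request.urlopen(req, timeout=10)
```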

How can I configure Airflow so that any failure in the DAG will (immediately) result in a slack message?

With airflow.providers.slack.hooks.slack_webhook.SlackWebhookHook you can achieve this by passing an on_failure_callback function at the DAG level.

Bonus ;-) for including a way to pass the name of the failed task to the message.


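import uuid
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.slack.hooks.slack_webhook import SlackWebhookHook
from airflow.utils.state import State


def fail():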
    raise Exception("Task failed intentionally for testing purpose")

def success():
    print("success")

def task_fail_slack_alert(context):
    tis_dagrun = context['ti'].get_dagrun().get_task_instances()
    failed_tasks = []
    for ti in tis_dagrun:
        if ti.state == State.FAILED:
            # Adding log url
            failed_tasks.append(f"<{ti.log_url}|{ti.task_id}>")
    
    dag=context.get('task_instance').dag_id
    exec_date=context.get('execution_date')

    blocks = [
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": ":red_circle: Dag Failed."
            }
        },
        {
            "type": "section",
            "block_id": f"section{uuid.uuid4()}",
            "text": {
                "type": "mrkdwn",
                "text": f"*Dag*: {dag} \n *Execution Time*: {exec_date}"
            },
            "accessory": {
                "type": "image",
                "image_url": "https://raw.githubusercontent.com/apache/airflow/main/airflow/www/static/pin_100.png",
                "alt_text": "Airflow"
            }
        },
        {
            "type": "section",
            "text": {
                "type": "mrkdwn",
                "text": f"Failed Tasks: {', '.join(failed_tasks)}"
            }
        }
    ]
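    # Older apache-airflow-providers-slack API (http_conn_id, execute());
    # newer provider releases use slack_webhook_conn_id and send() instead
    failed_alert = SlackWebhookHook(
        http_conn_id='slack-airflow',
        channel="#airflow-notifications",
        blocks=blocks,
        username='airflow'
    )
    failed_alert.execute()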

default_args = {
    'owner': 'airflow'
}
with DAG(
    dag_id="slack-test",
    default_args=default_args,
    start_date=datetime(2021,8,19),
    schedule_interval=None,
    on_failure_callback=task_fail_slack_alert
) as dag:

    task_1 = PythonOperator(
        task_id="slack_notification_test",
        python_callable=fail
    )

    task_2 = PythonOperator(
        task_id="slack_notification_test2",
        python_callable=success
    )
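The filtering step inside task_fail_slack_alert can be pulled out and checked on its own. A sketch using fake task instances (SimpleNamespace objects carrying the task_id, state and log_url attributes the callback reads); the localhost URLs are made up, and "failed" is assumed to be the string value behind State.FAILED:

```python
from types import SimpleNamespace

FAILED = "failed"  # assumed string value of airflow.utils.state.State.FAILED


def failed_task_links(task_instances):
    """Return Slack mrkdwn links "<url|task_id>" for every failed task instance."""
    return [
        f"<{ti.log_url}|{ti.task_id}>"
        for ti in task_instances
        if ti.state == FAILED
    ]


# Fake task instances standing in for dagrun.get_task_instances()
tis = [
    SimpleNamespace(task_id="slack_notification_test", state="failed",
                    log_url="http://localhost:8080/log?task_id=slack_notification_test"),
    SimpleNamespace(task_id="slack_notification_test2", state="success",
                    log_url="http://localhost:8080/log?task_id=slack_notification_test2"),
]
print("Failed Tasks: " + ", ".join(failed_task_links(tis)))
```

Keeping this as a plain function means the Block Kit payload logic can be tested without a scheduler or a Slack connection.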