Airflow: Slack Alert: Post the error message
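Requirement: how do I get the error message into the Slack message?
Airflow version: 2.2.4
The Slack message below only says that the task failed, not why it failed.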
from airflow.hooks.base import BaseHook
from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def slack_alert(context):
    # Read the webhook token from the Airflow connection.
    SLACK_CONN_ID = 'slack_conn_id'
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    ti = context.get("task_instance")
    slack_msg = """
        :red_circle: Task Failed.
        *Task*: {task}
        *Dag*: {dag}
        *Execution Time*: {exec_date}
        *Log Url*: {log_url}
        """.format(
        task=ti.task_id,
        dag=ti.dag_id,
        exec_date=context.get("logical_date"),
        # Swap the local base URL for the public one so the link works from Slack.
        log_url=ti.log_url.replace(
            "http://localhost:8080", "https://airflow.yourdomain.com"
        ),
    )
    slack_notification = SlackWebhookOperator(
        task_id="slack_notification",
        http_conn_id=SLACK_CONN_ID,
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow',
    )
    return slack_notification.execute(context=context)
Found the answer: it is context.get("exception") in a task-level callback,
or context.get("reason") in a DAG-level callback.
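As a rough sketch of how that value could be folded into the message, the helper below builds the text from the callback context alone, so it can be tried without Airflow installed. The function name build_failure_message, the "unknown" fallback, and the stand-in context values are assumptions made for illustration; the sketch also assumes the raised exception is available under "exception" and a DAG-level failure reason under "reason".

```python
from types import SimpleNamespace


def build_failure_message(context):
    """Build the Slack text for a failed task, including the failure reason."""
    ti = context.get("task_instance")
    # Assumption: task-level callbacks expose the raised exception under
    # "exception", and DAG-level callbacks expose a string under "reason".
    error = context.get("exception") or context.get("reason") or "unknown"
    return (
        ":red_circle: Task Failed.\n"
        f"*Task*: {ti.task_id}\n"
        f"*Dag*: {ti.dag_id}\n"
        f"*Execution Time*: {context.get('logical_date')}\n"
        f"*Error*: {error}\n"
        f"*Log Url*: {ti.log_url}\n"
    )


# Stand-in context for trying the helper outside Airflow (all values made up).
fake_context = {
    "task_instance": SimpleNamespace(
        task_id="load",
        dag_id="etl",
        log_url="https://airflow.yourdomain.com/log",
    ),
    "logical_date": "2022-03-01T00:00:00+00:00",
    "exception": ValueError("bad input"),
}
message = build_failure_message(fake_context)
print(message)
```

Inside slack_alert, the returned string would be passed as message= to SlackWebhookOperator in place of slack_msg.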