如何将气流 url link 检索到 DAG 中的最新任务日志?
How do I retrieve airflow url link to my latest task log in a DAG?
在单个 DAG 任务中,如何在 python 运算符的帮助下设置 url link,因为我打算发送 url link 将最新的日志在出错时直接发送给用户,使用户可以直接访问页面,跳过导航步骤。
您可以定义一个 callback
函数,然后您可以将其作为默认参数传递给 DAG()
运算符。
def on_failure_callback_slack(context):
message = f"Failue! airflow task: {context['task_instance'].task_id} failed" \
f"dag: {base_url}?dag_id={context['dag'].dag_id} " \
f"{str(context['task_instance'])}"
operator = PythonOperator(task_id="failure", python_callable=post_to_slack, op_kwargs={'message': message}
return operator.execute(context=context)
在上面的代码中,post_to_slack()
只是post的一个效用函数,可以用requests.post(...)
松弛
您可以将此函数传递给 DAG
,它将 post 与 url 一起松弛(或您选择的其他媒介)。请注意,您必须提供 base_url
才能使 url 正常工作。
default_args = {"on_failure_callback": on_failure_callback_slack}
dag=DAG(dag_id='some_id', default_args=default_args)
有关更多信息,您可以在此处阅读:https://airflow.apache.org/docs/apache-airflow/2.2.1/logging-monitoring/callbacks.html
在单个 DAG 任务中,如何在 python 运算符的帮助下设置 url link,因为我打算发送 url link 将最新的日志在出错时直接发送给用户,使用户可以直接访问页面,跳过导航步骤。
您可以定义一个 callback
函数,然后您可以将其作为默认参数传递给 DAG()
运算符。
def on_failure_callback_slack(context):
message = f"Failue! airflow task: {context['task_instance'].task_id} failed" \
f"dag: {base_url}?dag_id={context['dag'].dag_id} " \
f"{str(context['task_instance'])}"
operator = PythonOperator(task_id="failure", python_callable=post_to_slack, op_kwargs={'message': message}
return operator.execute(context=context)
在上面的代码中,post_to_slack()
只是post的一个效用函数,可以用requests.post(...)
您可以将此函数传递给 DAG
,它将 post 与 url 一起松弛(或您选择的其他媒介)。请注意,您必须提供 base_url
才能使 url 正常工作。
default_args = {"on_failure_callback": on_failure_callback_slack}
dag=DAG(dag_id='some_id', default_args=default_args)
有关更多信息,您可以在此处阅读:https://airflow.apache.org/docs/apache-airflow/2.2.1/logging-monitoring/callbacks.html