如何将气流 url link 检索到 DAG 中的最新任务日志?

How do I retrieve airflow url link to my latest task log in a DAG?

在单个 DAG 任务中,如何在 python 运算符的帮助下设置 url link,因为我打算发送 url link 将最新的日志在出错时直接发送给用户,使用户可以直接访问页面,跳过导航步骤。

您可以定义一个 callback 函数,然后您可以将其作为默认参数传递给 DAG() 运算符。

def on_failure_callback_slack(context):
    message = f"Failue! airflow task: {context['task_instance'].task_id} failed" \
f"dag: {base_url}?dag_id={context['dag'].dag_id} " \
f"{str(context['task_instance'])}"

    operator = PythonOperator(task_id="failure", python_callable=post_to_slack, op_kwargs={'message': message}
    
    return operator.execute(context=context)

在上面的代码中,post_to_slack()只是post的一个效用函数,可以用requests.post(...)

松弛

您可以将此函数传递给 DAG,它将 post 与 url 一起松弛(或您选择的其他媒介)。请注意,您必须提供 base_url 才能使 url 正常工作。

default_args = {"on_failure_callback": on_failure_callback_slack}
dag=DAG(dag_id='some_id', default_args=default_args)

有关更多信息,您可以在此处阅读:https://airflow.apache.org/docs/apache-airflow/2.2.1/logging-monitoring/callbacks.html