Airflow:DAG 的监控解决方案 运行

Airflow: Monitoring solutions for a DAG run

我目前正在尝试为 Airflow 设置监控,理想情况下会在执行 DAG 时发送一封电子邮件,在邮件中包含有关所有包含任务的一些信息,例如任务的最终状态, 运行时间等

我目前未能解决的问题is/are:

除此之外,我还有一个额外的问题,即我寻找的解决方案在某种意义上必须简单,它应该只有 2-3 行代码或可泛化为 Python作为我 Python 经验较少的同事,必须能够理解和重现其他 DAG 上的步骤。

非常欢迎关于如何建立电子邮件发送的更聪明的想法。

提前感谢大家的建议!

Does it make sense to have the mail-sending as a component within the DAG? If so, how could I then assure in a simple way, that the task will run after all other tasks?

我认为这是实现您想要的目标的一种方式。您可以创建一个任务,将所有“叶子”(没有下游依赖项的任务)连接到一个最终任务,该任务通过电子邮件发送 DAG 的状态(在这种情况下,dagrun 仍然是 运行),该任务具有另一个状态任务。

    def send_task_summary_t(**context):
        tis = context['dag_run'].get_task_instances()
        for ti in tis:
            print(ti.__dict__)

    dag = DAG(...)

    job_status = PythonOperator(
        task_id='_job_status',
        python_callable=send_task_summary,
        provide_context=True,
        trigger_rule=TriggerRule.ALL_DONE,
        dag=dag
    )

    leaves = [task for task in dag.tasks if not task.downstream_list]
    exclude = ['_job_status']
    for l in leaves:
        if l.task_id not in exclude:
            job_status.set_upstream(l)

How can I get the states of all task instances associated with a DAG run?

我建议使用 PythonOperator 而不是 EmailOperator,因为您将需要包含获取任务状态所需信息的上下文。基于上面的代码片段,我利用 send_email 实用程序发送电子邮件。

from airflow.utils.email import send_email

def send_task_summary_t(**context):
    ti = context['task']
    dr = context['dag_run']
    body = ti.render_template(None, "path/to/template", context)
    send_email(to="alan@example.com", subject=f"{dr} summary", html_content=body)

您还可以使用 Jinja 模板来构建您的电子邮件。

<html>
    <body>
       <div>
          <table>
                {% for ti in dag_run.get_task_instances(): -%}
                    <tr>
                        <td class='{{ti.state}}' >
                            <a href='{{ host_server }}/admin/airflow/log?execution_date={{ts}}&task_id={{ti.task_id}}&dag_id={{dag.dag_id}}'>{{ti.state}}</a></td>
                        <td class="{{ti.operator}}">
                            <a href='{{ host_server }}/admin/airflow/graph?root={{ti.task_id}}&dag_id={{dag.dag_id}}&execution_date={{ts}}'>{{ti.task_id}}</a></td>
                        <td><a href='{{ host_server }}/admin/airflow/tree?base_date={{ts}}&num_runs=50&root={{ti.task_id}}&dag_id={{dag.dag_id}}'>{{ti.start_date}}</a></td>
                        <td><a href='{{ host_server }}/admin/airflow/gantt?root={{ti.task_id}}&dag_id={{dag.dag_id}}&execution_date={{ts}}'>{{ti.end_date}}</a></td>
                        <td><a href='{{ host_server }}/admin/airflow/duration?root={{ti.task_id}}&base_date={{ts}}&days=9999&dag_id={{dag.dag_id}}'>{{ti.duration}}</a></td>
                    </tr>
                {% endfor -%}
            </table>
        </div>
    </body>
</html>

解决此问题的另一种方法是对 DAG 对象使用 on_failure_callback

from airflow.models import DAG
from datetime import datetime

def send_task_summary(context):
    tis = context['dag_run'].get_task_instances()
    for ti in tis:
        print(ti.__dict__)

dag = DAG(
        dag_id='my_dag',
        schedule_interval='@once',
        start_date=datetime(2020, 1, 1),
        on_failure_callback=send_task_summary
)