如何在 Apache Airflow Dag 中添加手动任务

How to add manual tasks in an Apache Airflow Dag

我正在使用 Apache Airflow 来管理数据处理管道。在流水线的中间,一些数据需要在下一步处理之前进行审核。例如 ... -> task1 -> human review -> task2 -> ... 其中 task1 和 task2 是数据处理任务。当 task1 完成后,task1 生成的数据需要人工审核。 reviewer审核通过数据后,task2就可以上线了。 人工审核任务可能需要很长时间(例如几周)。

我正在考虑使用外部数据库来存储人工审核结果。并且用一个Sensor按时间间隔戳一下评论结果。但是会占用一个Airflow worker直到review完成。

有什么想法吗?

我觉得你的想法不错。您可以创建一个专用的 DAG 来使用传感器检查审批流程的进度。如果您在传感器上使用低超时并在此 DAG 上使用适当的时间表,比如每 6 小时一次。根据这些任务的批准频率以及您需要多长时间执行下游任务对其进行调整。

一位同事建议有一个总是失败的任务,因此手动步骤只是将其标记为成功。我是这样实现的:

def always_fail():
    raise AirflowException('Please change this step to success to continue')


manual_sign_off = PythonOperator(
    task_id='manual_sign_off',
    dag=dag,
    python_callable=always_fail
)

start >> manual_sign_off >> end

在1.10之前,我是使用算子的重试功能来实现ManualSignOffTask的。运营商已设置重试和 retry_delay。所以任务失败后会重新调度。安排任务时,它将检查数据库以查看签核是否完成: 如果尚未完成签核,则任务失败并释放工作人员并等待下一个计划。 如果签核已经完成,则任务成功,并且 dag 运行 继续。

在 1.10 之后,引入了新的 TI 状态 UP_FOR_RESCHEDULE,并且 Sensor 原生支持长 运行ning 任务。

and 的小猪打包,这是一个完整的工作示例,它给用户两周的时间来查看第一个任务的结果,然后再永久失败:

from datetime import timedelta

from airflow.models import DAG
from airflow import AirflowException
from airflow.operators.python_operator import PythonOperator

from my_tasks import first_task_callable, second_task_callable


TIMEOUT = timedelta(days=14)


def task_to_fail():
    raise AirflowException("Please change this step to success to continue")


dag = DAG(dag_id="my_dag")

first_task = PythonOperator(
    dag=dag,
    task_id="first_task",
    python_callable=first_task_callable
)

manual_sign_off = PythonOperator(
    dag=dag,
    task_id="manual_sign_off",
    python_callable=task_to_fail,
    retries=1,
    max_retry_delay=TIMEOUT
)

second_task = PythonOperator(
    dag=dag,
    task_id="second_task",
    python_callable=second_task_callable
)

first_task >> manual_sign_off >> second_task