气流外部传感器卡在戳中

Airflow External sensor gets stuck at poking

我想要一个 dag 在另一个 dag 完成后开始。一种解决方案是使用外部传感器功能,您可以在下面找到我的解决方案。我遇到的问题是依赖的 dag 卡住了,我检查了这个 并确保两个 dags 都按相同的时间表运行,我的简化代码如下: 任何帮助,将不胜感激。 领袖达格:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
   'owner': 'airflow',
   'depends_on_past': False,
   'start_date': datetime(2015, 6, 1),
   'retries': 1,
   'retry_delay': timedelta(minutes=5),



 }

 schedule = '* * * * *'

 dag = DAG('leader_dag', default_args=default_args,catchup=False, 
 schedule_interval=schedule)

t1 = BashOperator(
   task_id='print_date',
   bash_command='date',
   dag=dag)

亲属:

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 10, 8),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),


}
schedule='* * * * *'
dag = DAG('dependent_dag', default_args=default_args, catchup=False, 
schedule_interval=schedule)

 wait_for_task = ExternalTaskSensor(task_id = 'wait_for_task', 
 external_dag_id = 'leader_dag', external_task_id='t1', dag=dag)

 t1 = BashOperator(
     task_id='print_date',
     bash_command='date',
      dag=dag)

 t1.set_upstream(wait_for_task)

leader_dag 的日志:

依赖 dag 的日志:

使用 ExternalTaskSensor 时,您必须为两个 DAG 指定相同的开始日期。如果这对您的用例不起作用,那么您需要在 ExternalTaskSensor.

中使用 execution_deltaexecution_date_fn

首先,leader_dag 中的 task_id 被命名为 print_date,但您设置 dependent_dag 的任务 wait_for_task 正在等待 [=12] =]的任务名为t1。没有名为 t1 的任务。您在 py 文件中分配给它的内容不相关,也没有在 Airflow 数据库中使用,也没有被传感器横向使用。它应该正在等待任务名称 print_date.

其次,您的日志没有在 leader_dag 运行 中显示 dependent_dag 正在等待的内容。

最后,我不能推荐你使用 Airflow 每分钟安排任务。当然不是两个依赖任务在一起。 考虑在不同的系统(如 Spark)中编写流作业,或者为此滚动你自己的 Celery 或 Dask 环境。

您还可以通过在 leader_dag 的末尾添加 TriggerDagRunOperator 来触发 dependent_dag 并从中删除计划,从而避免 ExternalTaskSensorschedule_interval 设置为 None

我在您的日志中看到的是 2018-10-13T19:08:11 的领导者日志。这充其量是 execution_date 2018-10-13 19:07:00 的 dag运行 因为从 19:07 开始的分钟周期在 19:08 结束,这是最早的可以预定。我发现调度和执行之间有大约 11 秒的延迟如果是这种情况。然而,Airflow 中可能会有多分钟的调度延迟。

我还看到了 dependent_dag 的日志,其中 运行 从 19:14:04 到 19:14:34 并且正在寻找相应的 19:13:00 的完成达格运行。没有迹象表明您的调度程序没有足够的延迟,可以通过 19:14:34 启动 leader_dag 的 19:13:00 dag运行。如果你展示它戳了 5 分钟左右,你可能会更好地说服我。当然 永远不会感觉到 leader_dag.t1 因为这不是你给显示的任务命名的。

所以,Airflow 有调度延迟,如果你在系统中有几 1000 个 dag,它可能会超过 1 分钟,这样 catchup=False 你会得到一些 运行s 相互跟随 IE 19:08、19:09 和一些 运行s 会跳过一分钟(或 6 分钟),例如 19:10 后跟 19:16,并且由于延迟在 dag-by-dag 的基础上有点随机,即使您有正确的任务 ID 等待,您也可能会 运行s 与传感器不对齐:

 wait_for_task = ExternalTaskSensor(
     task_id='wait_for_task', 
     external_dag_id='leader_dag',
-    external_task_id='t1',
+    external_task_id='print_date',
     dag=dag)

对此的简单解决方案是执行以下操作:

1- 确保您在 ExternalTask​​Sensor 中设置的所有变量都设置正确,就像您希望 master_dag 有一只眼睛的 dag 的 task_id 和 dag_id上。

2- 使 master_dag 和 slave_dag(您要等待的 dag)具有相同的 start_date,否则将无法工作。如果您的奴隶从下午 22 点开始,而主人从 22:30 开始,那么您应该在执行增量中指定 30 分钟的差异。

如果您的错误无法通过以下方式解决,那么您的问题要么是基本问题,要么是您对 dag 的编程方式太错误了..