气流外部传感器卡在戳中
Airflow External sensor gets stuck at poking
我想要一个 dag 在另一个 dag 完成后开始。一种解决方案是使用外部传感器功能,您可以在下面找到我的解决方案。我遇到的问题是依赖的 dag 卡住了,我检查了这个 并确保两个 dags 都按相同的时间表运行,我的简化代码如下:
任何帮助,将不胜感激。
领袖达格:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
schedule = '* * * * *'
dag = DAG('leader_dag', default_args=default_args,catchup=False,
schedule_interval=schedule)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
亲属:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 10, 8),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
schedule='* * * * *'
dag = DAG('dependent_dag', default_args=default_args, catchup=False,
schedule_interval=schedule)
wait_for_task = ExternalTaskSensor(task_id = 'wait_for_task',
external_dag_id = 'leader_dag', external_task_id='t1', dag=dag)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t1.set_upstream(wait_for_task)
leader_dag 的日志:
依赖 dag 的日志:
使用 ExternalTaskSensor
时,您必须为两个 DAG 指定相同的开始日期。如果这对您的用例不起作用,那么您需要在 ExternalTaskSensor
.
中使用 execution_delta
或 execution_date_fn
首先,leader_dag
中的 task_id
被命名为 print_date
,但您设置 dependent_dag
的任务 wait_for_task
正在等待 [=12] =]的任务名为t1
。没有名为 t1
的任务。您在 py
文件中分配给它的内容不相关,也没有在 Airflow 数据库中使用,也没有被传感器横向使用。它应该正在等待任务名称 print_date
.
其次,您的日志没有在 leader_dag 运行 中显示 dependent_dag 正在等待的内容。
最后,我不能推荐你使用 Airflow 每分钟安排任务。当然不是两个依赖任务在一起。
考虑在不同的系统(如 Spark)中编写流作业,或者为此滚动你自己的 Celery 或 Dask 环境。
您还可以通过在 leader_dag 的末尾添加 TriggerDagRunOperator
来触发 dependent_dag 并从中删除计划,从而避免 ExternalTaskSensor
将 schedule_interval
设置为 None
。
我在您的日志中看到的是 2018-10-13T19:08:11 的领导者日志。这充其量是 execution_date 2018-10-13 19:07:00 的 dag运行 因为从 19:07 开始的分钟周期在 19:08 结束,这是最早的可以预定。我发现调度和执行之间有大约 11 秒的延迟如果是这种情况。然而,Airflow 中可能会有多分钟的调度延迟。
我还看到了 dependent_dag
的日志,其中 运行 从 19:14:04 到 19:14:34 并且正在寻找相应的 19:13:00 的完成达格运行。没有迹象表明您的调度程序没有足够的延迟,可以通过 19:14:34 启动 leader_dag
的 19:13:00 dag运行。如果你展示它戳了 5 分钟左右,你可能会更好地说服我。当然 永远不会感觉到 leader_dag.t1 因为这不是你给显示的任务命名的。
所以,Airflow 有调度延迟,如果你在系统中有几 1000 个 dag,它可能会超过 1 分钟,这样 catchup=False
你会得到一些 运行s 相互跟随 IE 19:08、19:09 和一些 运行s 会跳过一分钟(或 6 分钟),例如 19:10 后跟 19:16,并且由于延迟在 dag-by-dag 的基础上有点随机,即使您有正确的任务 ID 等待,您也可能会 运行s 与传感器不对齐:
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task',
external_dag_id='leader_dag',
- external_task_id='t1',
+ external_task_id='print_date',
dag=dag)
对此的简单解决方案是执行以下操作:
1- 确保您在 ExternalTaskSensor 中设置的所有变量都设置正确,就像您希望 master_dag 有一只眼睛的 dag 的 task_id 和 dag_id上。
2- 使 master_dag 和 slave_dag(您要等待的 dag)具有相同的 start_date,否则将无法工作。如果您的奴隶从下午 22 点开始,而主人从 22:30 开始,那么您应该在执行增量中指定 30 分钟的差异。
如果您的错误无法通过以下方式解决,那么您的问题要么是基本问题,要么是您对 dag 的编程方式太错误了..
我想要一个 dag 在另一个 dag 完成后开始。一种解决方案是使用外部传感器功能,您可以在下面找到我的解决方案。我遇到的问题是依赖的 dag 卡住了,我检查了这个
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2015, 6, 1),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
schedule = '* * * * *'
dag = DAG('leader_dag', default_args=default_args,catchup=False,
schedule_interval=schedule)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
亲属:
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
from airflow.operators.sensors import ExternalTaskSensor
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': datetime(2018, 10, 8),
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
schedule='* * * * *'
dag = DAG('dependent_dag', default_args=default_args, catchup=False,
schedule_interval=schedule)
wait_for_task = ExternalTaskSensor(task_id = 'wait_for_task',
external_dag_id = 'leader_dag', external_task_id='t1', dag=dag)
t1 = BashOperator(
task_id='print_date',
bash_command='date',
dag=dag)
t1.set_upstream(wait_for_task)
leader_dag 的日志:
依赖 dag 的日志:
使用 ExternalTaskSensor
时,您必须为两个 DAG 指定相同的开始日期。如果这对您的用例不起作用,那么您需要在 ExternalTaskSensor
.
execution_delta
或 execution_date_fn
首先,leader_dag
中的 task_id
被命名为 print_date
,但您设置 dependent_dag
的任务 wait_for_task
正在等待 [=12] =]的任务名为t1
。没有名为 t1
的任务。您在 py
文件中分配给它的内容不相关,也没有在 Airflow 数据库中使用,也没有被传感器横向使用。它应该正在等待任务名称 print_date
.
其次,您的日志没有在 leader_dag 运行 中显示 dependent_dag 正在等待的内容。
最后,我不能推荐你使用 Airflow 每分钟安排任务。当然不是两个依赖任务在一起。 考虑在不同的系统(如 Spark)中编写流作业,或者为此滚动你自己的 Celery 或 Dask 环境。
您还可以通过在 leader_dag 的末尾添加 TriggerDagRunOperator
来触发 dependent_dag 并从中删除计划,从而避免 ExternalTaskSensor
将 schedule_interval
设置为 None
。
我在您的日志中看到的是 2018-10-13T19:08:11 的领导者日志。这充其量是 execution_date 2018-10-13 19:07:00 的 dag运行 因为从 19:07 开始的分钟周期在 19:08 结束,这是最早的可以预定。我发现调度和执行之间有大约 11 秒的延迟如果是这种情况。然而,Airflow 中可能会有多分钟的调度延迟。
我还看到了 dependent_dag
的日志,其中 运行 从 19:14:04 到 19:14:34 并且正在寻找相应的 19:13:00 的完成达格运行。没有迹象表明您的调度程序没有足够的延迟,可以通过 19:14:34 启动 leader_dag
的 19:13:00 dag运行。如果你展示它戳了 5 分钟左右,你可能会更好地说服我。当然 永远不会感觉到 leader_dag.t1 因为这不是你给显示的任务命名的。
所以,Airflow 有调度延迟,如果你在系统中有几 1000 个 dag,它可能会超过 1 分钟,这样 catchup=False
你会得到一些 运行s 相互跟随 IE 19:08、19:09 和一些 运行s 会跳过一分钟(或 6 分钟),例如 19:10 后跟 19:16,并且由于延迟在 dag-by-dag 的基础上有点随机,即使您有正确的任务 ID 等待,您也可能会 运行s 与传感器不对齐:
wait_for_task = ExternalTaskSensor(
task_id='wait_for_task',
external_dag_id='leader_dag',
- external_task_id='t1',
+ external_task_id='print_date',
dag=dag)
对此的简单解决方案是执行以下操作:
1- 确保您在 ExternalTaskSensor 中设置的所有变量都设置正确,就像您希望 master_dag 有一只眼睛的 dag 的 task_id 和 dag_id上。
2- 使 master_dag 和 slave_dag(您要等待的 dag)具有相同的 start_date,否则将无法工作。如果您的奴隶从下午 22 点开始,而主人从 22:30 开始,那么您应该在执行增量中指定 30 分钟的差异。
如果您的错误无法通过以下方式解决,那么您的问题要么是基本问题,要么是您对 dag 的编程方式太错误了..