Airflow external_task_sensor 永不停歇
Airflow external_task_sensor never stops poking
我需要等待另一个 dag 中的另一个任务,直到我可以触发我自己的任务。但是我的外部传感器并没有停止戳动。我已经在这里阅读了一些其他相关问题,并确保我已经调整了 execution_delta。但是,我仍然有同样的问题。
这是我的两只狗
Parent 达格:
import datetime
import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator
local_tz = pendulum.timezone("Europe/Berlin")
args = {
"start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
"provide_context": True,
}
def start_job(process_name, **kwargs):
print('do something: ' + process_name)
return True
with models.DAG(
dag_id="test_parent",
default_args=args,
# catchup=False,
) as dag:
task_parent_1 = PythonOperator(
task_id="task_parent_1",
python_callable=start_job,
op_kwargs={"process_name": "my parent task 1"},
provide_context=True,
)
task_parent_2 = PythonOperator(
task_id="task_parent_2",
python_callable=start_job,
op_kwargs={"process_name": "my parent task 2"},
provide_context=True,
)
task_parent_1 >> task_parent_2
还有我的 child 狗:
import datetime
import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
local_tz = pendulum.timezone("Europe/Berlin")
args = {
"start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
"provide_context": True,
}
def start_job(process_name, **kwargs):
print('do something: ' + process_name)
return True
with models.DAG(
dag_id="test_child",
default_args=args,
# catchup=False,
) as dag:
wait_for_parent_task = ExternalTaskSensor(
task_id='wait_for_parent_task',
external_dag_id='test_parent',
external_task_id='task_parent_2',
execution_delta=datetime.timedelta(hours=24),
# execution_date_fn=lambda dt: dt - datetime.timedelta(hours=24),
)
task_child_1 = PythonOperator(
task_id="task_child_1",
python_callable=start_job,
op_kwargs={"process_name": "my child task 1"},
provide_context=True,
)
task_child_2 = PythonOperator(
task_id="task_child_2",
python_callable=start_job,
op_kwargs={"process_name": "my child task 2"},
provide_context=True,
)
task_child_1 >> wait_for_parent_task >> task_child_2
Code-wise 看起来正确,但 start_date 设置为今天。设置 execution_delta
后,ExternalTaskSensor
将检查执行日期为 execution_date - execution_delta
的任务。 IE。第一个 DAG 运行 将于 26 日 00:00 开始,ExternalTaskSensor
将检查 execution_date 25 日 00:00 - 24 小时 = 24 日的任务00:00。由于那是在您的 DAG 开始日期之前,因此不会有针对该 execution_date 的任务。
在日志中您应该看到 DAG/task/date 它正在检查:Poking for tasks %s in dag %s on %s ...
。您可以将 DAG 的开始日期设置为几天前,或者让它 运行 几天来调试问题。
或者,我还找到了一种对我有用的方法,方法是将执行日期设置为父 dag 的预定日期。优点:您也可以手动触发 dag
假设父 dag 安排在时区“Europe/Berlin”的早上 6 点。
wait_for_parent_task = ExternalTaskSensor(
task_id='wait_for_parent_task ',
external_dag_id='test_parent',
external_task_id='task_parent_2',
check_existence=True,
# execution_date needs to be exact (scheduled time) and the london timezone
# Remember: The scheduled start is always the one step further in the past -
# For a daily schedule: - datetime.timedelta(days=1)
execution_date_fn=lambda dt: (datetime.datetime(year=dt.year, month=dt.month, day=dt.day, tzinfo=local_tz)
+ datetime.timedelta(hours=6, minutes=0)
- datetime.timedelta(days=1)
).astimezone(local_tz_london),
)
我需要等待另一个 dag 中的另一个任务,直到我可以触发我自己的任务。但是我的外部传感器并没有停止戳动。我已经在这里阅读了一些其他相关问题,并确保我已经调整了 execution_delta。但是,我仍然有同样的问题。
这是我的两只狗
Parent 达格:
import datetime
import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator
local_tz = pendulum.timezone("Europe/Berlin")
args = {
"start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
"provide_context": True,
}
def start_job(process_name, **kwargs):
print('do something: ' + process_name)
return True
with models.DAG(
dag_id="test_parent",
default_args=args,
# catchup=False,
) as dag:
task_parent_1 = PythonOperator(
task_id="task_parent_1",
python_callable=start_job,
op_kwargs={"process_name": "my parent task 1"},
provide_context=True,
)
task_parent_2 = PythonOperator(
task_id="task_parent_2",
python_callable=start_job,
op_kwargs={"process_name": "my parent task 2"},
provide_context=True,
)
task_parent_1 >> task_parent_2
还有我的 child 狗:
import datetime
import pendulum
from airflow import models
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
local_tz = pendulum.timezone("Europe/Berlin")
args = {
"start_date": datetime.datetime(2022, 1, 25, tzinfo=local_tz),
"provide_context": True,
}
def start_job(process_name, **kwargs):
print('do something: ' + process_name)
return True
with models.DAG(
dag_id="test_child",
default_args=args,
# catchup=False,
) as dag:
wait_for_parent_task = ExternalTaskSensor(
task_id='wait_for_parent_task',
external_dag_id='test_parent',
external_task_id='task_parent_2',
execution_delta=datetime.timedelta(hours=24),
# execution_date_fn=lambda dt: dt - datetime.timedelta(hours=24),
)
task_child_1 = PythonOperator(
task_id="task_child_1",
python_callable=start_job,
op_kwargs={"process_name": "my child task 1"},
provide_context=True,
)
task_child_2 = PythonOperator(
task_id="task_child_2",
python_callable=start_job,
op_kwargs={"process_name": "my child task 2"},
provide_context=True,
)
task_child_1 >> wait_for_parent_task >> task_child_2
Code-wise 看起来正确,但 start_date 设置为今天。设置 execution_delta
后,ExternalTaskSensor
将检查执行日期为 execution_date - execution_delta
的任务。 IE。第一个 DAG 运行 将于 26 日 00:00 开始,ExternalTaskSensor
将检查 execution_date 25 日 00:00 - 24 小时 = 24 日的任务00:00。由于那是在您的 DAG 开始日期之前,因此不会有针对该 execution_date 的任务。
在日志中您应该看到 DAG/task/date 它正在检查:Poking for tasks %s in dag %s on %s ...
。您可以将 DAG 的开始日期设置为几天前,或者让它 运行 几天来调试问题。
或者,我还找到了一种对我有用的方法,方法是将执行日期设置为父 dag 的预定日期。优点:您也可以手动触发 dag
假设父 dag 安排在时区“Europe/Berlin”的早上 6 点。
wait_for_parent_task = ExternalTaskSensor(
task_id='wait_for_parent_task ',
external_dag_id='test_parent',
external_task_id='task_parent_2',
check_existence=True,
# execution_date needs to be exact (scheduled time) and the london timezone
# Remember: The scheduled start is always the one step further in the past -
# For a daily schedule: - datetime.timedelta(days=1)
execution_date_fn=lambda dt: (datetime.datetime(year=dt.year, month=dt.month, day=dt.day, tzinfo=local_tz)
+ datetime.timedelta(hours=6, minutes=0)
- datetime.timedelta(days=1)
).astimezone(local_tz_london),
)