Airflow ExternalTaskSensor 卡住了
Airflow ExternalTaskSensor gets stuck
我正在尝试使用 ExternalTaskSensor,但它卡在了另一个 DAG 的任务上,该任务已经成功完成。
在这里,第一个 DAG "a" 完成了它的任务,然后应该触发通过 ExternalTaskSensor 的第二个 DAG "b"。相反,它会卡在 a.first_task 上。
第一个 DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_first_task():
print('First task is done')
PythonOperator(
task_id='first_task',
python_callable=do_first_task,
dag=dag)
第二个 DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
dag = DAG(
dag_id='b',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_second_task():
print('Second task is done')
ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='a',
external_task_id='first_task',
dag=dag) >> \
PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
我在这里错过了什么?
ExternalTaskSensor
假定您依赖于具有相同执行日期的 dag 运行 中的任务。
这意味着在您的情况下,a
和 b
需要按相同的时间表 运行(例如,每天 9:00am 或 w/e ).
否则在实例化ExternalTaskSensor
时需要使用execution_delta
或execution_date_fn
。
这是运算符本身内部的文档,以帮助进一步阐明:
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task.
For yesterday, use [positive!] datetime.timedelta(days=1). Either
execution_delta or execution_date_fn can be passed to
ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date
and returns the desired execution date to query. Either execution_delta
or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
为了澄清我在这里和其他相关问题上看到的内容,dags 不一定要 运行 在相同的时间表上,如已接受的答案中所述。 dags 也不需要具有相同的 start_date
。如果您在没有 execution_delta
或 execution_date_fn
的情况下创建 ExternalTaskSensor
任务,则这两个 dag 需要具有相同的 执行日期 。碰巧的是,如果两个 dag 具有相同的调度,则每个间隔中调度的 运行s 将具有相同的执行日期。我不确定手动触发的 运行 计划 dag 的执行日期是什么。
要使此示例正常工作,dag b
的 ExternalTaskSensor
任务需要一个 execution_delta
或 execution_date_fn
参数。如果使用 execution_delta
参数,它应该是 b
的执行日期 - execution_delta
= a
的执行日期。如果使用 execution_date_fn
,那么该函数应该 return a
的执行日期。
如果您使用 TriggerDagRunOperator
,然后使用 ExternalTaskSensor
来检测该 dag 何时完成,您可以执行一些操作,例如将主 dag 的执行日期传递给触发日期 TriggerDagRunOperator
的 execution_date
参数,如 execution_date='{{ execution_date }}'
。然后两个 dag 的执行日期将相同,并且您不需要每个 dag 的计划都相同,或者使用 execution_delta
或 execution_date_fn
传感器参数。
以上内容是在 Airflow 1.10.9 上编写和测试的
从 Airflow v1.10.7 开始,tomcm 的回答是不正确的(至少对于这个版本)。如果外部 DAG 的时间表不同,则应使用 execution_delta
或 execution_date_fn
来确定日期和时间表。
来自我的成功案例:
default_args = {
'owner': 'xx',
'retries': 2,
'email': ALERT_EMAIL_ADDRESSES,
'email_on_failure': True,
'email_on_retry': False,
'retry_delay': timedelta(seconds=30),
# avoid stopping tasks after one day
'depends_on_past': False,
}
dag = DAG(
dag_id = dag_id,
# get the datetime type value
start_date = pendulum.strptime(current_date, "%Y, %m, %d, %H").astimezone('Europe/London').subtract(hours=1),
description = 'xxx',
default_args = default_args,
schedule_interval = timedelta(hours=1),
)
...
external_sensor= ExternalTaskSensor(
task_id='ext_sensor_task_update_model',
external_dag_id='xxx',
external_task_id='xxx'.format(log_type),
# set the task_id to None because of the end_task
# external_task_id = None,
dag=dag,
timeout = 300,
)
...
您可以等到任务成功自动触发。不要手动操作,start_date 会有所不同。
Airflow 默认查找相同的执行日期、时间戳。如果我们使用 execution_date_fn 参数,我们必须 return 要查找的时间戳值列表。在内部,传感器将查询气流的 task_instance table 以检查作为参数提供的 dagid、taskid、状态和执行日期时间戳的 dag 运行。因此,如果我们使用 None 计划,则必须手动触发 dag,在这种情况下,日期时间戳可能是任何可能的值。
我在这里详细解释了它:
https://link.medium.com/QzXm21asokb
我创建了一个继承了 ExternalTaskSensor 的新传感器,它可用于监视具有 None 计划的 dag。您可以在下面的仓库中找到代码。
https://github.com/Deepaksai1919/AirflowTaskSensor
我也 运行 参与其中,但在我的情况下,两个 DAG 使用相同的 schedule_interval
,因此上述 none 的建议有所帮助。
原来这是一个 Airflow 错误。 external_task_id
/external_task_ids
字段中的模板目前在 v2.2.4 中已损坏:https://github.com/apache/airflow/issues/22782
我正在尝试使用 ExternalTaskSensor,但它卡在了另一个 DAG 的任务上,该任务已经成功完成。
在这里,第一个 DAG "a" 完成了它的任务,然后应该触发通过 ExternalTaskSensor 的第二个 DAG "b"。相反,它会卡在 a.first_task 上。
第一个 DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
dag = DAG(
dag_id='a',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_first_task():
print('First task is done')
PythonOperator(
task_id='first_task',
python_callable=do_first_task,
dag=dag)
第二个 DAG:
import datetime
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import ExternalTaskSensor
dag = DAG(
dag_id='b',
default_args={'owner': 'airflow', 'start_date': datetime.datetime.now()},
schedule_interval=None
)
def do_second_task():
print('Second task is done')
ExternalTaskSensor(
task_id='wait_for_the_first_task_to_be_completed',
external_dag_id='a',
external_task_id='first_task',
dag=dag) >> \
PythonOperator(
task_id='second_task',
python_callable=do_second_task,
dag=dag)
我在这里错过了什么?
ExternalTaskSensor
假定您依赖于具有相同执行日期的 dag 运行 中的任务。
这意味着在您的情况下,a
和 b
需要按相同的时间表 运行(例如,每天 9:00am 或 w/e ).
否则在实例化ExternalTaskSensor
时需要使用execution_delta
或execution_date_fn
。
这是运算符本身内部的文档,以帮助进一步阐明:
:param execution_delta: time difference with the previous execution to
look at, the default is the same execution_date as the current task.
For yesterday, use [positive!] datetime.timedelta(days=1). Either
execution_delta or execution_date_fn can be passed to
ExternalTaskSensor, but not both.
:type execution_delta: datetime.timedelta
:param execution_date_fn: function that receives the current execution date
and returns the desired execution date to query. Either execution_delta
or execution_date_fn can be passed to ExternalTaskSensor, but not both.
:type execution_date_fn: callable
为了澄清我在这里和其他相关问题上看到的内容,dags 不一定要 运行 在相同的时间表上,如已接受的答案中所述。 dags 也不需要具有相同的 start_date
。如果您在没有 execution_delta
或 execution_date_fn
的情况下创建 ExternalTaskSensor
任务,则这两个 dag 需要具有相同的 执行日期 。碰巧的是,如果两个 dag 具有相同的调度,则每个间隔中调度的 运行s 将具有相同的执行日期。我不确定手动触发的 运行 计划 dag 的执行日期是什么。
要使此示例正常工作,dag b
的 ExternalTaskSensor
任务需要一个 execution_delta
或 execution_date_fn
参数。如果使用 execution_delta
参数,它应该是 b
的执行日期 - execution_delta
= a
的执行日期。如果使用 execution_date_fn
,那么该函数应该 return a
的执行日期。
如果您使用 TriggerDagRunOperator
,然后使用 ExternalTaskSensor
来检测该 dag 何时完成,您可以执行一些操作,例如将主 dag 的执行日期传递给触发日期 TriggerDagRunOperator
的 execution_date
参数,如 execution_date='{{ execution_date }}'
。然后两个 dag 的执行日期将相同,并且您不需要每个 dag 的计划都相同,或者使用 execution_delta
或 execution_date_fn
传感器参数。
以上内容是在 Airflow 1.10.9 上编写和测试的
从 Airflow v1.10.7 开始,tomcm 的回答是不正确的(至少对于这个版本)。如果外部 DAG 的时间表不同,则应使用 execution_delta
或 execution_date_fn
来确定日期和时间表。
来自我的成功案例:
default_args = {
'owner': 'xx',
'retries': 2,
'email': ALERT_EMAIL_ADDRESSES,
'email_on_failure': True,
'email_on_retry': False,
'retry_delay': timedelta(seconds=30),
# avoid stopping tasks after one day
'depends_on_past': False,
}
dag = DAG(
dag_id = dag_id,
# get the datetime type value
start_date = pendulum.strptime(current_date, "%Y, %m, %d, %H").astimezone('Europe/London').subtract(hours=1),
description = 'xxx',
default_args = default_args,
schedule_interval = timedelta(hours=1),
)
...
external_sensor= ExternalTaskSensor(
task_id='ext_sensor_task_update_model',
external_dag_id='xxx',
external_task_id='xxx'.format(log_type),
# set the task_id to None because of the end_task
# external_task_id = None,
dag=dag,
timeout = 300,
)
...
您可以等到任务成功自动触发。不要手动操作,start_date 会有所不同。
Airflow 默认查找相同的执行日期、时间戳。如果我们使用 execution_date_fn 参数,我们必须 return 要查找的时间戳值列表。在内部,传感器将查询气流的 task_instance table 以检查作为参数提供的 dagid、taskid、状态和执行日期时间戳的 dag 运行。因此,如果我们使用 None 计划,则必须手动触发 dag,在这种情况下,日期时间戳可能是任何可能的值。 我在这里详细解释了它: https://link.medium.com/QzXm21asokb
我创建了一个继承了 ExternalTaskSensor 的新传感器,它可用于监视具有 None 计划的 dag。您可以在下面的仓库中找到代码。 https://github.com/Deepaksai1919/AirflowTaskSensor
我也 运行 参与其中,但在我的情况下,两个 DAG 使用相同的 schedule_interval
,因此上述 none 的建议有所帮助。
原来这是一个 Airflow 错误。 external_task_id
/external_task_ids
字段中的模板目前在 v2.2.4 中已损坏:https://github.com/apache/airflow/issues/22782