如何使用 ExternalTaskSensor 在气流中设置两个 DAG?
how set two DAGs in airflow using ExternalTaskSensor?
我有两个 DAG:
DAG_CPS
dag = DAG(
'DAG_CPS',
default_args=default_args,
dagrun_timeout=timedelta(hours=2),
schedule_interval=None,
max_active_runs=1
)
tmp1_cap_pes_sap = PostgresOperatorWithTemplatedParams(
task_id='tmp1_cap_pes_sap',
sql='./SQL/A2050.sql',
postgres_conn_id='xxxx',
dag=dag)
...
DAG_SAS
dag = DAG(
'DAG_SAS',
default_args=default_args,
dagrun_timeout=timedelta(hours=2),
schedule_interval=None,
max_active_runs=1
)
wait_for_DAG_CPS = ExternalTaskSensor(
task_id='wait_for_DAG_CPS',
external_dag_id='DAG_CPS',
external_task_id='tmp1_cap_pes_sap',
execution_delta=None,
execution_date_fn=None,
dag=dag)
我从网络手动触发两个 DAG,任务 tmp1_cap_pes_sap 结束正常
Attribute Value
dag_id DAG_CPS
duration None
end_date 2018-08-24 11:04:28.177221
execution_date 2018-08-24 11:04:18.113031
但在 DAG_SAS 我获得了下一个日志,但它从未启动
[2018-08-24 11:03:55,592] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:03:55,592] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
[2018-08-24 11:04:55,642] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:04:55,641] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
[2018-08-24 11:05:55,718] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:05:55,717] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
[2018-08-24 11:06:55,799] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:06:55,797] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
[2018-08-24 11:07:55,853] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:07:55,853] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
我的代码有什么问题?
已解决
感谢@Alessandro Cosentino 对我的帮助。这是修复后的代码,如果我手动启动DAG,基本上就不会工作
DAG_CPS
default_args = {
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'retries': 2,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'DAG_CPS',
default_args=default_args,
dagrun_timeout=timedelta(minutes=5),
schedule_interval='*/10 * * * *',
max_active_runs=1
)
DAG_SAS
dag = DAG(
'DAG_SAS',
default_args=default_args,
dagrun_timeout=timedelta(minutes=5),
schedule_interval='*/10 * * * *',
max_active_runs=1
)
由于您手动触发任务,它们将 运行 不同 execution_date
,这就是 ExternalTaskSensor 未检测到第一个 DAG 任务完成的原因。
尝试 运行 他们按相同的时间表进行,看看是否可行。
我认为这就是问题所在,因为 execution_delta
和 execution_date_fn
参数的存在实际上是为了同步两个 DAG。有关这两个参数的行为,请参阅 the docs。
我有两个 DAG:
DAG_CPS
dag = DAG(
'DAG_CPS',
default_args=default_args,
dagrun_timeout=timedelta(hours=2),
schedule_interval=None,
max_active_runs=1
)
tmp1_cap_pes_sap = PostgresOperatorWithTemplatedParams(
task_id='tmp1_cap_pes_sap',
sql='./SQL/A2050.sql',
postgres_conn_id='xxxx',
dag=dag)
...
DAG_SAS
dag = DAG(
'DAG_SAS',
default_args=default_args,
dagrun_timeout=timedelta(hours=2),
schedule_interval=None,
max_active_runs=1
)
wait_for_DAG_CPS = ExternalTaskSensor(
task_id='wait_for_DAG_CPS',
external_dag_id='DAG_CPS',
external_task_id='tmp1_cap_pes_sap',
execution_delta=None,
execution_date_fn=None,
dag=dag)
我从网络手动触发两个 DAG,任务 tmp1_cap_pes_sap 结束正常
Attribute Value
dag_id DAG_CPS
duration None
end_date 2018-08-24 11:04:28.177221
execution_date 2018-08-24 11:04:18.113031
但在 DAG_SAS 我获得了下一个日志,但它从未启动
[2018-08-24 11:03:55,592] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:03:55,592] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
[2018-08-24 11:04:55,642] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:04:55,641] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
[2018-08-24 11:05:55,718] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:05:55,717] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
[2018-08-24 11:06:55,799] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:06:55,797] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
[2018-08-24 11:07:55,853] {base_task_runner.py:98} INFO - Subtask: [2018-08-24 11:07:55,853] {sensors.py:243} INFO - Poking for DAG_CPS.tmp1_cap_pes_sap on 2018-08-24T11:03:50.518595 ...
我的代码有什么问题?
已解决
感谢@Alessandro Cosentino 对我的帮助。这是修复后的代码,如果我手动启动DAG,基本上就不会工作
DAG_CPS
default_args = {
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'retries': 2,
'retry_delay': timedelta(minutes=1)
}
dag = DAG(
'DAG_CPS',
default_args=default_args,
dagrun_timeout=timedelta(minutes=5),
schedule_interval='*/10 * * * *',
max_active_runs=1
)
DAG_SAS
dag = DAG(
'DAG_SAS',
default_args=default_args,
dagrun_timeout=timedelta(minutes=5),
schedule_interval='*/10 * * * *',
max_active_runs=1
)
由于您手动触发任务,它们将 运行 不同 execution_date
,这就是 ExternalTaskSensor 未检测到第一个 DAG 任务完成的原因。
尝试 运行 他们按相同的时间表进行,看看是否可行。
我认为这就是问题所在,因为 execution_delta
和 execution_date_fn
参数的存在实际上是为了同步两个 DAG。有关这两个参数的行为,请参阅 the docs。