从 trigger_dag_run_operator 设置任务 ID
Setting task-id from trigger_dag_run_operator
在以下 Airflow 查询中需要帮助:
无法设置使用 trigger_dag_run_operator:
启动的任务 ID
下面是我的触发器 dag 运行 运算符和目标 python 运算符:
TriggerDag 运算符:
...
trigger_target = TriggerDagRunOperator(
task_id='trigger_target',
trigger_dag_id='TargetDag',
conf={"message": "Test Message", "executed_file_name":"DAG_NAME_001"},
)
...
目标 dag 运算符:
trigger_pipeline = PythonOperator(
task_id='called_for_file'+{{dag_run['conf']['executed_file_name']}},
python_callable=call_trigger_pipeline,
)
在上面的代码中,“{{dag_run['conf']['executed_file_name']}}”没有被触发器 dag 运行 运算符中设置的值替换.
谢谢,
杰克
该代码的问题是 task_id
不是模板化字段,因此不会呈现 Jinja,这解释了为什么您得到的输出包括花括号,这是预期的行为。
在不了解更多上下文的情况下,我认为您应该考虑一种不同的设计,其中任务不会动态生成但 DAG
s 会动态生成。按照 Airflow FAQs 中的模式,您可以动态创建 DAG 和进一步的任务,请考虑以下示例:
def create_dags(city_name, payload: list, default_args):
"""
Returns a DAG object
"""
def _print_load_number(city_name, load_number):
print(f"{load_number} from: {city_name} ")
dag = DAG(
f"location_sync_{city_name}",
schedule_interval="@daily",
catchup=False,
tags=["example", "dynamic_dag"],
default_args=default_args,
)
with dag:
end = DummyOperator(task_id="end")
for load_no in payload:
print_load = PythonOperator(
task_id=f"{dag_id}_proccesing_load_{load_no}",
python_callable=_print_load_number,
op_kwargs={"city_name": city, "load_number": load_no},
)
print_load >> end
# DAG level tasks dependencies
return dag
cities = [
{"name": "London", "payload": [1, 2, 3]},
{"name": "Paris", "payload": [4, 5, 6]},
{"name": "Buenos_Aires", "payload": [4, 5, 6]},
]
default_args = {"owner": "Airflow", "start_date": days_ago(1)}
for city in cities:
dag_id = city["name"]
globals()[dag_id] = create_dags(city["name"], city["payload"], default_args)
请注意,在 create_dag
函数中,任务是动态创建的,每个 task_id
都是根据提供的值命名的:task_id=f"{dag_id}_proccesing_load_{load_no}"
一旦你创建了 n 个 DAG,你就可以根据需要处理触发它们,包括使用来自另一个 DAG 的 TriggerDagRunOperator
,这将允许定义(动态) dag_id
被触发。您甚至可以循环创建 DAG 时使用的 iterable
。
此外,由于 trigger_dag_id
是一个模板化字段,如果您需要定义要从 UI 或 CLI 触发的 DAG,您可以在 params
上使用宏,如下所示:
trigger_service_discovery = TriggerDagRunOperator(
task_id='trigger_loc_sync',
trigger_dag_id='location_sync_{}'.format('{{ params.dag_id_from_UI }}'),
wait_for_completion=True,
)
上例中其中一个 DAG 的图形视图:
查看天文学家指南以进一步阅读有关 dynamic DAGs generation 的内容。
在以下 Airflow 查询中需要帮助:
无法设置使用 trigger_dag_run_operator:
启动的任务 ID下面是我的触发器 dag 运行 运算符和目标 python 运算符: TriggerDag 运算符:
...
trigger_target = TriggerDagRunOperator(
task_id='trigger_target',
trigger_dag_id='TargetDag',
conf={"message": "Test Message", "executed_file_name":"DAG_NAME_001"},
)
...
目标 dag 运算符:
trigger_pipeline = PythonOperator(
task_id='called_for_file'+{{dag_run['conf']['executed_file_name']}},
python_callable=call_trigger_pipeline,
)
在上面的代码中,“{{dag_run['conf']['executed_file_name']}}”没有被触发器 dag 运行 运算符中设置的值替换.
谢谢, 杰克
该代码的问题是 task_id
不是模板化字段,因此不会呈现 Jinja,这解释了为什么您得到的输出包括花括号,这是预期的行为。
在不了解更多上下文的情况下,我认为您应该考虑一种不同的设计,其中任务不会动态生成但 DAG
s 会动态生成。按照 Airflow FAQs 中的模式,您可以动态创建 DAG 和进一步的任务,请考虑以下示例:
def create_dags(city_name, payload: list, default_args):
"""
Returns a DAG object
"""
def _print_load_number(city_name, load_number):
print(f"{load_number} from: {city_name} ")
dag = DAG(
f"location_sync_{city_name}",
schedule_interval="@daily",
catchup=False,
tags=["example", "dynamic_dag"],
default_args=default_args,
)
with dag:
end = DummyOperator(task_id="end")
for load_no in payload:
print_load = PythonOperator(
task_id=f"{dag_id}_proccesing_load_{load_no}",
python_callable=_print_load_number,
op_kwargs={"city_name": city, "load_number": load_no},
)
print_load >> end
# DAG level tasks dependencies
return dag
cities = [
{"name": "London", "payload": [1, 2, 3]},
{"name": "Paris", "payload": [4, 5, 6]},
{"name": "Buenos_Aires", "payload": [4, 5, 6]},
]
default_args = {"owner": "Airflow", "start_date": days_ago(1)}
for city in cities:
dag_id = city["name"]
globals()[dag_id] = create_dags(city["name"], city["payload"], default_args)
请注意,在 create_dag
函数中,任务是动态创建的,每个 task_id
都是根据提供的值命名的:task_id=f"{dag_id}_proccesing_load_{load_no}"
一旦你创建了 n 个 DAG,你就可以根据需要处理触发它们,包括使用来自另一个 DAG 的 TriggerDagRunOperator
,这将允许定义(动态) dag_id
被触发。您甚至可以循环创建 DAG 时使用的 iterable
。
此外,由于 trigger_dag_id
是一个模板化字段,如果您需要定义要从 UI 或 CLI 触发的 DAG,您可以在 params
上使用宏,如下所示:
trigger_service_discovery = TriggerDagRunOperator(
task_id='trigger_loc_sync',
trigger_dag_id='location_sync_{}'.format('{{ params.dag_id_from_UI }}'),
wait_for_completion=True,
)
上例中其中一个 DAG 的图形视图:
查看天文学家指南以进一步阅读有关 dynamic DAGs generation 的内容。