从 trigger_dag_run_operator 设置任务 ID

Setting task-id from trigger_dag_run_operator

在以下 Airflow 查询中需要帮助:

无法设置使用 trigger_dag_run_operator:

启动的任务 ID

下面是我的触发器 dag 运行 运算符和目标 python 运算符: TriggerDag 运算符:

    ...
    trigger_target = TriggerDagRunOperator(
        task_id='trigger_target',
        trigger_dag_id='TargetDag',
        conf={"message": "Test Message", "executed_file_name":"DAG_NAME_001"},
    )
    ...

目标 dag 运算符:

    trigger_pipeline = PythonOperator(
        task_id='called_for_file'+{{dag_run['conf']['executed_file_name']}},
        python_callable=call_trigger_pipeline,
    )

在上面的代码中,“{{dag_run['conf']['executed_file_name']}}”没有被触发器 dag 运行 运算符中设置的值替换.

谢谢, 杰克

该代码的问题是 task_id 不是模板化字段,因此不会呈现 Jinja,这解释了为什么您得到的输出包括花括号,这是预期的行为。

在不了解更多上下文的情况下,我认为您应该考虑一种不同的设计,其中任务不会动态生成但 DAGs 会动态生成。按照 Airflow FAQs 中的模式,您可以动态创建 DAG 和进一步的任务,请考虑以下示例:

def create_dags(city_name, payload: list, default_args):
    """
    Returns a DAG object
    """

    def _print_load_number(city_name, load_number):
        print(f"{load_number} from: {city_name} ")

    dag = DAG(
        f"location_sync_{city_name}",
        schedule_interval="@daily",
        catchup=False,
        tags=["example", "dynamic_dag"],
        default_args=default_args,
    )

    with dag:
        end = DummyOperator(task_id="end")
        for load_no in payload:
            print_load = PythonOperator(
                task_id=f"{dag_id}_proccesing_load_{load_no}",
                python_callable=_print_load_number,
                op_kwargs={"city_name": city, "load_number": load_no},
            )
            print_load >> end

    # DAG level tasks dependencies
    return dag

cities = [
    {"name": "London", "payload": [1, 2, 3]},
    {"name": "Paris", "payload": [4, 5, 6]},
    {"name": "Buenos_Aires", "payload": [4, 5, 6]},
]

default_args = {"owner": "Airflow", "start_date": days_ago(1)}

for city in cities:
    dag_id = city["name"]

    globals()[dag_id] = create_dags(city["name"], city["payload"], default_args)

请注意,在 create_dag 函数中,任务是动态创建的,每个 task_id 都是根据提供的值命名的:task_id=f"{dag_id}_proccesing_load_{load_no}"

一旦你创建了 n 个 DAG,你就可以根据需要处理触发它们,包括使用来自另一个 DAG 的 TriggerDagRunOperator,这将允许定义(动态) dag_id 被触发。您甚至可以循环创建 DAG 时使用的 iterable

此外,由于 trigger_dag_id 是一个模板化字段,如果您需要定义要从 UI 或 CLI 触发的 DAG,您可以在 params 上使用宏,如下所示:

trigger_service_discovery = TriggerDagRunOperator(
    task_id='trigger_loc_sync',
    trigger_dag_id='location_sync_{}'.format('{{ params.dag_id_from_UI }}'),
    wait_for_completion=True,
)

上例中其中一个 DAG 的图形视图:

查看天文学家指南以进一步阅读有关 dynamic DAGs generation 的内容。