Airflow 中的动态任务 ID 名称

dynamic task id names in Airflow

我有一个 DAG,其中一个 DataflowTemplateOperator 可以处理不同的 json 文件。当我触发 dag 时,我通过 {{dag_run.conf['param1']}} 传递了一些参数并且工作正常。

我遇到的问题是尝试根据参数 1 重命名 task_id

task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",

它只抱怨字母数字字符 或者

task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']), 它无法识别 dag_run 加上 alpha 问题。

这背后的整个想法是,当我在数据流作业控制台看到作业失败时,我知道基于 param1 的违规者是谁。数据流作业名称基于 task_id,如下所示:

df-operator-read-object-json-file-8b9eecec

我需要的是:

df-operator-read-object-param1-json-file-8b9eecec

有什么想法可以吗?

无需为每个文件生成新的运算符。 DataflowTemplatedJobStartOperatorjob_name 参数,它也是模板化的,因此可以与 Jinja 一起使用。

我没有测试过,但这应该有效:

from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
op = DataflowTemplatedJobStartOperator(
        task_id="df_operator_read_object_json_file",
        job_name= "df_operator_read_object_json_file_{{dag_run.conf['param1']}}"
        template='gs://dataflow-templates/your_template',
        location='europe-west3',
    )