Airflow 中的动态任务 ID 名称
dynamic task id names in Airflow
我有一个 DAG,其中一个 DataflowTemplateOperator
可以处理不同的 json 文件。当我触发 dag 时,我通过 {{dag_run.conf['param1']}}
传递了一些参数并且工作正常。
我遇到的问题是尝试根据参数 1 重命名 task_id
。
即task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",
它只抱怨字母数字字符
或者
task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']),
它无法识别 dag_run 加上 alpha 问题。
这背后的整个想法是,当我在数据流作业控制台看到作业失败时,我知道基于 param1 的违规者是谁。数据流作业名称基于 task_id,如下所示:
df-operator-read-object-json-file-8b9eecec
我需要的是:
df-operator-read-object-param1-json-file-8b9eecec
有什么想法可以吗?
无需为每个文件生成新的运算符。
DataflowTemplatedJobStartOperator
有 job_name
参数,它也是模板化的,因此可以与 Jinja 一起使用。
我没有测试过,但这应该有效:
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
op = DataflowTemplatedJobStartOperator(
task_id="df_operator_read_object_json_file",
job_name= "df_operator_read_object_json_file_{{dag_run.conf['param1']}}"
template='gs://dataflow-templates/your_template',
location='europe-west3',
)
我有一个 DAG,其中一个 DataflowTemplateOperator
可以处理不同的 json 文件。当我触发 dag 时,我通过 {{dag_run.conf['param1']}}
传递了一些参数并且工作正常。
我遇到的问题是尝试根据参数 1 重命名 task_id
。
即task_id="df_operator_read_object_json_file_{{dag_run.conf['param1']}}",
它只抱怨字母数字字符 或者
task_id="df_operator_read_object_json_file_{}".format(dag_run.conf['param1']),
它无法识别 dag_run 加上 alpha 问题。
这背后的整个想法是,当我在数据流作业控制台看到作业失败时,我知道基于 param1 的违规者是谁。数据流作业名称基于 task_id,如下所示:
df-operator-read-object-json-file-8b9eecec
我需要的是:
df-operator-read-object-param1-json-file-8b9eecec
有什么想法可以吗?
无需为每个文件生成新的运算符。
DataflowTemplatedJobStartOperator
有 job_name
参数,它也是模板化的,因此可以与 Jinja 一起使用。
我没有测试过,但这应该有效:
from airflow.providers.google.cloud.operators.dataflow import DataflowTemplatedJobStartOperator
op = DataflowTemplatedJobStartOperator(
task_id="df_operator_read_object_json_file",
job_name= "df_operator_read_object_json_file_{{dag_run.conf['param1']}}"
template='gs://dataflow-templates/your_template',
location='europe-west3',
)