在评估 Airflow 中的模板后如何 return 对象?

How to return Object after evaluating the Templates in Airflow?

我们正在设计一个变量选择和参数 setter 逻辑,当 DAG 被触发时需要评估什么。我们的 DAG 是在执行之前生成的。我们决定将静态代码修改为自定义宏。


直到此时有一个代码定义在操作符定义之间,因此DAG生成器代码生成DAG时是运行。此代码无法处理用于选择正确 Airflow 变量的运行时参数。

for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
    dag_id = "TableLoader_" + str(table_name)
    default_dag_args={...}
    schedule = None
    globals()[dag_id] = create_dag(dag_id, schedule, default_dag_args)
def create_dag(dag_id, schedule, default_dag_args):

    with DAG(
        default_args=default_dag_args,
        dag_id=dag_id,
        schedule_interval=schedule,
        user_defined_macros={ "load_type_handler": load_type_handler }
    ) as dag:

        # static python code which sets pipeline_rt_args for all generated DAGs the same way
        # this static code could set only one type (INITIAL or INCREMENTAL)
        # but we want to decide during the execution now

        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_'+str(table),

            # ---> Can't handle runtime parameters <---
            runtime_args=pipeline_rt_args,                             
            # ...
        )

        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
        return dag

现在我们想传递 load_type(例如:INITIALINCREMENTAL),同时从 UI 或 REST API 触发 DAG ,因此我们需要修改这个旧的(静态)行为(它只处理一种情况,而不是两种情况)以获得正确的气流变量并为我们的 CloudDataFusionStartPipelineOperator:

创建正确的对象

例如:

{"load_type":"INCREMENTAL"}
# or
{"load_type":"INITIAL"}

但是如果我们这样做:


def create_dag(dag_id, schedule, default_dag_args):

    def extend_runtime_args(prefix, param, field_name, date, job_id):

        # reading the Trigger-time parameter
        load_type = param.conf["load_type"]
        
        # getting the proper Airflow Variable (depending on current load type)
        result = eval(Variable.get(prefix+'_'+load_type+'_'+dag_id))[field_name]

        # setting 'job_id', 'dateValue', 'date', 'GCS_Input_Path' for CloudDataFusionStartPipelineOperator
        # ...

        return rt_args


    with DAG( #...
        user_defined_macros={
            "extend_runtime_args": extend_runtime_args
        }) as dag:
        # removed static code (which executes only in generation time)

        # Operator Definitions
        OP_K = CloudDataFusionStartPipelineOperator(
            task_id='PREFIX_'+str(table),

            # ---> handles runtime arguments with custom macro <---
            runtime_args="""{{ extend_runtime_args('PREFIX', dag_run, 'runtime_args', macros.ds_format(yesterday_ds_nodash,"%Y%m%d","%Y_%m_%d"), ti.job_id) }}""",
            # ...
        )

        OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
        return dag

注意事项:

这里我们需要的是自定义逻辑的“未来”评估(未在 DAG 生成时间内评估),它将 return 与对象一起使用,这就是我们在这里使用模板的原因。

我们遇到以下情况:

问题:

How could we return with an object after evaluating the Jinja template (and do this in the 'future')?

您可以创建自己的派生自 CloudDataFusionStartPipelineOperator 的自定义运算符并使其接受字符串并将其转换为 CloudDataFusionStartPipelineOperator 所需的对象并使用此新运算符。 “runtime_args”是一个字典,所以我相信它应该像 json.loads() 一样容易找回来。

Can we convert the string somehow?

是的。上面的 json.loads() 代码应该可以。此外,如果您在 runtime_args 中只有几个参数可以更改,那么拥有多个宏并直接 return 字典中多个 JINJA 字符串中更改的值可能会更容易。类似于:

runtime_args = {
   'PREFIX' = "{{ dag_run }}",
   'date' = "{{ macros.ds_format(....) }}",
}

当存在模板化字段时,Airflow 会递归地处理字典或列表等基本结构,因此您可以保留对象结构,并使用 jinja 宏作为值(实际上您也可以将 jinja 宏作为键等)。

How could we ensure that the logic here will be executed after the DAG is executed and not right after the DAG was generated?

JINJA 模板仅在执行任务时进行评估。所以你在这里很好。

Are the Jinja templates / custom macros good or bad patterns here for handling the trigger-time arguments?

很好的模式。这就是他们的目的。