在评估 Airflow 中的模板后如何 return 对象?
How to return Object after evaluating the Templates in Airflow?
我们正在设计一个变量选择和参数 setter 逻辑,当 DAG 被触发时需要评估什么。我们的 DAG 是在执行之前生成的。我们决定将静态代码修改为自定义宏。
直到此时有一个代码定义在操作符定义之间,因此DAG生成器代码生成DAG时是运行。此代码无法处理用于选择正确 Airflow 变量的运行时参数。
for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
dag_id = "TableLoader_" + str(table_name)
default_dag_args={...}
schedule = None
globals()[dag_id] = create_dag(dag_id, schedule, default_dag_args)
def create_dag(dag_id, schedule, default_dag_args):
with DAG(
default_args=default_dag_args,
dag_id=dag_id,
schedule_interval=schedule,
user_defined_macros={ "load_type_handler": load_type_handler }
) as dag:
# static python code which sets pipeline_rt_args for all generated DAGs the same way
# this static code could set only one type (INITIAL or INCREMENTAL)
# but we want to decide during the execution now
# Operator Definitions
OP_K = CloudDataFusionStartPipelineOperator(
task_id='PREFIX_'+str(table),
# ---> Can't handle runtime parameters <---
runtime_args=pipeline_rt_args,
# ...
)
OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
return dag
现在我们想传递 load_type
(例如:INITIAL
,INCREMENTAL
),同时从 UI 或 REST API 触发 DAG ,因此我们需要修改这个旧的(静态)行为(它只处理一种情况,而不是两种情况)以获得正确的气流变量并为我们的 CloudDataFusionStartPipelineOperator
:
创建正确的对象
例如:
{"load_type":"INCREMENTAL"}
# or
{"load_type":"INITIAL"}
但是如果我们这样做:
def create_dag(dag_id, schedule, default_dag_args):
def extend_runtime_args(prefix, param, field_name, date, job_id):
# reading the Trigger-time parameter
load_type = param.conf["load_type"]
# getting the proper Airflow Variable (depending on current load type)
result = eval(Variable.get(prefix+'_'+load_type+'_'+dag_id))[field_name]
# setting 'job_id', 'dateValue', 'date', 'GCS_Input_Path' for CloudDataFusionStartPipelineOperator
# ...
return rt_args
with DAG( #...
user_defined_macros={
"extend_runtime_args": extend_runtime_args
}) as dag:
# removed static code (which executes only in generation time)
# Operator Definitions
OP_K = CloudDataFusionStartPipelineOperator(
task_id='PREFIX_'+str(table),
# ---> handles runtime arguments with custom macro <---
runtime_args="""{{ extend_runtime_args('PREFIX', dag_run, 'runtime_args', macros.ds_format(yesterday_ds_nodash,"%Y%m%d","%Y_%m_%d"), ti.job_id) }}""",
# ...
)
OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
return dag
注意事项:
这里我们需要的是自定义逻辑的“未来”评估(未在 DAG 生成时间内评估),它将 return 与对象一起使用,这就是我们在这里使用模板的原因。
我们遇到以下情况:
- 在自定义宏函数中
extend_runtime_args
return 类型是一个对象
- 在评估 Jinja 模板后,return 类型更改为字符串
CloudDataFusionStartPipelineOperator
失败,因为 runtime_args
属性 是字符串而不是对象
问题:
- 我们如何在评估 Jinja 模板后 return 使用对象(并在 'future' 中执行此操作)?
- 我们能以某种方式转换字符串吗?
- 如何保证这里的逻辑是在DAG执行完之后才执行,而不是在DAG生成之后才执行呢?
- 这里的 Jinja 模板/自定义宏模式在处理触发时间参数方面是好是坏?
How could we return with an object after evaluating the Jinja template
(and do this in the 'future')?
您可以创建自己的派生自 CloudDataFusionStartPipelineOperator
的自定义运算符并使其接受字符串并将其转换为 CloudDataFusionStartPipelineOperator
所需的对象并使用此新运算符。 “runtime_args”是一个字典,所以我相信它应该像 json.loads()
一样容易找回来。
Can we convert the string somehow?
是的。上面的 json.loads() 代码应该可以。此外,如果您在 runtime_args 中只有几个参数可以更改,那么拥有多个宏并直接 return 字典中多个 JINJA 字符串中更改的值可能会更容易。类似于:
runtime_args = {
'PREFIX' = "{{ dag_run }}",
'date' = "{{ macros.ds_format(....) }}",
}
当存在模板化字段时,Airflow 会递归地处理字典或列表等基本结构,因此您可以保留对象结构,并使用 jinja 宏作为值(实际上您也可以将 jinja 宏作为键等)。
How could we ensure that the logic here will be executed after the DAG is
executed and not right after the DAG was generated?
JINJA 模板仅在执行任务时进行评估。所以你在这里很好。
Are the Jinja templates / custom macros good or bad patterns here for
handling the trigger-time arguments?
很好的模式。这就是他们的目的。
我们正在设计一个变量选择和参数 setter 逻辑,当 DAG 被触发时需要评估什么。我们的 DAG 是在执行之前生成的。我们决定将静态代码修改为自定义宏。
直到此时有一个代码定义在操作符定义之间,因此DAG生成器代码生成DAG时是运行。此代码无法处理用于选择正确 Airflow 变量的运行时参数。
for table_name in ast.literal_eval(Variable.get('PYTHON_LIST_OF_TABLES')):
dag_id = "TableLoader_" + str(table_name)
default_dag_args={...}
schedule = None
globals()[dag_id] = create_dag(dag_id, schedule, default_dag_args)
def create_dag(dag_id, schedule, default_dag_args):
with DAG(
default_args=default_dag_args,
dag_id=dag_id,
schedule_interval=schedule,
user_defined_macros={ "load_type_handler": load_type_handler }
) as dag:
# static python code which sets pipeline_rt_args for all generated DAGs the same way
# this static code could set only one type (INITIAL or INCREMENTAL)
# but we want to decide during the execution now
# Operator Definitions
OP_K = CloudDataFusionStartPipelineOperator(
task_id='PREFIX_'+str(table),
# ---> Can't handle runtime parameters <---
runtime_args=pipeline_rt_args,
# ...
)
OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
return dag
现在我们想传递 load_type
(例如:INITIAL
,INCREMENTAL
),同时从 UI 或 REST API 触发 DAG ,因此我们需要修改这个旧的(静态)行为(它只处理一种情况,而不是两种情况)以获得正确的气流变量并为我们的 CloudDataFusionStartPipelineOperator
:
例如:
{"load_type":"INCREMENTAL"}
# or
{"load_type":"INITIAL"}
但是如果我们这样做:
def create_dag(dag_id, schedule, default_dag_args):
def extend_runtime_args(prefix, param, field_name, date, job_id):
# reading the Trigger-time parameter
load_type = param.conf["load_type"]
# getting the proper Airflow Variable (depending on current load type)
result = eval(Variable.get(prefix+'_'+load_type+'_'+dag_id))[field_name]
# setting 'job_id', 'dateValue', 'date', 'GCS_Input_Path' for CloudDataFusionStartPipelineOperator
# ...
return rt_args
with DAG( #...
user_defined_macros={
"extend_runtime_args": extend_runtime_args
}) as dag:
# removed static code (which executes only in generation time)
# Operator Definitions
OP_K = CloudDataFusionStartPipelineOperator(
task_id='PREFIX_'+str(table),
# ---> handles runtime arguments with custom macro <---
runtime_args="""{{ extend_runtime_args('PREFIX', dag_run, 'runtime_args', macros.ds_format(yesterday_ds_nodash,"%Y%m%d","%Y_%m_%d"), ti.job_id) }}""",
# ...
)
OP_1 >> OP_2 >> ... >> OP_K >> ... >> OP_N
return dag
注意事项:
这里我们需要的是自定义逻辑的“未来”评估(未在 DAG 生成时间内评估),它将 return 与对象一起使用,这就是我们在这里使用模板的原因。
我们遇到以下情况:
- 在自定义宏函数中
extend_runtime_args
return 类型是一个对象 - 在评估 Jinja 模板后,return 类型更改为字符串
CloudDataFusionStartPipelineOperator
失败,因为runtime_args
属性 是字符串而不是对象
问题:
- 我们如何在评估 Jinja 模板后 return 使用对象(并在 'future' 中执行此操作)?
- 我们能以某种方式转换字符串吗?
- 如何保证这里的逻辑是在DAG执行完之后才执行,而不是在DAG生成之后才执行呢?
- 这里的 Jinja 模板/自定义宏模式在处理触发时间参数方面是好是坏?
How could we return with an object after evaluating the Jinja template (and do this in the 'future')?
您可以创建自己的派生自 CloudDataFusionStartPipelineOperator
的自定义运算符并使其接受字符串并将其转换为 CloudDataFusionStartPipelineOperator
所需的对象并使用此新运算符。 “runtime_args”是一个字典,所以我相信它应该像 json.loads()
一样容易找回来。
Can we convert the string somehow?
是的。上面的 json.loads() 代码应该可以。此外,如果您在 runtime_args 中只有几个参数可以更改,那么拥有多个宏并直接 return 字典中多个 JINJA 字符串中更改的值可能会更容易。类似于:
runtime_args = {
'PREFIX' = "{{ dag_run }}",
'date' = "{{ macros.ds_format(....) }}",
}
当存在模板化字段时,Airflow 会递归地处理字典或列表等基本结构,因此您可以保留对象结构,并使用 jinja 宏作为值(实际上您也可以将 jinja 宏作为键等)。
How could we ensure that the logic here will be executed after the DAG is executed and not right after the DAG was generated?
JINJA 模板仅在执行任务时进行评估。所以你在这里很好。
Are the Jinja templates / custom macros good or bad patterns here for handling the trigger-time arguments?
很好的模式。这就是他们的目的。