如何将 ds 变量传递给 dag 中的函数?
How can I pass ds variable to a function within a dag?
我想传递执行日期,它在变量 {{ ds }} 中。但是,我通过它没有获得执行日期的函数传递它。
def get_spark_step_2(date):
#logic in here
return step
exec_date = '{{ ds }}'
step_adder2 = EmrAddStepsOperator(
task_id='create_parquets',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=get_spark_step_2(exec_date),
dag=dag
)
你知道我如何在上面的上下文中使用变量吗?
创建一个扩展 EmrAddStepsOperator 的 class,并使 steps
成为模板化字段。
像这样:
class MyEmrAddStepsOperator(EmrAddStepsOperator):
template_fields = ['job_flow_id','steps']
EmrAddStepsOperator
本身只有 job_flow_id
作为模板字段:
class EmrAddStepsOperator(BaseOperator):
"""
An operator that adds steps to an existing EMR job_flow.
:param job_flow_id: id of the JobFlow to add steps to
:type job_flow_name: str
:param aws_conn_id: aws connection to uses
:type aws_conn_id: str
:param steps: boto3 style steps to be added to the jobflow
:type steps: list
"""
template_fields = ['job_flow_id']
您只能在模板化的字段中使用宏(如 ds
)。
我想传递执行日期,它在变量 {{ ds }} 中。但是,我通过它没有获得执行日期的函数传递它。
def get_spark_step_2(date):
#logic in here
return step
exec_date = '{{ ds }}'
step_adder2 = EmrAddStepsOperator(
task_id='create_parquets',
job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
aws_conn_id='aws_default',
steps=get_spark_step_2(exec_date),
dag=dag
)
你知道我如何在上面的上下文中使用变量吗?
创建一个扩展 EmrAddStepsOperator 的 class,并使 steps
成为模板化字段。
像这样:
class MyEmrAddStepsOperator(EmrAddStepsOperator):
template_fields = ['job_flow_id','steps']
EmrAddStepsOperator
本身只有 job_flow_id
作为模板字段:
class EmrAddStepsOperator(BaseOperator):
"""
An operator that adds steps to an existing EMR job_flow.
:param job_flow_id: id of the JobFlow to add steps to
:type job_flow_name: str
:param aws_conn_id: aws connection to uses
:type aws_conn_id: str
:param steps: boto3 style steps to be added to the jobflow
:type steps: list
"""
template_fields = ['job_flow_id']
您只能在模板化的字段中使用宏(如 ds
)。