如何将 ds 变量传递给 dag 中的函数?

How can I pass ds variable to a function within a dag?

我想传递执行日期,它在变量 {{ ds }} 中。但是,我通过它没有获得执行日期的函数传递它。

def get_spark_step_2(date):
      #logic in here
      return step

exec_date = '{{ ds }}'

step_adder2 = EmrAddStepsOperator(
    task_id='create_parquets',
    job_flow_id="{{ task_instance.xcom_pull('create_job_flow', key='return_value') }}",
    aws_conn_id='aws_default',
    steps=get_spark_step_2(exec_date),
    dag=dag
)

你知道我如何在上面的上下文中使用变量吗?

创建一个扩展 EmrAddStepsOperator 的 class,并使 steps 成为模板化字段。

像这样:

class MyEmrAddStepsOperator(EmrAddStepsOperator):

    template_fields = ['job_flow_id','steps']

EmrAddStepsOperator 本身只有 job_flow_id 作为模板字段:

class EmrAddStepsOperator(BaseOperator):
    """
    An operator that adds steps to an existing EMR job_flow.
    :param job_flow_id: id of the JobFlow to add steps to
    :type job_flow_name: str
    :param aws_conn_id: aws connection to uses
    :type aws_conn_id: str
    :param steps: boto3 style steps to be added to the jobflow
    :type steps: list
    """
    template_fields = ['job_flow_id']

您只能在模板化的字段中使用宏(如 ds)。