ariflow2:如何将 ds_nodash 宏呈现为 YYYY/MM

ariflow2: How to render ds_nodash macro as YYYY/MM

我目前正在将作业从 Airflow 1.10.14 迁移到 2.1.4

在 airflow2 中,我使用运算符 BeamRunPythonPipelineOperator,其中一项要求是按照以下模式将数据存储在 GCS 中:gs://datalate/data_source/YYYY/MM/model.

    partition_sessions_unlimited = BeamRunPythonPipelineOperator(
        task_id="partition_sessions_unlimited",
        dag=aggregation_dag,
        py_file=os.path.join(
            BEAM_SRC_DIR,
            "streaming_sessions",
            "streaming_session_aggregation_pipeline.py",
        ),
        runner="DataflowRunner",
        dataflow_config=DataflowConfiguration(
            job_name="%s_partition_sessions_unlimited" % ds_env,
            project_id=GCP_PROJECT_ID,
            location="us-central1",
        ),
        pipeline_options={
            "temp_location": "gs://dataflow-temp/{}/{}/amazon_sessions/amz_unlimited".format(
                sch_date, ds_env
            ),
            "staging_location": "gs://dataflow-staging/{}/{}/amazon_sessions/amz_unlimited".format(
                sch_date, ds_env
            ),
            "disk_size_gb": "100",
            "num_workers": "10",
            "num_max_workers": "25",
            "worker_machine_type": "n1-highcpu-64",
            "setup_file": os.path.join(
                BEAM_SRC_DIR, "streaming_sessions", "setup.py"
            ),
            "input": "gs://{}/amazon_sessions/{{ ds_nodash[:4] }}/{{ ds_nodash[4:6] }}/amz_unlimited/input/listens_*".format(
                w_datalake,
            ),
            "output": "gs://{}/amazon_sessions/{{ ds_nodash[:4] }}/{{ ds_nodash[4:6] }}/amz_unlimited/output/sessions_".format(
                w_datalake
            ),
        },
    )

然而,我得到

'output': 'gs://datalake/amazon_sessions/{ ds_nodash[:4] }/{ ds_nodash[4:6] }/amz_prime/output/sessions_',

而不是

'output': 'gs://datalake/amazon_sessions/2022/02/amz_prime/output/sessions_',

我怎样才能做到这一点?

首先,您正在为 jinja 模板化字段使用 format 字符串。

format() 会将 {var} 替换为传递的参数值(如果存在)。

"gs://{}/.../{{ ds_nodash[:4] }}...".format(w_datalake)

首先 {} 替换为“datalake”,第二部分没有传递任何等效参数,因此导致文字“ds_nodash[:4]”。

"gs://datalake/.../{ds_nodash[:4]}..."

为了在格式化字符串中使用 jinja 模板,您可以转义 {} 部分您打算从 jinja 获取价值。要转义 {,您可以添加另一个 {,而对于 },您可以添加另一个 }。原来的有 2 {{ 所以像这样在每一边加 2 {;

"gs://{}/.../{{{{ ds_nodash[:4] }}}}...".format(w_datalake)

这样,先应用format(替换值,去掉转义符),把这个字符串变成

gs://datalake/.../{{ ds_nodash[:4] }}...

然后这个字符串被传递给 BeamRunPythonPipelineOperator ,这部分用 jinja 字段转换。

其次,你可以使用execution_date来随意格式化,而不是使用ds_nodash两次切片

{{ execution_date.strftime('%Y/%m') }}