ariflow2:如何将 ds_nodash 宏呈现为 YYYY/MM
ariflow2: How to render ds_nodash macro as YYYY/MM
我目前正在将作业从 Airflow 1.10.14 迁移到 2.1.4
在 airflow2 中,我使用运算符 BeamRunPythonPipelineOperator
,其中一项要求是按照以下模式将数据存储在 GCS 中:gs://datalate/data_source/YYYY/MM/model.
partition_sessions_unlimited = BeamRunPythonPipelineOperator(
task_id="partition_sessions_unlimited",
dag=aggregation_dag,
py_file=os.path.join(
BEAM_SRC_DIR,
"streaming_sessions",
"streaming_session_aggregation_pipeline.py",
),
runner="DataflowRunner",
dataflow_config=DataflowConfiguration(
job_name="%s_partition_sessions_unlimited" % ds_env,
project_id=GCP_PROJECT_ID,
location="us-central1",
),
pipeline_options={
"temp_location": "gs://dataflow-temp/{}/{}/amazon_sessions/amz_unlimited".format(
sch_date, ds_env
),
"staging_location": "gs://dataflow-staging/{}/{}/amazon_sessions/amz_unlimited".format(
sch_date, ds_env
),
"disk_size_gb": "100",
"num_workers": "10",
"num_max_workers": "25",
"worker_machine_type": "n1-highcpu-64",
"setup_file": os.path.join(
BEAM_SRC_DIR, "streaming_sessions", "setup.py"
),
"input": "gs://{}/amazon_sessions/{{ ds_nodash[:4] }}/{{ ds_nodash[4:6] }}/amz_unlimited/input/listens_*".format(
w_datalake,
),
"output": "gs://{}/amazon_sessions/{{ ds_nodash[:4] }}/{{ ds_nodash[4:6] }}/amz_unlimited/output/sessions_".format(
w_datalake
),
},
)
然而,我得到
'output': 'gs://datalake/amazon_sessions/{ ds_nodash[:4] }/{ ds_nodash[4:6] }/amz_prime/output/sessions_',
而不是
'output': 'gs://datalake/amazon_sessions/2022/02/amz_prime/output/sessions_',
我怎样才能做到这一点?
首先,您正在为 jinja 模板化字段使用 format
字符串。
format()
会将 {var}
替换为传递的参数值(如果存在)。
"gs://{}/.../{{ ds_nodash[:4] }}...".format(w_datalake)
首先 {}
替换为“datalake”,第二部分没有传递任何等效参数,因此导致文字“ds_nodash[:4]”。
"gs://datalake/.../{ds_nodash[:4]}..."
为了在格式化字符串中使用 jinja 模板,您可以转义 {
和 }
部分您打算从 jinja 获取价值。要转义 {
,您可以添加另一个 {
,而对于 }
,您可以添加另一个 }
。原来的有 2 {{
所以像这样在每一边加 2 {
;
"gs://{}/.../{{{{ ds_nodash[:4] }}}}...".format(w_datalake)
这样,先应用format
(替换值,去掉转义符),把这个字符串变成
gs://datalake/.../{{ ds_nodash[:4] }}...
然后这个字符串被传递给 BeamRunPythonPipelineOperator
,这部分用 jinja 字段转换。
其次,你可以使用execution_date
来随意格式化,而不是使用ds_nodash
两次切片
{{ execution_date.strftime('%Y/%m') }}
我目前正在将作业从 Airflow 1.10.14 迁移到 2.1.4
在 airflow2 中,我使用运算符 BeamRunPythonPipelineOperator
,其中一项要求是按照以下模式将数据存储在 GCS 中:gs://datalate/data_source/YYYY/MM/model.
partition_sessions_unlimited = BeamRunPythonPipelineOperator(
task_id="partition_sessions_unlimited",
dag=aggregation_dag,
py_file=os.path.join(
BEAM_SRC_DIR,
"streaming_sessions",
"streaming_session_aggregation_pipeline.py",
),
runner="DataflowRunner",
dataflow_config=DataflowConfiguration(
job_name="%s_partition_sessions_unlimited" % ds_env,
project_id=GCP_PROJECT_ID,
location="us-central1",
),
pipeline_options={
"temp_location": "gs://dataflow-temp/{}/{}/amazon_sessions/amz_unlimited".format(
sch_date, ds_env
),
"staging_location": "gs://dataflow-staging/{}/{}/amazon_sessions/amz_unlimited".format(
sch_date, ds_env
),
"disk_size_gb": "100",
"num_workers": "10",
"num_max_workers": "25",
"worker_machine_type": "n1-highcpu-64",
"setup_file": os.path.join(
BEAM_SRC_DIR, "streaming_sessions", "setup.py"
),
"input": "gs://{}/amazon_sessions/{{ ds_nodash[:4] }}/{{ ds_nodash[4:6] }}/amz_unlimited/input/listens_*".format(
w_datalake,
),
"output": "gs://{}/amazon_sessions/{{ ds_nodash[:4] }}/{{ ds_nodash[4:6] }}/amz_unlimited/output/sessions_".format(
w_datalake
),
},
)
然而,我得到
'output': 'gs://datalake/amazon_sessions/{ ds_nodash[:4] }/{ ds_nodash[4:6] }/amz_prime/output/sessions_',
而不是
'output': 'gs://datalake/amazon_sessions/2022/02/amz_prime/output/sessions_',
我怎样才能做到这一点?
首先,您正在为 jinja 模板化字段使用 format
字符串。
format()
会将 {var}
替换为传递的参数值(如果存在)。
"gs://{}/.../{{ ds_nodash[:4] }}...".format(w_datalake)
首先 {}
替换为“datalake”,第二部分没有传递任何等效参数,因此导致文字“ds_nodash[:4]”。
"gs://datalake/.../{ds_nodash[:4]}..."
为了在格式化字符串中使用 jinja 模板,您可以转义 {
和 }
部分您打算从 jinja 获取价值。要转义 {
,您可以添加另一个 {
,而对于 }
,您可以添加另一个 }
。原来的有 2 {{
所以像这样在每一边加 2 {
;
"gs://{}/.../{{{{ ds_nodash[:4] }}}}...".format(w_datalake)
这样,先应用format
(替换值,去掉转义符),把这个字符串变成
gs://datalake/.../{{ ds_nodash[:4] }}...
然后这个字符串被传递给 BeamRunPythonPipelineOperator
,这部分用 jinja 字段转换。
其次,你可以使用execution_date
来随意格式化,而不是使用ds_nodash
两次切片
{{ execution_date.strftime('%Y/%m') }}