如何将参数从 google composer 传递到数据流模板
How to pass parameters from google composer to dataflow template
我正在尝试按照以下方式将来自 google composer 的参数传递到数据流模板中,但它不起作用。
# composer code
trigger_dataflow = DataflowTemplateOperator(
task_id="trigger_dataflow",
template="gs://mybucket/my_template",
dag=dag,
job_name='appsflyer_events_daily',
parameters={
"input": f'gs://my_bucket/' + "{{ ds }}" + "/*.gz"
}
)
# template code
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default='gs://my_bucket/*.gz',
help='path of input file')
def main():
pipeline_options = PipelineOptions()
user_options = pipeline_options.view_as(UserOptions)
p = beam.Pipeline(options=pipeline_options)
lines = (
p
| MatchFiles(user_options.input)
)
你可以像下面这样通过
DataflowTemplateOperator(,
task_id="task1",
template=get_variable_value("template"),
on_failure_callback=update_job_message,
parameters={
"fileBucket": get_variable_value("file_bucket"),
"basePath": get_variable_value("path_input"),
"Day": "{{ json.loads(ti.xcom_pull(key=run_id))['day'] }}",
},
)
我们正在使用 Java 并且在 Dataflow 作业中我们有选项 class get 和 set 如下
public interface MyOptions extends CommonOptions {
@Description("The output bucket")
@Validation.Required
ValueProvider<String> getFileBucket();
void setFileBucket(ValueProvider<String> value);
}
我们需要为此数据流作业创建模板,该模板将由 composer dag 触发。
从 Dataflow Classic 模板迁移到 Flex 模板解决了这个问题。
我正在尝试按照以下方式将来自 google composer 的参数传递到数据流模板中,但它不起作用。
# composer code
trigger_dataflow = DataflowTemplateOperator(
task_id="trigger_dataflow",
template="gs://mybucket/my_template",
dag=dag,
job_name='appsflyer_events_daily',
parameters={
"input": f'gs://my_bucket/' + "{{ ds }}" + "/*.gz"
}
)
# template code
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_value_provider_argument(
'--input',
default='gs://my_bucket/*.gz',
help='path of input file')
def main():
pipeline_options = PipelineOptions()
user_options = pipeline_options.view_as(UserOptions)
p = beam.Pipeline(options=pipeline_options)
lines = (
p
| MatchFiles(user_options.input)
)
你可以像下面这样通过
DataflowTemplateOperator(,
task_id="task1",
template=get_variable_value("template"),
on_failure_callback=update_job_message,
parameters={
"fileBucket": get_variable_value("file_bucket"),
"basePath": get_variable_value("path_input"),
"Day": "{{ json.loads(ti.xcom_pull(key=run_id))['day'] }}",
},
)
我们正在使用 Java 并且在 Dataflow 作业中我们有选项 class get 和 set 如下
public interface MyOptions extends CommonOptions {
@Description("The output bucket")
@Validation.Required
ValueProvider<String> getFileBucket();
void setFileBucket(ValueProvider<String> value);
}
我们需要为此数据流作业创建模板,该模板将由 composer dag 触发。
从 Dataflow Classic 模板迁移到 Flex 模板解决了这个问题。