如何将参数从 google composer 传递到数据流模板

How to pass parameters from google composer to dataflow template

我正在尝试按照以下方式将来自 google composer 的参数传递到数据流模板中,但它不起作用。

# composer code
trigger_dataflow = DataflowTemplateOperator(
     task_id="trigger_dataflow",
     template="gs://mybucket/my_template",
     dag=dag,
     job_name='appsflyer_events_daily',
     parameters={
         "input": f'gs://my_bucket/' + "{{ ds }}" + "/*.gz"
     }
)

# template code
class UserOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
    parser.add_value_provider_argument(
        '--input',
        default='gs://my_bucket/*.gz',
        help='path of input file')

def main():
    pipeline_options = PipelineOptions()
    user_options = pipeline_options.view_as(UserOptions) 
    p = beam.Pipeline(options=pipeline_options)
    lines = (
        p
        | MatchFiles(user_options.input)
    )

你可以像下面这样通过

DataflowTemplateOperator(,
         task_id="task1",
         template=get_variable_value("template"),
         on_failure_callback=update_job_message,
         parameters={
             "fileBucket": get_variable_value("file_bucket"),
             "basePath": get_variable_value("path_input"),         
             "Day": "{{ json.loads(ti.xcom_pull(key=run_id))['day'] }}",
         },
    )

我们正在使用 Java 并且在 Dataflow 作业中我们有选项 class get 和 set 如下

public interface MyOptions extends CommonOptions {
 
    @Description("The output bucket")
    @Validation.Required
    ValueProvider<String> getFileBucket();
    void setFileBucket(ValueProvider<String> value);
    
}

我们需要为此数据流作业创建模板,该模板将由 composer dag 触发。

从 Dataflow Classic 模板迁移到 Flex 模板解决了这个问题。