Execute read operations in sequence - Apache Beam

I need to perform the following operations in the given order:

PCollection<String> read = p.apply("Read Lines", TextIO.read().from(options.getInputFile()))

    .apply("Get fileName", ParDo.of(new DoFn<String, String>() {
        ValueProvider<String> fileReceived = options.getfilename();

        @ProcessElement
        public void processElement(ProcessContext c) {
            fileName = fileReceived.get().toString();
            LOG.info("File: " + fileName);
        }
    }));

PCollection<TableRow> rows = p.apply("Read from BigQuery",
    BigQueryIO.read()
        .fromQuery("SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + fileName + "'")
        .usingStandardSql());

How can this be done in Apache Beam/Dataflow?

It seems that you want to apply BigQueryIO.read().fromQuery() to a query that depends on a value available through a property of type ValueProvider&lt;String&gt; on your PipelineOptions, and that the provider is not accessible at pipeline construction time - i.e. you are invoking your job via a template.
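For this to work from a template, the filename has to be exposed on the options interface as a ValueProvider so it can be supplied at run time. A minimal sketch (the interface name MyOptions and the @Description texts are hypothetical; only the getter names are taken from the snippets above):

import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;

// Hypothetical options interface; getter names match the getfilename()/getInputFile() calls above.
public interface MyOptions extends PipelineOptions {
    @Description("Name of the file, supplied when the template is executed")
    ValueProvider<String> getfilename();
    void setfilename(ValueProvider<String> value);

    @Description("Path of the input file to read")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
}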

In this case, the proper solution is to use NestedValueProvider:

PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(
    NestedValueProvider.of(
        options.getfilename(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String filename) {
            return "SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + filename + "'";
          }
        })));
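With NestedValueProvider the query string is only built when the provider's get() is called at execution time, after the template has received the actual filename value, which is why this works where plain string concatenation at construction time does not. For completeness, the imports this snippet relies on are roughly the following (package names from the Beam 2.x Java SDK; adjust to your version):

import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;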