按顺序执行读取操作 - Apache Beam
Execute read operations in sequence - Apache Beam
我需要按照给定的顺序执行以下操作:-
PCollection<String> read = p.apply("Read Lines",TextIO.read().from(options.getInputFile()))
.apply("Get fileName",ParDo.of(new DoFn<String,String>(){
ValueProvider<String> fileReceived = options.getfilename();
@ProcessElement
public void procesElement(ProcessContext c)
{
fileName = fileReceived.get().toString();
LOG.info("File: "+fileName);
}
}));
PCollection<TableRow> rows = p.apply("Read from BigQuery",
BigQueryIO.read()
.fromQuery("SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + fileName +"'")
.usingStandardSql());
如何在 Apache 中完成此操作 Beam/Dataflow?
您似乎想将 BigQueryIO.read().fromQuery()
应用到一个查询,该查询依赖于通过 PipelineOptions
中 ValueProvider<String>
类型的 属性 可用的值,并且在管道构建时无法访问提供程序 - 即您正在通过模板调用您的工作。
在这种情况下,正确的解决方案是使用 NestedValueProvider
:
PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(
NestedValueProvider.of(
options.getfilename(),
new SerializableFunction<String, String>() {
@Override
public String apply(String filename) {
return "SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + fileName +"'";
}
})));
我需要按照给定的顺序执行以下操作:-
PCollection<String> read = p.apply("Read Lines",TextIO.read().from(options.getInputFile()))
.apply("Get fileName",ParDo.of(new DoFn<String,String>(){
ValueProvider<String> fileReceived = options.getfilename();
@ProcessElement
public void procesElement(ProcessContext c)
{
fileName = fileReceived.get().toString();
LOG.info("File: "+fileName);
}
}));
PCollection<TableRow> rows = p.apply("Read from BigQuery",
BigQueryIO.read()
.fromQuery("SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + fileName +"'")
.usingStandardSql());
如何在 Apache 中完成此操作 Beam/Dataflow?
您似乎想将 BigQueryIO.read().fromQuery()
应用到一个查询,该查询依赖于通过 PipelineOptions
中 ValueProvider<String>
类型的 属性 可用的值,并且在管道构建时无法访问提供程序 - 即您正在通过模板调用您的工作。
在这种情况下,正确的解决方案是使用 NestedValueProvider
:
PCollection<TableRow> tableRows = p.apply(BigQueryIO.read().fromQuery(
NestedValueProvider.of(
options.getfilename(),
new SerializableFunction<String, String>() {
@Override
public String apply(String filename) {
return "SELECT table,schema FROM `DatasetID.TableID` WHERE file='" + fileName +"'";
}
})));