如何 运行 BigQueryIO.read().fromQuery 带参数
How to run BigQueryIO.read().fromQuery with parameters
我需要 运行 来自单个 .SQL 文件的多个查询,但参数不同
我已经尝试过类似的方法,但它不起作用,因为 BigQueryIO.Read 仅消耗 PBegin。
public PCollection<KV<String, TestDitoDto>> expand(PCollection<QueryParamsBatch> input) {
PCollection<KV<String, Section1Dto>> section1 = input.apply("Read Section1 from BQ",
BigQueryIO
.readTableRows()
.fromQuery(ResourceRetriever.getResourceFile("query/test/section1.sql"))
.usingStandardSql()
.withoutValidation())
.apply("Convert section1 to Dto", ParDo.of(new TableRowToSection1DtoFunction()));
}
是否有任何其他方法可以将现有 PCollection 中的参数放入我的 BigQueryIO.read() 调用中?
管道建设时间是否不同queries/parameters?如果是这样,您可以创建多个读取转换并合并结果,例如,使用 Flatten 转换。
Beam Java BigQuery 源当前不支持读取 PCollection
个查询。 Python BQ source 虽然如此。
我提出了以下解决方案:不使用 BigQueryIO,而是使用常规 GCP 库来访问 BigQuery,将其标记为瞬态并在每次使用 @Setup 注释的方法中对其进行初始化,因为它不是可序列化的
public class DenormalizedCase1Fn extends DoFn<*> {
private transient BigQuery bigQuery;
@Setup
public void initialize() {
this.bigQuery = BigQueryOptions.newBuilder()
.setProjectId(bqProjectId.get())
.setLocation(LOCATION)
.setRetrySettings(RetrySettings.newBuilder()
.setRpcTimeoutMultiplier(1.5)
.setInitialRpcTimeout(Duration.ofSeconds(5))
.setMaxRpcTimeout(Duration.ofSeconds(30))
.setMaxAttempts(3).build())
.build().getService();
}
@ProcessElement
...
我需要 运行 来自单个 .SQL 文件的多个查询,但参数不同 我已经尝试过类似的方法,但它不起作用,因为 BigQueryIO.Read 仅消耗 PBegin。
public PCollection<KV<String, TestDitoDto>> expand(PCollection<QueryParamsBatch> input) {
PCollection<KV<String, Section1Dto>> section1 = input.apply("Read Section1 from BQ",
BigQueryIO
.readTableRows()
.fromQuery(ResourceRetriever.getResourceFile("query/test/section1.sql"))
.usingStandardSql()
.withoutValidation())
.apply("Convert section1 to Dto", ParDo.of(new TableRowToSection1DtoFunction()));
}
是否有任何其他方法可以将现有 PCollection 中的参数放入我的 BigQueryIO.read() 调用中?
管道建设时间是否不同queries/parameters?如果是这样,您可以创建多个读取转换并合并结果,例如,使用 Flatten 转换。
Beam Java BigQuery 源当前不支持读取 PCollection
个查询。 Python BQ source 虽然如此。
我提出了以下解决方案:不使用 BigQueryIO,而是使用常规 GCP 库来访问 BigQuery,将其标记为瞬态并在每次使用 @Setup 注释的方法中对其进行初始化,因为它不是可序列化的
public class DenormalizedCase1Fn extends DoFn<*> {
private transient BigQuery bigQuery;
@Setup
public void initialize() {
this.bigQuery = BigQueryOptions.newBuilder()
.setProjectId(bqProjectId.get())
.setLocation(LOCATION)
.setRetrySettings(RetrySettings.newBuilder()
.setRpcTimeoutMultiplier(1.5)
.setInitialRpcTimeout(Duration.ofSeconds(5))
.setMaxRpcTimeout(Duration.ofSeconds(30))
.setMaxAttempts(3).build())
.build().getService();
}
@ProcessElement
...