Apache Beam / Google Cloud Dataflow 大查询 reader 从第二个 运行 开始失败
Apache Beam / Google Cloud Dataflow big-query reader failing from second run
我们有一个使用 Apache Beam 构建的数据流,并部署在 GCP 数据流基础设施中。数据流实例 运行 第一次完美,并按预期创建分区 table,但是当它 运行 第二次开始时,它会从数据集中清除结果,而不是用新数据集替换具体分区。当它 运行 使用本地设置中的 Direct 运行ner 时,作业完美运行。
代码示例:
pipeline.apply(
"Read from BigQuery (table_name) Table: ",
BigQueryIO.readTableRows()
.fromQuery(
String.format(
"SELECT %s FROM `%s.%s.%s`",
FIELDS.stream().collect(Collectors.joining(",")), project, dataset, table))
.usingStandardSql()
.withoutValidation()));
PCollection<VideoPlacement.Placement> rows =
tableRow.apply(
"TableRows to BigQueryVideoPlacement.Placement",
MapElements.into(TypeDescriptor.of(Model.class))
.via(Model::fromTableRow));
如果知道我在这里缺少什么,请告诉我。提前致谢!
想通了!
这是我在模板化环境中所做的更改:
"Read from BigQuery (table_name) Table: ",
BigQueryIO.readTableRows()
.fromQuery(
String.format(
"SELECT %s FROM `%s.%s.%s`",
FIELDS.stream().collect(Collectors.joining(",")), project, dataset, table))
.usingStandardSql()
.withoutValidation()
.withTemplateCompatibility()));
PCollection<VideoPlacement.Placement> rows =
tableRow.apply(
"TableRows to BigQueryVideoPlacement.Placement",
MapElements.into(TypeDescriptor.of(Model.class))
.via(Model::fromTableRow));
.withTemplateCompatibility()
请查看更多文档here
我们有一个使用 Apache Beam 构建的数据流,并部署在 GCP 数据流基础设施中。数据流实例 运行 第一次完美,并按预期创建分区 table,但是当它 运行 第二次开始时,它会从数据集中清除结果,而不是用新数据集替换具体分区。当它 运行 使用本地设置中的 Direct 运行ner 时,作业完美运行。
代码示例:
pipeline.apply(
"Read from BigQuery (table_name) Table: ",
BigQueryIO.readTableRows()
.fromQuery(
String.format(
"SELECT %s FROM `%s.%s.%s`",
FIELDS.stream().collect(Collectors.joining(",")), project, dataset, table))
.usingStandardSql()
.withoutValidation()));
PCollection<VideoPlacement.Placement> rows =
tableRow.apply(
"TableRows to BigQueryVideoPlacement.Placement",
MapElements.into(TypeDescriptor.of(Model.class))
.via(Model::fromTableRow));
如果知道我在这里缺少什么,请告诉我。提前致谢!
想通了!
这是我在模板化环境中所做的更改:
"Read from BigQuery (table_name) Table: ",
BigQueryIO.readTableRows()
.fromQuery(
String.format(
"SELECT %s FROM `%s.%s.%s`",
FIELDS.stream().collect(Collectors.joining(",")), project, dataset, table))
.usingStandardSql()
.withoutValidation()
.withTemplateCompatibility()));
PCollection<VideoPlacement.Placement> rows =
tableRow.apply(
"TableRows to BigQueryVideoPlacement.Placement",
MapElements.into(TypeDescriptor.of(Model.class))
.via(Model::fromTableRow));
.withTemplateCompatibility()
请查看更多文档here