嵌套连接导致错误 400 Bad Request
Nested Joins result in error 400 Bad Request
执行多个嵌套联接时,我在使用数据流服务时收到错误 400 错误请求。使用本地管道运行器工作正常。下面是我想要实现的一些示例代码:
PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(pipelineOptions);
Datastore datastore = getDatastore(pipelineOptions, DATASET_ID);
addData(datastore);
PCollection<KV<Long, DatastoreV1.Entity>> users = pipeline.apply(DatastoreIO.readFrom(DATASET_ID, makeQueryForKind("Entity1")))
.apply(ParDo.of(new MakeKVFromParent()));
PCollection<KV<Long, DatastoreV1.Entity>> locations = pipeline.apply(DatastoreIO.readFrom(DATASET_ID, makeQueryForKind("Entity2")))
.apply(ParDo.of(new MakeKVFromParent()));
PCollection<KV<Long, DatastoreV1.Entity>> cars = pipeline.apply(DatastoreIO.readFrom(DATASET_ID, makeQueryForKind("Entity3")))
.apply(ParDo.of(new MakeKVFromParent()));
TupleTag<DatastoreV1.Entity> carsTag = new TupleTag<DatastoreV1.Entity>();
PCollection<KV<Long, CoGbkResult>> groupedCars = KeyedPCollectionTuple.of(carsTag, cars)
.apply(CoGroupByKey.<Long>create());
TupleTag<CoGbkResult> groupedCarsTag = new TupleTag<CoGbkResult>();
TupleTag<DatastoreV1.Entity> locationsTag = new TupleTag<DatastoreV1.Entity>();
PCollection<KV<Long, CoGbkResult>> locationData = KeyedPCollectionTuple.of(groupedCarsTag, groupedCars)
.and(locationsTag, locations)
.apply(CoGroupByKey.<Long>create());
//Comment this block of code to remove the bug.
TupleTag<CoGbkResult> locationDataTag = new TupleTag<CoGbkResult>();
TupleTag<DatastoreV1.Entity> usersTag = new TupleTag<DatastoreV1.Entity>();
PCollection<KV<Long, CoGbkResult>> userData = KeyedPCollectionTuple.of(locationDataTag, locationData)
.and(usersTag, users)
.apply(CoGroupByKey.<Long>create());
//Do some computation on userData
pipeline.run();
基本上我有一堆用户。一个用户可以有多个位置和汽车。汽车始终附加到特定位置和用户。我想按用户和位置对汽车进行分组,以便我知道每个用户拥有的位置以及他在每个位置拥有的汽车。我为每个用户对这些数据做了一些计算。
可以找到一个演示我的问题的工作示例 here。
提交作业时出现错误。可以找到提交的作业文件here
删除最后一个连接后,作业运行正常。有谁知道我做错了什么吗?
感谢您报告此事,也感谢您提供出色的示例代码!我们已找到服务中的一个问题并正在努力修复它。在我们努力解决此问题的同时,您可以通过不重新使用 CoGroupByKeyResult
作为 CoGroupByKey
.
的输入来避免此问题
具体来说,在这种情况下,执行以下操作将减少 CoGroupByKey
操作的次数,使您更容易获取数据,并且还可以避免使用 CoGroupByKeyResult
作为 [= 的输入13=]:
TupleTag<DatastoreV1.Entity> carsTag = new TupleTag<DatastoreV1.Entity>();
TupleTag<DatastoreV1.Entity> locationsTag = new TupleTag<DatastoreV1.Entity>();
TupleTag<DatastoreV1.Entity> usersTag = new TupleTag<DatastoreV1.Entity>();
PCollection<KV<Long, CoGbkResult>> usersCars = KeyedPCollectionTuple
.of(carsTag, cars)
.and(locationsTag, locations)
.and(usersTag, users)
.apply(CoGroupByKey.<Long>create());
有了上面的内容,现在也可以更轻松地访问 CoGbkResult
的部分内容。例如:
// Before (with nested CoGroupByKey)
originalResult.getOnly(locationDataTag).getOnly(groupedCarsTag).getAll(usersTag);
// After (with a single CoGroupByKey)
newResult.getAll(usersTag);
执行多个嵌套联接时,我在使用数据流服务时收到错误 400 错误请求。使用本地管道运行器工作正常。下面是我想要实现的一些示例代码:
PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(PipelineOptions.class);
Pipeline pipeline = Pipeline.create(pipelineOptions);
Datastore datastore = getDatastore(pipelineOptions, DATASET_ID);
addData(datastore);
PCollection<KV<Long, DatastoreV1.Entity>> users = pipeline.apply(DatastoreIO.readFrom(DATASET_ID, makeQueryForKind("Entity1")))
.apply(ParDo.of(new MakeKVFromParent()));
PCollection<KV<Long, DatastoreV1.Entity>> locations = pipeline.apply(DatastoreIO.readFrom(DATASET_ID, makeQueryForKind("Entity2")))
.apply(ParDo.of(new MakeKVFromParent()));
PCollection<KV<Long, DatastoreV1.Entity>> cars = pipeline.apply(DatastoreIO.readFrom(DATASET_ID, makeQueryForKind("Entity3")))
.apply(ParDo.of(new MakeKVFromParent()));
TupleTag<DatastoreV1.Entity> carsTag = new TupleTag<DatastoreV1.Entity>();
PCollection<KV<Long, CoGbkResult>> groupedCars = KeyedPCollectionTuple.of(carsTag, cars)
.apply(CoGroupByKey.<Long>create());
TupleTag<CoGbkResult> groupedCarsTag = new TupleTag<CoGbkResult>();
TupleTag<DatastoreV1.Entity> locationsTag = new TupleTag<DatastoreV1.Entity>();
PCollection<KV<Long, CoGbkResult>> locationData = KeyedPCollectionTuple.of(groupedCarsTag, groupedCars)
.and(locationsTag, locations)
.apply(CoGroupByKey.<Long>create());
//Comment this block of code to remove the bug.
TupleTag<CoGbkResult> locationDataTag = new TupleTag<CoGbkResult>();
TupleTag<DatastoreV1.Entity> usersTag = new TupleTag<DatastoreV1.Entity>();
PCollection<KV<Long, CoGbkResult>> userData = KeyedPCollectionTuple.of(locationDataTag, locationData)
.and(usersTag, users)
.apply(CoGroupByKey.<Long>create());
//Do some computation on userData
pipeline.run();
基本上我有一堆用户。一个用户可以有多个位置和汽车。汽车始终附加到特定位置和用户。我想按用户和位置对汽车进行分组,以便我知道每个用户拥有的位置以及他在每个位置拥有的汽车。我为每个用户对这些数据做了一些计算。
可以找到一个演示我的问题的工作示例 here。
提交作业时出现错误。可以找到提交的作业文件here
删除最后一个连接后,作业运行正常。有谁知道我做错了什么吗?
感谢您报告此事,也感谢您提供出色的示例代码!我们已找到服务中的一个问题并正在努力修复它。在我们努力解决此问题的同时,您可以通过不重新使用 CoGroupByKeyResult
作为 CoGroupByKey
.
具体来说,在这种情况下,执行以下操作将减少 CoGroupByKey
操作的次数,使您更容易获取数据,并且还可以避免使用 CoGroupByKeyResult
作为 [= 的输入13=]:
TupleTag<DatastoreV1.Entity> carsTag = new TupleTag<DatastoreV1.Entity>();
TupleTag<DatastoreV1.Entity> locationsTag = new TupleTag<DatastoreV1.Entity>();
TupleTag<DatastoreV1.Entity> usersTag = new TupleTag<DatastoreV1.Entity>();
PCollection<KV<Long, CoGbkResult>> usersCars = KeyedPCollectionTuple
.of(carsTag, cars)
.and(locationsTag, locations)
.and(usersTag, users)
.apply(CoGroupByKey.<Long>create());
有了上面的内容,现在也可以更轻松地访问 CoGbkResult
的部分内容。例如:
// Before (with nested CoGroupByKey)
originalResult.getOnly(locationDataTag).getOnly(groupedCarsTag).getAll(usersTag);
// After (with a single CoGroupByKey)
newResult.getAll(usersTag);