使用 spring-data-mongodb 流式传输聚合操作的结果
Streaming the result of an aggregate operation using spring-data-mongodb
我正在使用 spring-data-mongodb 并且我想使用游标进行聚合操作。
MongoTemplate.stream() 获取查询,因此我尝试创建聚合实例,将其转换为 DbObject 使用Aggregation.toDbObject(),使用 DbObject 创建了一个 BasicQuery,然后调用 stream() 方法。
这 returns 一个空游标。
调试 spring-data-mongodb 代码显示 MongoTemplate.stream() 使用 FindOperation,这让我觉得spring-data-mongodb不支持流聚合操作。
有没有人能够使用 spring-data-mongodb 流式传输聚合查询的结果?
郑重声明,我可以使用 Java mongodb 驱动程序,但我更喜欢使用 spring-data.
编辑 11 月 10 日 - 添加示例代码:
MatchOperation match = Aggregation.match(Criteria.where("type").ne("AType"));
GroupOperation group = Aggregation.group("name", "type");
group = group.push("color").as("colors");
group = group.push("size").as("sizes");
TypedAggregation<MyClass> agg = Aggregation.newAggregation(MyClass.class, Arrays.asList(match, group));
MongoConverter converter = mongoTemplate.getConverter();
MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext = converter.getMappingContext();
QueryMapper queryMapper = new QueryMapper(converter);
AggregationOperationContext context = new TypeBasedAggregationOperationContext(MyClass.class, mappingContext, queryMapper);
// create a BasicQuery to be used in the stream() method by converting the Aggregation to a DbObject
BasicQuery query = new BasicQuery(agg.toDbObject("myClass", context));
// spring-mongo attributes the stream() method to find() operationsm not to aggregate() operations so the stream returns an empty cursor
CloseableIterator<MyClass> iter = mongoTemplate.stream(query, MyClass.class);
// this is an empty cursor
while(iter.hasNext()) {
System.out.println(iter.next().getName());
}
以下代码,未使用stream()方法,returns聚合的预期非空结果:
AggregationResults<HashMap> result = mongoTemplate.aggregate(agg, "myClass", HashMap.class);
对于那些仍在努力寻找答案的人:
从 spring-data-mongo 版本 2.0.0.M4 开始 (AFAIK) MongoTemplate
得到了 aggregateStream
方法。
因此您可以执行以下操作:
AggregationOptions aggregationOptions = Aggregation.newAggregationOptions()
// this is very important: if you do not set the batch size, you'll get all the objects at once and you might run out of memory if the returning data set is too large
.cursorBatchSize(mongoCursorBatchSize)
.build();
data = mongoTemplate.aggregateStream(Aggregation.newAggregation(
Aggregation.group("person_id").count().as("count")).withOptions(aggregationOptions), collectionName, YourClazz.class);
我正在使用 spring-data-mongodb 并且我想使用游标进行聚合操作。
MongoTemplate.stream() 获取查询,因此我尝试创建聚合实例,将其转换为 DbObject 使用Aggregation.toDbObject(),使用 DbObject 创建了一个 BasicQuery,然后调用 stream() 方法。
这 returns 一个空游标。
调试 spring-data-mongodb 代码显示 MongoTemplate.stream() 使用 FindOperation,这让我觉得spring-data-mongodb不支持流聚合操作。
有没有人能够使用 spring-data-mongodb 流式传输聚合查询的结果?
郑重声明,我可以使用 Java mongodb 驱动程序,但我更喜欢使用 spring-data.
编辑 11 月 10 日 - 添加示例代码:
MatchOperation match = Aggregation.match(Criteria.where("type").ne("AType"));
GroupOperation group = Aggregation.group("name", "type");
group = group.push("color").as("colors");
group = group.push("size").as("sizes");
TypedAggregation<MyClass> agg = Aggregation.newAggregation(MyClass.class, Arrays.asList(match, group));
MongoConverter converter = mongoTemplate.getConverter();
MappingContext<? extends MongoPersistentEntity<?>, MongoPersistentProperty> mappingContext = converter.getMappingContext();
QueryMapper queryMapper = new QueryMapper(converter);
AggregationOperationContext context = new TypeBasedAggregationOperationContext(MyClass.class, mappingContext, queryMapper);
// create a BasicQuery to be used in the stream() method by converting the Aggregation to a DbObject
BasicQuery query = new BasicQuery(agg.toDbObject("myClass", context));
// spring-mongo attributes the stream() method to find() operationsm not to aggregate() operations so the stream returns an empty cursor
CloseableIterator<MyClass> iter = mongoTemplate.stream(query, MyClass.class);
// this is an empty cursor
while(iter.hasNext()) {
System.out.println(iter.next().getName());
}
以下代码,未使用stream()方法,returns聚合的预期非空结果:
AggregationResults<HashMap> result = mongoTemplate.aggregate(agg, "myClass", HashMap.class);
对于那些仍在努力寻找答案的人:
从 spring-data-mongo 版本 2.0.0.M4 开始 (AFAIK) MongoTemplate
得到了 aggregateStream
方法。
因此您可以执行以下操作:
AggregationOptions aggregationOptions = Aggregation.newAggregationOptions()
// this is very important: if you do not set the batch size, you'll get all the objects at once and you might run out of memory if the returning data set is too large
.cursorBatchSize(mongoCursorBatchSize)
.build();
data = mongoTemplate.aggregateStream(Aggregation.newAggregation(
Aggregation.group("person_id").count().as("count")).withOptions(aggregationOptions), collectionName, YourClazz.class);