未使用 webFlux 和 netty 调用 ReactiveCosmosRepository
ReactiveCosmosRepository is not being invoked with webFlux and netty
我有以下响应式存储库:
@Repository
public interface FooCosmosRepository extends ReactiveCosmosRepository<Foo, String> {
}
我正在使用它如下:
@Override
public Mono<FooResponse> getFooDetails() {
FooResponse fooResponse = new FooResponse();
fooResponse.setCount(1000);
List<Foo> fooList = new ArrayList<>();
repository.findAll().collectList().flatMap(e ->{
//This is not invoked. findAll return Flux<T> in this case Flux<Foo>
for (Foo foo : e) {
fooList.add(foo);
}
return null;
});
fooResponse.setFooList(fooList);
return Mono.just(fooResponse);
}
FooResponse定义如下:
@NoArgsConstructor
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
public class FooResponse {
int rowCount;
List<Foo> fooList;
}
我无法阻止,因为我收到错误消息
Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-6
我也不能 return Flux<T>
从方法。我需要 return Mono<FooResponse>
。我如何查询存储库,实际上 get/collect 响应并添加到列表?
有什么想法吗?
这是因为您是命令式而不是反应式编码。而你正在打破链条,这意味着 Reactor 无法完成组装阶段,然后在订阅阶段执行。
@Override
public Mono<FooResponse> getFooDetails() {
return repository.findAll()
.collectList()
.map(list -> {
FooResponse fooResponse = new FooResponse();
fooResponse.setCount(1000);
fooResponse.setList(list);
return fooResponse;
});
}
这是基本的反应器,我推荐以下链接:
我有以下响应式存储库:
@Repository
public interface FooCosmosRepository extends ReactiveCosmosRepository<Foo, String> {
}
我正在使用它如下:
@Override
public Mono<FooResponse> getFooDetails() {
FooResponse fooResponse = new FooResponse();
fooResponse.setCount(1000);
List<Foo> fooList = new ArrayList<>();
repository.findAll().collectList().flatMap(e ->{
//This is not invoked. findAll return Flux<T> in this case Flux<Foo>
for (Foo foo : e) {
fooList.add(foo);
}
return null;
});
fooResponse.setFooList(fooList);
return Mono.just(fooResponse);
}
FooResponse定义如下:
@NoArgsConstructor
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
public class FooResponse {
int rowCount;
List<Foo> fooList;
}
我无法阻止,因为我收到错误消息
Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-6
我也不能 return Flux<T>
从方法。我需要 return Mono<FooResponse>
。我如何查询存储库,实际上 get/collect 响应并添加到列表?
有什么想法吗?
这是因为您是命令式而不是反应式编码。而你正在打破链条,这意味着 Reactor 无法完成组装阶段,然后在订阅阶段执行。
@Override
public Mono<FooResponse> getFooDetails() {
return repository.findAll()
.collectList()
.map(list -> {
FooResponse fooResponse = new FooResponse();
fooResponse.setCount(1000);
fooResponse.setList(list);
return fooResponse;
});
}
这是基本的反应器,我推荐以下链接: