未使用 webFlux 和 netty 调用 ReactiveCosmosRepository

ReactiveCosmosRepository is not being invoked with webFlux and netty

我有以下响应式存储库:

@Repository
public interface FooCosmosRepository extends ReactiveCosmosRepository<Foo, String> {
}

我正在使用它如下:

    @Override
    public Mono<FooResponse> getFooDetails() {

        FooResponse fooResponse = new FooResponse();
        fooResponse.setCount(1000);

        List<Foo> fooList = new ArrayList<>();

        repository.findAll().collectList().flatMap(e ->{

       //This is not invoked. findAll return Flux<T> in this case Flux<Foo>
            for (Foo foo : e) {
                fooList.add(foo);
            }
            return null;
        });

        fooResponse.setFooList(fooList);

        return Mono.just(fooResponse);
    }

FooResponse定义如下:

@NoArgsConstructor
@Data
@FieldDefaults(level = AccessLevel.PRIVATE)
public class FooResponse {
    int rowCount;
    List<Foo> fooList;
}

我无法阻止,因为我收到错误消息 Iterating over a toIterable() / toStream() is blocking, which is not supported in thread reactor-http-nio-6

我也不能 return Flux<T> 从方法。我需要 return Mono<FooResponse>。我如何查询存储库,实际上 get/collect 响应并添加到列表?

有什么想法吗?

这是因为您是命令式而不是反应式编码。而你正在打破链条,这意味着 Reactor 无法完成组装阶段,然后在订阅阶段执行。

@Override
public Mono<FooResponse> getFooDetails() {
    return repository.findAll()
        .collectList()
        .map(list -> {
            FooResponse fooResponse = new FooResponse();
            fooResponse.setCount(1000);
            fooResponse.setList(list);
            return fooResponse;
        });
    }

这是基本的反应器,我推荐以下链接:

Reactor Core Features

Flight of the flux