如何操作来自 Flux<Object> 的对象,其值来自以非阻塞方式发出 Mono<Items> 的方法?

How to manipulate object coming from a Flux<Object> with a value coming from a method emitting Mono<Items> in non-blocking way?

我正在尝试使用从 Mono 接收的数据来操作从 Flux 接收的对象,其中发出对象的 Flux 和项目的 Mono 的方法都是不同的 API 调用。问题是,我无法控制线程,并且从 Mono 接收的项目永远不会分配给我的对象,除非我故意 block() 该线程。请建议这种情况下是否有任何可能的非阻塞方式。

我也查看了 Schedulers、subscribeOn、publishOn,但无法找出管道。

public Flux<Object> test {

 method1().map(obj -> {
        if (obj.getTotalItems() > 20) {
            obj.setItems(method2(obj).block());
        }
        return obj;
  });
}

此处 method1 发出从 API 命中接收到的对象的 Flux。

并且方法 2 正在发出从另一个 API 命中中获取的项目列表。

如何使整个流程无阻塞?

尝试flatMapconcatMap

使用 flatMap 运算符,您可以非阻塞地展平子流 public

Flux<Object> test {

 method1().flatMap(obj -> {
        if (obj.getTotalItems() > 20) {
            return method2(obj)
                     .map(result -> {
                        obj.setItems(result);
                        return obj;
                     });
        }
        return Mono.just(obj);
  });
}

flatMap 允许您一次展平多个流,因此在长运行 操作的情况下,您可以使用更高效的流程元素。

flatMap 的一个缺点是它不保留元素的顺序,因此如果您有一系列上游元素,例如 [1, 2, 3, 4]flatMap,则顺序可能会更改由于子流的异步性质。

为了保持顺序,您可以使用 concatMap 一次只展平一次流,因此可以保证展平元素的顺序将被保留:

Flux<Object> test {

 method1().concatMap(obj -> {
        if (obj.getTotalItems() > 20) {
            return method2(obj)
                     .map(result -> {
                        obj.setItems(result);
                        return obj;
                     });
        }
        return Mono.just(obj);
  });
}

备注

以这种方式改变对象不是最好的主意,我更愿意在反应式中使用不可变对象模式对象编程