Reactor 框架与组装时间和订阅时间混淆(何时调用订阅)

Reactor framework confusion with Assembly time and subscription time (when to call subscribe)

其实我对组装时间和订阅时间很困惑。我知道 mono 是懒惰的,在订阅之前不会执行。下面是一个方法。

    public Mono<UserbaseEntityResponse> getUserbaseDetailsForEntityId(String id) {
        GroupRequest request = ImmutableGroupRequest
                .builder()
                .cloudId(id)
                .build();

        Mono<List<GroupResponse>> response = ussClient.getGroups(request);
        List<UserbaseEntityResponse.GroupPrincipal> groups = new CopyOnWriteArrayList<>();


        response.flatMapIterable(elem -> elem)
                .toIterable().iterator().forEachRemaining(
                    groupResponse -> {
                        groupResponse.getResources().iterator().forEachRemaining(
                            resource -> {
                                groups.add(ImmutableGroupPrincipal
                                        .builder()
                                        .groupId(resource.getId())
                                        .name(resource.getDisplayName())
                                        .addAllUsers(convertMemebersToUsers(resource))
                                        .build());
                            }
                        );
                    }
                );
        log.debug("Response Object - " + groups.toString());
        ImmutableUserbaseEntityResponse res = ImmutableUserbaseEntityResponse
                .builder()
                .userbaseId(id)
                .addAllGroups(groups)
                .build();


        Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
        .parallel()
                .runOn(Schedulers.parallel())
                .doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
                .sequential();

        return Mono.just(res);
    }

这会在不调用订阅的情况下执行 Mono<List<GroupResponse>> response = ussClient.getGroups(request);,但是除非我在上面调用订阅,否则下面不会执行。

Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
        .parallel()
                .runOn(Schedulers.parallel())
                .doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
                .sequential();

我可以就组装时间和订阅获得更多意见吗?

“在您订阅之前什么都不会发生”并非在所有情况下都完全正确。发布者(MonoFlux)将在三种情况下执行:

  • 您订阅了;
  • 你屏蔽了;
  • 发布者很“火爆”。

请注意,上述场景都适用于整个反应链——即如果我订阅了一个发布者,上游的所有内容(依赖于该发布者)也会执行。这就是为什么框架可以并且应该在需要时调用订阅,从而导致控制器中定义的反应链执行。

在你的情况下,它实际上是其中的第二个——你在阻塞,这本质上是一个“订阅并等待结果”。通常阻塞的方法都有清楚的标记,但总是情况并非如此-在您的情况下,是Flux上的toIterable()方法执行阻塞:

Transform this Flux into a lazy Iterable blocking on Iterator.next() calls.

但是啊,你说,我不打电话 Iterator.next() - 怎么了?!

嗯,隐含地你是通过调用 forEachRemaining():

The default implementation behaves as if:

 while (hasNext())
     action.accept(next());

...并且根据上述规则,由于 ussClient.getGroups(request) 是此阻塞调用的上游,因此它会被执行。