Reactor 框架与组装时间和订阅时间混淆(何时调用订阅)
Reactor framework confusion with Assembly time and subscription time (when to call subscribe)
其实我对组装时间和订阅时间很困惑。我知道 mono 是懒惰的,在订阅之前不会执行。下面是一个方法。
public Mono<UserbaseEntityResponse> getUserbaseDetailsForEntityId(String id) {
GroupRequest request = ImmutableGroupRequest
.builder()
.cloudId(id)
.build();
Mono<List<GroupResponse>> response = ussClient.getGroups(request);
List<UserbaseEntityResponse.GroupPrincipal> groups = new CopyOnWriteArrayList<>();
response.flatMapIterable(elem -> elem)
.toIterable().iterator().forEachRemaining(
groupResponse -> {
groupResponse.getResources().iterator().forEachRemaining(
resource -> {
groups.add(ImmutableGroupPrincipal
.builder()
.groupId(resource.getId())
.name(resource.getDisplayName())
.addAllUsers(convertMemebersToUsers(resource))
.build());
}
);
}
);
log.debug("Response Object - " + groups.toString());
ImmutableUserbaseEntityResponse res = ImmutableUserbaseEntityResponse
.builder()
.userbaseId(id)
.addAllGroups(groups)
.build();
Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
.sequential();
return Mono.just(res);
}
这会在不调用订阅的情况下执行 Mono<List<GroupResponse>> response = ussClient.getGroups(request);
,但是除非我在上面调用订阅,否则下面不会执行。
Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
.sequential();
我可以就组装时间和订阅获得更多意见吗?
“在您订阅之前什么都不会发生”并非在所有情况下都完全正确。发布者(Mono
或 Flux
)将在三种情况下执行:
- 您订阅了;
- 你屏蔽了;
- 发布者很“火爆”。
请注意,上述场景都适用于整个反应链——即如果我订阅了一个发布者,上游的所有内容(依赖于该发布者)也会执行。这就是为什么框架可以并且应该在需要时调用订阅,从而导致控制器中定义的反应链执行。
在你的情况下,它实际上是其中的第二个——你在阻塞,这本质上是一个“订阅并等待结果”。通常阻塞的方法都有清楚的标记,但总是情况并非如此-在您的情况下,是Flux
上的toIterable()
方法执行阻塞:
Transform this Flux into a lazy Iterable blocking on Iterator.next()
calls.
但是啊,你说,我不打电话 Iterator.next()
- 怎么了?!
嗯,隐含地你是通过调用 forEachRemaining()
:
The default implementation behaves as if:
while (hasNext())
action.accept(next());
...并且根据上述规则,由于 ussClient.getGroups(request)
是此阻塞调用的上游,因此它会被执行。
其实我对组装时间和订阅时间很困惑。我知道 mono 是懒惰的,在订阅之前不会执行。下面是一个方法。
public Mono<UserbaseEntityResponse> getUserbaseDetailsForEntityId(String id) {
GroupRequest request = ImmutableGroupRequest
.builder()
.cloudId(id)
.build();
Mono<List<GroupResponse>> response = ussClient.getGroups(request);
List<UserbaseEntityResponse.GroupPrincipal> groups = new CopyOnWriteArrayList<>();
response.flatMapIterable(elem -> elem)
.toIterable().iterator().forEachRemaining(
groupResponse -> {
groupResponse.getResources().iterator().forEachRemaining(
resource -> {
groups.add(ImmutableGroupPrincipal
.builder()
.groupId(resource.getId())
.name(resource.getDisplayName())
.addAllUsers(convertMemebersToUsers(resource))
.build());
}
);
}
);
log.debug("Response Object - " + groups.toString());
ImmutableUserbaseEntityResponse res = ImmutableUserbaseEntityResponse
.builder()
.userbaseId(id)
.addAllGroups(groups)
.build();
Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
.sequential();
return Mono.just(res);
}
这会在不调用订阅的情况下执行 Mono<List<GroupResponse>> response = ussClient.getGroups(request);
,但是除非我在上面调用订阅,否则下面不会执行。
Flux<UserbaseEntityResponse.GroupPrincipal> f = Flux.fromIterable(res.getGroups())
.parallel()
.runOn(Schedulers.parallel())
.doOnNext(groupPrincipal -> getResourcesForGroup((ImmutableGroupPrincipal)groupPrincipal, res.getUserbaseId()))
.sequential();
我可以就组装时间和订阅获得更多意见吗?
“在您订阅之前什么都不会发生”并非在所有情况下都完全正确。发布者(Mono
或 Flux
)将在三种情况下执行:
- 您订阅了;
- 你屏蔽了;
- 发布者很“火爆”。
请注意,上述场景都适用于整个反应链——即如果我订阅了一个发布者,上游的所有内容(依赖于该发布者)也会执行。这就是为什么框架可以并且应该在需要时调用订阅,从而导致控制器中定义的反应链执行。
在你的情况下,它实际上是其中的第二个——你在阻塞,这本质上是一个“订阅并等待结果”。通常阻塞的方法都有清楚的标记,但总是情况并非如此-在您的情况下,是Flux
上的toIterable()
方法执行阻塞:
Transform this Flux into a lazy Iterable blocking on
Iterator.next()
calls.
但是啊,你说,我不打电话 Iterator.next()
- 怎么了?!
嗯,隐含地你是通过调用 forEachRemaining()
:
The default implementation behaves as if:
while (hasNext()) action.accept(next());
...并且根据上述规则,由于 ussClient.getGroups(request)
是此阻塞调用的上游,因此它会被执行。