如何链接反应器订阅者

How to chain reactor subscribers

我有一个现有的接口链,我想 运行 作为 reactor 而不是管理我自己的线程和队列

public interface UserLookupService {
    public User lookup(String id);
}
public interface UsersHandler {
    public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...

// Works well to lookup users in parallel. 
Flux.just("userA", "userB", "userC")
    .parallel(2)
    .runOn(Schedulers.parallel())
    .subscribe(str -> {
        userSvc.lookup(str);
    });

我如何链接该结果,以便它调用 UsersHandler 批量 User

订阅某物会触发链条,因此您通常不能"chain"订阅者,它们是链条中的最后一件事。

想一想,如果这样,您设置反应管道,当您 subscribe 触发管道启动,链将产生结果。

在网络服务器中,subscriber 通常是调用客户端,当客户端 subscribes 他触发服务器中将发布数据的事件链。

A Flux 有点像 1 到 n Mono 的列表。 Mono/Flux 中的每个对象可以说有多个 "states"。这些是 SuccessErrorCancelNextCompleted 等等。

Mono/Flux 内部进入 Success 状态时,它将发出其中的值。 Mono 通常会在单声道中解决某些问题时变为 Success

当您声明 Flux.just("userA", "userB", "userC") 时,您基本上是在要求通量解析您输入的输入。放置字符串会立即解析,因此通量将进入 Success 状态并在 Subscribes 后立即开始发出字符串。所以你所要做的就是声明你想要在某人 Subscribes.

之后发生的链

这可以通过几种不同的方式来完成,当你想做某事并更改值时,比如你想从 stringuser 我们通常使用 map.

如果我们只想对每个对象做一些事情而不是 return 我们可以使用 doOnNext.

Flux.just("userA", "userB", "userC")
            .parallel(2)
            .runOn(Schedulers.parallel())
            .map(userString -> {
                return lookupService.lookup(userString);
            })
            .doOnNext(user -> {
                // if you want to do something on each user
                // will return void so if you want to log something
                // or handle each user
            }).subscribe();

订阅应该是链中的最后一件事。