如何链接反应器订阅者
How to chain reactor subscribers
我有一个现有的接口链,我想 运行 作为 reactor 而不是管理我自己的线程和队列
public interface UserLookupService {
public User lookup(String id);
}
public interface UsersHandler {
public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...
// Works well to lookup users in parallel.
Flux.just("userA", "userB", "userC")
.parallel(2)
.runOn(Schedulers.parallel())
.subscribe(str -> {
userSvc.lookup(str);
});
我如何链接该结果,以便它调用 UsersHandler
批量 User
?
订阅某物会触发链条,因此您通常不能"chain"订阅者,它们是链条中的最后一件事。
想一想,如果这样,您设置反应管道,当您 subscribe
触发管道启动,链将产生结果。
在网络服务器中,subscriber
通常是调用客户端,当客户端 subscribes
他触发服务器中将发布数据的事件链。
A Flux
有点像 1 到 n Mono
的列表。 Mono/Flux
中的每个对象可以说有多个 "states"。这些是 Success
、Error
、Cancel
、Next
、Completed
等等。
当 Mono/Flux
内部进入 Success
状态时,它将发出其中的值。 Mono
通常会在单声道中解决某些问题时变为 Success
。
当您声明 Flux.just("userA", "userB", "userC")
时,您基本上是在要求通量解析您输入的输入。放置字符串会立即解析,因此通量将进入 Success
状态并在 Subscribes
后立即开始发出字符串。所以你所要做的就是声明你想要在某人 Subscribes
.
之后发生的链
这可以通过几种不同的方式来完成,当你想做某事并更改值时,比如你想从 string
到 user
我们通常使用 map
.
如果我们只想对每个对象做一些事情而不是 return 我们可以使用 doOnNext
.
Flux.just("userA", "userB", "userC")
.parallel(2)
.runOn(Schedulers.parallel())
.map(userString -> {
return lookupService.lookup(userString);
})
.doOnNext(user -> {
// if you want to do something on each user
// will return void so if you want to log something
// or handle each user
}).subscribe();
订阅应该是链中的最后一件事。
我有一个现有的接口链,我想 运行 作为 reactor 而不是管理我自己的线程和队列
public interface UserLookupService {
public User lookup(String id);
}
public interface UsersHandler {
public void handle(List<User> users>);
}
UserLookupService userSvc = ...;
UsersHandler usersHandler = ...
// Works well to lookup users in parallel.
Flux.just("userA", "userB", "userC")
.parallel(2)
.runOn(Schedulers.parallel())
.subscribe(str -> {
userSvc.lookup(str);
});
我如何链接该结果,以便它调用 UsersHandler
批量 User
?
订阅某物会触发链条,因此您通常不能"chain"订阅者,它们是链条中的最后一件事。
想一想,如果这样,您设置反应管道,当您 subscribe
触发管道启动,链将产生结果。
在网络服务器中,subscriber
通常是调用客户端,当客户端 subscribes
他触发服务器中将发布数据的事件链。
A Flux
有点像 1 到 n Mono
的列表。 Mono/Flux
中的每个对象可以说有多个 "states"。这些是 Success
、Error
、Cancel
、Next
、Completed
等等。
当 Mono/Flux
内部进入 Success
状态时,它将发出其中的值。 Mono
通常会在单声道中解决某些问题时变为 Success
。
当您声明 Flux.just("userA", "userB", "userC")
时,您基本上是在要求通量解析您输入的输入。放置字符串会立即解析,因此通量将进入 Success
状态并在 Subscribes
后立即开始发出字符串。所以你所要做的就是声明你想要在某人 Subscribes
.
这可以通过几种不同的方式来完成,当你想做某事并更改值时,比如你想从 string
到 user
我们通常使用 map
.
如果我们只想对每个对象做一些事情而不是 return 我们可以使用 doOnNext
.
Flux.just("userA", "userB", "userC")
.parallel(2)
.runOn(Schedulers.parallel())
.map(userString -> {
return lookupService.lookup(userString);
})
.doOnNext(user -> {
// if you want to do something on each user
// will return void so if you want to log something
// or handle each user
}).subscribe();
订阅应该是链中的最后一件事。