Reactor Flux<String> 一对多 Flux<T>
Reactor Flux<String> one to many Flux<T>
我目前有一些异步操作导致的通量并产生 Flux<String>
。然后我想在遗留回调 API 中使用这些字符串作为参数,这将导致每个输入字符串从主通量发出多个事件。
以下代码按预期工作,但我无法找到一种方法来成功终止第二个通量,而不用做一些看起来很棘手的事情,比如保持计数器等。有没有更惯用的方法来做到这一点?
public Flux<Account> getAccounts(UUID userId) {
var tokens = tokenRepo.findAllActiveByAccountUUID(userId);
return tokens.flatMap(p -> Flux.create(e -> {
var r = new AccountsGetRequest().accessToken(p);
c.accountsGet(r).enqueue(new Callback<>() {
@Override
public void onResponse(@NotNull Call<AccountsGetResponse> call,
@NotNull Response<AccountsGetResponse> response) {
if (response.isSuccessful() && response.body() != null) {
(response.body()).forEach(e::next);
e.complete();
} else {
log.debug(response.toString());
e.error(new RuntimeException("getAccounts" + response.code()));
e.complete();
}
}
@Override
public void onFailure(@NotNull Call<AccountsGetResponse> call, @NotNull Throwable t) {
e.error(t);
e.complete();
}
});
}));
}
您只需在 forEach
之后调用 e.complete()
我认为使用 Flux.fromIterable(response.body())
而不是 Flux.generate
会更简单
请注意,如果 c.accountsGet(r)
是阻塞调用,您可能应该使用 publishOn
运算符切换到另一个调度程序以防止阻塞主线程。
我目前有一些异步操作导致的通量并产生 Flux<String>
。然后我想在遗留回调 API 中使用这些字符串作为参数,这将导致每个输入字符串从主通量发出多个事件。
以下代码按预期工作,但我无法找到一种方法来成功终止第二个通量,而不用做一些看起来很棘手的事情,比如保持计数器等。有没有更惯用的方法来做到这一点?
public Flux<Account> getAccounts(UUID userId) {
var tokens = tokenRepo.findAllActiveByAccountUUID(userId);
return tokens.flatMap(p -> Flux.create(e -> {
var r = new AccountsGetRequest().accessToken(p);
c.accountsGet(r).enqueue(new Callback<>() {
@Override
public void onResponse(@NotNull Call<AccountsGetResponse> call,
@NotNull Response<AccountsGetResponse> response) {
if (response.isSuccessful() && response.body() != null) {
(response.body()).forEach(e::next);
e.complete();
} else {
log.debug(response.toString());
e.error(new RuntimeException("getAccounts" + response.code()));
e.complete();
}
}
@Override
public void onFailure(@NotNull Call<AccountsGetResponse> call, @NotNull Throwable t) {
e.error(t);
e.complete();
}
});
}));
}
您只需在 forEach
e.complete()
我认为使用 Flux.fromIterable(response.body())
而不是 Flux.generate
请注意,如果 c.accountsGet(r)
是阻塞调用,您可能应该使用 publishOn
运算符切换到另一个调度程序以防止阻塞主线程。