Reactor Flux<String> 一对多 Flux<T>

Reactor Flux<String> one to many Flux<T>

我目前有一些异步操作导致的通量并产生 Flux<String>。然后我想在遗留回调 API 中使用这些字符串作为参数,这将导致每个输入字符串从主通量发出多个事件。

以下代码按预期工作,但我无法找到一种方法来成功终止第二个通量,而不用做一些看起来很棘手的事情,比如保持计数器等。有没有更惯用的方法来做到这一点?

    public Flux<Account> getAccounts(UUID userId) {
        var tokens = tokenRepo.findAllActiveByAccountUUID(userId);

          return tokens.flatMap(p -> Flux.create(e -> {
            var r = new AccountsGetRequest().accessToken(p);
            c.accountsGet(r).enqueue(new Callback<>() {
                @Override
                public void onResponse(@NotNull Call<AccountsGetResponse> call,
                                       @NotNull Response<AccountsGetResponse> response) {
                    if (response.isSuccessful() && response.body() != null) {
                        (response.body()).forEach(e::next);
                        e.complete();
                    } else {
                        log.debug(response.toString());
                        e.error(new RuntimeException("getAccounts" + response.code()));
                        e.complete();
                    }
                }

                @Override
                public void onFailure(@NotNull Call<AccountsGetResponse> call, @NotNull Throwable t) {
                    e.error(t);
                    e.complete();
                }
            });
        }));
    }

您只需在 forEach

之后调用 e.complete()

我认为使用 Flux.fromIterable(response.body()) 而不是 Flux.generate

会更简单

请注意,如果 c.accountsGet(r) 是阻塞调用,您可能应该使用 publishOn 运算符切换到另一个调度程序以防止阻塞主线程。