将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是否正确?

Is it correct to convert a CompletableFuture<Stream<T>> to a Publisher<T>?

为了允许对来自 CompletableFuture<Stream<String>> 的结果流进行多次迭代,我正在考虑以下方法之一:

  1. 将结果未来转换为 CompletableFuture<List<String>> 通过:teams.thenApply(st -> st.collect(toList()))

  2. 使用缓存将生成的未来转换为 Flux<String>Flux.fromStream(teams::join).cache();

Flux<T>Publisher<T>在项目reactor中的实现。

用例:

我想从提供 League 对象和 Standing[](基于足球-data RESTful API,例如 http://api.football-data.org/v1/soccerseasons/445/leagueTable)。使用 AsyncHttpClientGson 我们有:

CompletableFuture<Stream<String>> teams = asyncHttpClient
    .prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
    .execute()
    .toCompletableFuture()
    .thenApply(Response::getResponseBody)
    .thenApply(body -> gson.fromJson(body, League.class));
    .thenApply(l -> stream(l.standings).map(s -> s.teamName));

要重新使用生成的流,我有两个选择:

1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))

2. Flux<String> res = Flux.fromStream(teams::join).cache()

Flux<T> 不那么冗长并且提供了我需要的一切。然而,在这种情况下使用它是否正确?

或者我应该改用 CompletableFuture<List<String>>?或者还有其他更好的选择吗?

更新了一些想法 (2018-03-16)

CompletableFuture<List<String>>:

Flux<String>:

一旦您下载了联赛 table,并从此 table 中提取了球队名称,我不确定您是否需要 back-pressure 准备好的流来迭代这些项目。将流转换为标准列表(或数组)应该足够好,并且应该有更好的性能,不是吗?

例如:

String[] teamNames = teams.join().toArray(String[]::new);
    CompletableFuture<Stream<String>> teams = ...;
    Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));

Flux.fromStream(teams::join) 是一种代码味道,因为它阻止当前线程从 CompletableFuture 获取结果,而 CompletableFuture 在另一个线程上 运行。