将 CompletableFuture<Stream<T>> 转换为 Publisher<T> 是否正确?
Is it correct to convert a CompletableFuture<Stream<T>> to a Publisher<T>?
为了允许对来自 CompletableFuture<Stream<String>>
的结果流进行多次迭代,我正在考虑以下方法之一:
将结果未来转换为 CompletableFuture<List<String>>
通过:teams.thenApply(st -> st.collect(toList()))
使用缓存将生成的未来转换为 Flux<String>
:Flux.fromStream(teams::join).cache();
Flux<T>
是Publisher<T>
在项目reactor中的实现。
用例:
我想从提供 League
对象和 Standing[]
(基于足球-data RESTful API,例如 http://api.football-data.org/v1/soccerseasons/445/leagueTable)。使用 AsyncHttpClient
和 Gson
我们有:
CompletableFuture<Stream<String>> teams = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
.execute()
.toCompletableFuture()
.thenApply(Response::getResponseBody)
.thenApply(body -> gson.fromJson(body, League.class));
.thenApply(l -> stream(l.standings).map(s -> s.teamName));
要重新使用生成的流,我有两个选择:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))
2. Flux<String> res = Flux.fromStream(teams::join).cache()
Flux<T>
不那么冗长并且提供了我需要的一切。然而,在这种情况下使用它是否正确?
或者我应该改用 CompletableFuture<List<String>>
?或者还有其他更好的选择吗?
更新了一些想法 (2018-03-16):
CompletableFuture<List<String>>
:
- [PROS]
List<String>
会在续集中收集,等以后需要继续处理的时候,说不定已经完成了。
- [缺点] 声明冗长。
- [缺点] 如果我们只想使用一次,那么我们不需要在
List<T>
. 中收集那些物品
Flux<String>
:
- [PROS] 声明简洁
- [PROS] 如果我们只想使用一次,那么我们可以省略
.cache()
并将其转发到下一层,这样可以利用反应式 API,例如网络通量反应控制器,例如@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}
- [CONS] 如果我们想重用
Flux<T>
我们必须将它包装在可缓存的 Flux<T>
(….cache()
) 中,这反过来会增加第一次遍历的开销,因为它必须将生成的项目存储在内部缓存中。
一旦您下载了联赛 table,并从此 table 中提取了球队名称,我不确定您是否需要 back-pressure 准备好的流来迭代这些项目。将流转换为标准列表(或数组)应该足够好,并且应该有更好的性能,不是吗?
例如:
String[] teamNames = teams.join().toArray(String[]::new);
CompletableFuture<Stream<String>> teams = ...;
Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));
Flux.fromStream(teams::join)
是一种代码味道,因为它阻止当前线程从 CompletableFuture
获取结果,而 CompletableFuture
在另一个线程上 运行。
为了允许对来自 CompletableFuture<Stream<String>>
的结果流进行多次迭代,我正在考虑以下方法之一:
将结果未来转换为
CompletableFuture<List<String>>
通过:teams.thenApply(st -> st.collect(toList()))
使用缓存将生成的未来转换为
Flux<String>
:Flux.fromStream(teams::join).cache();
Flux<T>
是Publisher<T>
在项目reactor中的实现。
用例:
我想从提供 League
对象和 Standing[]
(基于足球-data RESTful API,例如 http://api.football-data.org/v1/soccerseasons/445/leagueTable)。使用 AsyncHttpClient
和 Gson
我们有:
CompletableFuture<Stream<String>> teams = asyncHttpClient
.prepareGet("http://api.football-data.org/v1/soccerseasons/445/leagueTable")
.execute()
.toCompletableFuture()
.thenApply(Response::getResponseBody)
.thenApply(body -> gson.fromJson(body, League.class));
.thenApply(l -> stream(l.standings).map(s -> s.teamName));
要重新使用生成的流,我有两个选择:
1. CompletableFuture<List<String>> res = teams.thenApply(st -> st.collect(toList()))
2. Flux<String> res = Flux.fromStream(teams::join).cache()
Flux<T>
不那么冗长并且提供了我需要的一切。然而,在这种情况下使用它是否正确?
或者我应该改用 CompletableFuture<List<String>>
?或者还有其他更好的选择吗?
更新了一些想法 (2018-03-16):
CompletableFuture<List<String>>
:
- [PROS]
List<String>
会在续集中收集,等以后需要继续处理的时候,说不定已经完成了。 - [缺点] 声明冗长。
- [缺点] 如果我们只想使用一次,那么我们不需要在
List<T>
. 中收集那些物品
Flux<String>
:
- [PROS] 声明简洁
- [PROS] 如果我们只想使用一次,那么我们可以省略
.cache()
并将其转发到下一层,这样可以利用反应式 API,例如网络通量反应控制器,例如@GetMapping(produces =MediaType.TEXT_EVENT_STREAM) public Flux<String> getTeams() {…}
- [CONS] 如果我们想重用
Flux<T>
我们必须将它包装在可缓存的Flux<T>
(….cache()
) 中,这反过来会增加第一次遍历的开销,因为它必须将生成的项目存储在内部缓存中。
一旦您下载了联赛 table,并从此 table 中提取了球队名称,我不确定您是否需要 back-pressure 准备好的流来迭代这些项目。将流转换为标准列表(或数组)应该足够好,并且应该有更好的性能,不是吗?
例如:
String[] teamNames = teams.join().toArray(String[]::new);
CompletableFuture<Stream<String>> teams = ...;
Flux<String> teamsFlux = Mono.fromFuture(teams).flatMapMany(stream -> Flux.fromStream(stream));
Flux.fromStream(teams::join)
是一种代码味道,因为它阻止当前线程从 CompletableFuture
获取结果,而 CompletableFuture
在另一个线程上 运行。