使用 spring-data-mongodb-reactive 管理多个 Flux 的生命周期
Managing the lifecycle of multiple Flux with spring-data-mongodb-reactive
我有一个数据服务,我正在认真考虑切换到反应模型。这是一个联合查询引擎,可以根据查询类型调用一个或多个“解析器”实现来解析查询数据。
如果我切换到 spring-data-mongodb-reactive
,那么这些实现中的每一个都必须为以下对象创建多个 Flux
个实例:
- 对信息不同部分的查询
- 正在为 #1 中的每个查询查询所有数据库
注意:我不想合并每个 Flux
因为能够保留 #1 的查询以上分开使最终处理更容易。组合所有联合数据库的每个“部分”查询会很好,但我必须将每个“部分”的数据分开。我希望这是有道理的。
解释完整的工作流程超出了本 post 的范围,但我想知道如何创建任意数量的 Flux
实例,并订阅它们以启动它们,但是然后等到它们全部完成,然后再继续处理所有联合来源的完全检索数据。在 Java 中,我正在寻找类似于 CompletableFuture.allOf()
.
的内容
如果我这样做,我是否接近走上正轨:
public class ReactiveDataService {
private static final Supplier<Example<String>> example1 = () -> Example.of("Example 1");
private static final Supplier<Example<String>> example2 = () -> Example.of("Example 2");
private static final Supplier<Example<String>> example3 = () -> Example.of("Example 3");
private static final Supplier<Example<String>> example4 = () -> Example.of("Example 4");
private static final Supplier<Example<String>> example5 = () -> Example.of("Example 5");
private final Collection<ReactiveMongoRepository<String, String>> repositories;
public ReactiveDataService(Collection<ReactiveMongoRepository<String, String>> repositories) {
this.repositories = repositories;
}
private void processFluxes(final Flux<String> flux1, final Flux<String> flux2, final Flux<String> flux3,
final Flux<String> flux4, final Flux<String> flux5) {
// Call service to process flux stuff
}
/**
* For all repositories, combine fluxes that run the same query.
* Subscribe to each flux immediately to get the query started.
* Add all fluxes to a container flux that processes the results
* upon completion.
* After everything is set up, block until completion.
*/
public void doQuery() {
final Flux<String> flux1 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example1.get()));
flux1.subscribe();
final Flux<String> flux2 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example2.get()));
flux2.subscribe();
final Flux<String> flux3 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example3.get()));
flux3.subscribe();
final Flux<String> flux4 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example4.get()));
flux4.subscribe();
final Flux<String> flux5 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example5.get()));
flux5.subscribe();
final Flux<Flux<String>> fluxes = Flux.just(flux1, flux2, flux3, flux4, flux5)
.doOnComplete(() -> processFluxes(flux1, flux2, flux3, flux4, flux5));
fluxes.blockLast();
}
}
这是一个如何使用 Mono.zip 的示例:
public static void main(String[] args) {
Flux<String> flux0 = Flux.empty();
Flux<String> flux1 = Flux.just("1.1", "1.2", "1.3");
Flux<String> flux2 = Flux.just("2.1", "2.2", "2.3");
Flux<String> flux3 = Flux.just("3.1", "3.2", "3.3");
Mono.zip(lists -> process(lists), flux0.collectList(), flux1.collectList(), flux2.collectList(), flux3.collectList()).block();
}
private static String process(Object[] lists) {
System.out.println("List 0 is " + lists[0]);
System.out.println("List 1 is " + lists[1]);
System.out.println("List 2 is " + lists[2]);
System.out.println("List 3 is " + lists[3]);
return "output";
}
输出:
List 0 is []
List 1 is [1.1, 1.2, 1.3]
List 2 is [2.1, 2.2, 2.3]
List 3 is [3.1, 3.2, 3.3]
因此您可以根据自己的情况进行调整。
请注意 Mono.zip 不能 return null,这就是为什么我把“输出”作为结果,但是如果你不需要任何输出你可以放任何你想要的不为空的东西.
思路是先把每个Flux<String>
用collectList
转成Mono<List<String>>
,这样处理起来会简单一些。 Mono.zip
允许您等待所有完成,并将输出处理为 Object[]
。您可以将每个对象转换成一个List<String>
进行处理。
我有一个数据服务,我正在认真考虑切换到反应模型。这是一个联合查询引擎,可以根据查询类型调用一个或多个“解析器”实现来解析查询数据。
如果我切换到 spring-data-mongodb-reactive
,那么这些实现中的每一个都必须为以下对象创建多个 Flux
个实例:
- 对信息不同部分的查询
- 正在为 #1 中的每个查询查询所有数据库
注意:我不想合并每个 Flux
因为能够保留 #1 的查询以上分开使最终处理更容易。组合所有联合数据库的每个“部分”查询会很好,但我必须将每个“部分”的数据分开。我希望这是有道理的。
解释完整的工作流程超出了本 post 的范围,但我想知道如何创建任意数量的 Flux
实例,并订阅它们以启动它们,但是然后等到它们全部完成,然后再继续处理所有联合来源的完全检索数据。在 Java 中,我正在寻找类似于 CompletableFuture.allOf()
.
如果我这样做,我是否接近走上正轨:
public class ReactiveDataService {
private static final Supplier<Example<String>> example1 = () -> Example.of("Example 1");
private static final Supplier<Example<String>> example2 = () -> Example.of("Example 2");
private static final Supplier<Example<String>> example3 = () -> Example.of("Example 3");
private static final Supplier<Example<String>> example4 = () -> Example.of("Example 4");
private static final Supplier<Example<String>> example5 = () -> Example.of("Example 5");
private final Collection<ReactiveMongoRepository<String, String>> repositories;
public ReactiveDataService(Collection<ReactiveMongoRepository<String, String>> repositories) {
this.repositories = repositories;
}
private void processFluxes(final Flux<String> flux1, final Flux<String> flux2, final Flux<String> flux3,
final Flux<String> flux4, final Flux<String> flux5) {
// Call service to process flux stuff
}
/**
* For all repositories, combine fluxes that run the same query.
* Subscribe to each flux immediately to get the query started.
* Add all fluxes to a container flux that processes the results
* upon completion.
* After everything is set up, block until completion.
*/
public void doQuery() {
final Flux<String> flux1 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example1.get()));
flux1.subscribe();
final Flux<String> flux2 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example2.get()));
flux2.subscribe();
final Flux<String> flux3 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example3.get()));
flux3.subscribe();
final Flux<String> flux4 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example4.get()));
flux4.subscribe();
final Flux<String> flux5 = Flux.fromIterable(repositories)
.flatMap(repo -> repo.findAll(example5.get()));
flux5.subscribe();
final Flux<Flux<String>> fluxes = Flux.just(flux1, flux2, flux3, flux4, flux5)
.doOnComplete(() -> processFluxes(flux1, flux2, flux3, flux4, flux5));
fluxes.blockLast();
}
}
这是一个如何使用 Mono.zip 的示例:
public static void main(String[] args) {
Flux<String> flux0 = Flux.empty();
Flux<String> flux1 = Flux.just("1.1", "1.2", "1.3");
Flux<String> flux2 = Flux.just("2.1", "2.2", "2.3");
Flux<String> flux3 = Flux.just("3.1", "3.2", "3.3");
Mono.zip(lists -> process(lists), flux0.collectList(), flux1.collectList(), flux2.collectList(), flux3.collectList()).block();
}
private static String process(Object[] lists) {
System.out.println("List 0 is " + lists[0]);
System.out.println("List 1 is " + lists[1]);
System.out.println("List 2 is " + lists[2]);
System.out.println("List 3 is " + lists[3]);
return "output";
}
输出:
List 0 is []
List 1 is [1.1, 1.2, 1.3]
List 2 is [2.1, 2.2, 2.3]
List 3 is [3.1, 3.2, 3.3]
因此您可以根据自己的情况进行调整。
请注意 Mono.zip 不能 return null,这就是为什么我把“输出”作为结果,但是如果你不需要任何输出你可以放任何你想要的不为空的东西.
思路是先把每个Flux<String>
用collectList
转成Mono<List<String>>
,这样处理起来会简单一些。 Mono.zip
允许您等待所有完成,并将输出处理为 Object[]
。您可以将每个对象转换成一个List<String>
进行处理。