尝试管理多个 Flux/Mono,先开始其中一些,然后组合其中一些,但有点迷路
Trying to manage multiple Flux/Mono, starting a few of them before others, and combining some of them, and getting a bit lost
我有一个模块接受实体 ID 和“解析类型”作为参数,然后通过 return Fluxes 的多个操作(主要)异步收集数据。解决方案分为多个(主要是再次)异步操作,每个操作都致力于收集有助于解决方案的不同数据类型。我之所以说“主要”是异步的,是因为某些解析类型需要一些必须同步发生的初步操作,以便为解析的其余异步 Flux 操作提供信息。现在,在进行此同步操作时,至少可以开始整个异步解析操作的一部分。我想在进行同步操作时启动这些 Flux 操作。然后,一旦同步数据被解析,我就可以为正在进行的剩余操作获取每个 Flux。某些解析类型将具有所有 Flux 操作 returning 数据,而其他解析类型收集的信息较少,并且某些 Flux 操作将保持为空。解析操作非常耗时,我希望能够更早地开始一些 Flux 操作,这样我就可以稍微压缩时间——这对我正在完成的事情来说非常重要。所以热心订阅是最理想的,只要我能保证我不会错过任何物品发射。
考虑到这一点,我怎样才能:
- 为解析所有内容所需的每个 Flux 操作创建一个“持有者”或“容器”,并将它们初始化为空(如
Flux.empty()
)
- 将项目添加到我可以在上面的项目 1 中创建的任何内容——它被初始化为空,但我可能想要来自一个或多个有限和异步 Flux 操作的数据,但我不关心将它们分开,当我对它们使用
collectList()
来生成 Mono
. 时,它们可以显示为一个流
- 当其中一些
Flux
操作应该在其他一些操作之前开始时,我该如何开始它们,并确保我不会遗漏任何数据?如果我启动一个名称解析 Flux,例如,我可以添加到它吗,如上面的第 2 项?假设我想开始检索一些数据,然后执行同步操作,然后我从同步操作的结果中创建另一个名称解析 Flux,我可以将这个新的 Flux 附加到原始名称解析 Flux 上吗,因为它将是return使用相同的数据类型?我知道 Flux.merge()
,但如果可能的话,使用我可以继续添加的单个 Flux 引用会很方便。
我需要一个集合对象,比如列表,然后使用合并操作吗?最初,我考虑使用 ConnectableFlux
,直到我意识到它是用于连接多个订阅者,而不是用于连接多个发布者。我认为连接多个发布者是满足我的需求的一个很好的答案,除非这是可以以更好的方式处理的常见模式。
我从事响应式编程的时间很短,所以请耐心等待我尝试描述我想做的事情的方式。如果我能更好地阐明我的意图,请让我知道我不清楚的地方,我很乐意尝试澄清。预先感谢您的时间和帮助!
编辑:
这是最终的 Kotlin 版本,简洁明了:
private val log = KotlinLogging.logger {}
class ReactiveDataService {
private val createMono: () -> Mono<List<Int>> = {
Flux.just(9, 8, 7)
.flatMap {
Flux.fromIterable(List(it) { Random.nextInt(0, 100) })
.parallel()
.runOn(Schedulers.boundedElastic())
}
.collectList()
.cache()
}
private val processResults: (List<String>, List<String>) -> String =
{ d1, d2 -> "\n\tdownstream 1: $d1\n\tdownstream 2: $d2" }
private val convert: (List<Int>, Int) -> Flux<String> =
{ data, multiplier -> Flux.fromIterable(data.map { String.format("%3d", it * multiplier) }) }
fun doQuery(): String? {
val mono = createMono()
val downstream1 = mono.flatMapMany { convert(it, 1) }.collectList()
val downstream2 = mono.flatMapMany { convert(it, 2) }.collectList()
return Mono.zip(downstream1, downstream2, processResults).block()
}
}
fun main() {
val service = ReactiveDataService()
val start = System.currentTimeMillis()
val result = service.doQuery()
log.info("{}\n\tTotal time: {}ms", result, System.currentTimeMillis() - start)
}
并且输出:
downstream 1: [ 66, 39, 40, 88, 97, 35, 70, 91, 27, 12, 84, 37, 35, 15, 45, 27, 85, 22, 55, 89, 81, 21, 43, 62]
downstream 2: [132, 78, 80, 176, 194, 70, 140, 182, 54, 24, 168, 74, 70, 30, 90, 54, 170, 44, 110, 178, 162, 42, 86, 124]
Total time: 209ms
这听起来像是反应堆的理想工作。可以使用弹性调度程序将同步调用包装到 return 作为 Fluxes(或 Monos),以允许它们并行执行。然后使用各种运算符,您可以将它们组合在一起,形成一个代表结果的 Flux。订阅那个Flux,整机就会启动
我认为您需要使用 Mono.flatMapMany 而不是 Flux.usingWhen。
public class ReactiveDataService {
public static void main(final String[] args) {
ReactiveDataService service = new ReactiveDataService();
service.doQuery();
}
private Flux<Integer> process1(final List<Integer> data) {
return Flux.fromIterable(data);
}
private Flux<Integer> process2(final List<Integer> data) {
return Flux.fromIterable(data).map(i -> i * 2);
}
private String process3(List<Integer> downstream1, List<Integer> downstream2) {
System.out.println("downstream 1: " + downstream1);
System.out.println("downstream 2: " + downstream2);
return "Done";
}
private void doQuery() {
final Mono<List<Integer>> mono =
Flux.just(9, 8, 7)
.flatMap(
limit ->
Flux.fromStream(
Stream.generate(() -> new Random().nextInt(100))
.peek(
i -> {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
})
.limit(limit))
.parallel()
.runOn(Schedulers.boundedElastic()))
.collectList()
.cache();
final Mono<List<Integer>> downstream1 = mono.flatMapMany(this::process1).collectList();
final Mono<List<Integer>> downstream2 = mono.flatMapMany(this::process2).collectList();
Mono.zip(downstream1, downstream2, this::process3).block();
}
}
我有一个模块接受实体 ID 和“解析类型”作为参数,然后通过 return Fluxes 的多个操作(主要)异步收集数据。解决方案分为多个(主要是再次)异步操作,每个操作都致力于收集有助于解决方案的不同数据类型。我之所以说“主要”是异步的,是因为某些解析类型需要一些必须同步发生的初步操作,以便为解析的其余异步 Flux 操作提供信息。现在,在进行此同步操作时,至少可以开始整个异步解析操作的一部分。我想在进行同步操作时启动这些 Flux 操作。然后,一旦同步数据被解析,我就可以为正在进行的剩余操作获取每个 Flux。某些解析类型将具有所有 Flux 操作 returning 数据,而其他解析类型收集的信息较少,并且某些 Flux 操作将保持为空。解析操作非常耗时,我希望能够更早地开始一些 Flux 操作,这样我就可以稍微压缩时间——这对我正在完成的事情来说非常重要。所以热心订阅是最理想的,只要我能保证我不会错过任何物品发射。
考虑到这一点,我怎样才能:
- 为解析所有内容所需的每个 Flux 操作创建一个“持有者”或“容器”,并将它们初始化为空(如
Flux.empty()
) - 将项目添加到我可以在上面的项目 1 中创建的任何内容——它被初始化为空,但我可能想要来自一个或多个有限和异步 Flux 操作的数据,但我不关心将它们分开,当我对它们使用
collectList()
来生成Mono
. 时,它们可以显示为一个流
- 当其中一些
Flux
操作应该在其他一些操作之前开始时,我该如何开始它们,并确保我不会遗漏任何数据?如果我启动一个名称解析 Flux,例如,我可以添加到它吗,如上面的第 2 项?假设我想开始检索一些数据,然后执行同步操作,然后我从同步操作的结果中创建另一个名称解析 Flux,我可以将这个新的 Flux 附加到原始名称解析 Flux 上吗,因为它将是return使用相同的数据类型?我知道Flux.merge()
,但如果可能的话,使用我可以继续添加的单个 Flux 引用会很方便。
我需要一个集合对象,比如列表,然后使用合并操作吗?最初,我考虑使用 ConnectableFlux
,直到我意识到它是用于连接多个订阅者,而不是用于连接多个发布者。我认为连接多个发布者是满足我的需求的一个很好的答案,除非这是可以以更好的方式处理的常见模式。
我从事响应式编程的时间很短,所以请耐心等待我尝试描述我想做的事情的方式。如果我能更好地阐明我的意图,请让我知道我不清楚的地方,我很乐意尝试澄清。预先感谢您的时间和帮助!
编辑: 这是最终的 Kotlin 版本,简洁明了:
private val log = KotlinLogging.logger {}
class ReactiveDataService {
private val createMono: () -> Mono<List<Int>> = {
Flux.just(9, 8, 7)
.flatMap {
Flux.fromIterable(List(it) { Random.nextInt(0, 100) })
.parallel()
.runOn(Schedulers.boundedElastic())
}
.collectList()
.cache()
}
private val processResults: (List<String>, List<String>) -> String =
{ d1, d2 -> "\n\tdownstream 1: $d1\n\tdownstream 2: $d2" }
private val convert: (List<Int>, Int) -> Flux<String> =
{ data, multiplier -> Flux.fromIterable(data.map { String.format("%3d", it * multiplier) }) }
fun doQuery(): String? {
val mono = createMono()
val downstream1 = mono.flatMapMany { convert(it, 1) }.collectList()
val downstream2 = mono.flatMapMany { convert(it, 2) }.collectList()
return Mono.zip(downstream1, downstream2, processResults).block()
}
}
fun main() {
val service = ReactiveDataService()
val start = System.currentTimeMillis()
val result = service.doQuery()
log.info("{}\n\tTotal time: {}ms", result, System.currentTimeMillis() - start)
}
并且输出:
downstream 1: [ 66, 39, 40, 88, 97, 35, 70, 91, 27, 12, 84, 37, 35, 15, 45, 27, 85, 22, 55, 89, 81, 21, 43, 62]
downstream 2: [132, 78, 80, 176, 194, 70, 140, 182, 54, 24, 168, 74, 70, 30, 90, 54, 170, 44, 110, 178, 162, 42, 86, 124]
Total time: 209ms
这听起来像是反应堆的理想工作。可以使用弹性调度程序将同步调用包装到 return 作为 Fluxes(或 Monos),以允许它们并行执行。然后使用各种运算符,您可以将它们组合在一起,形成一个代表结果的 Flux。订阅那个Flux,整机就会启动
我认为您需要使用 Mono.flatMapMany 而不是 Flux.usingWhen。
public class ReactiveDataService {
public static void main(final String[] args) {
ReactiveDataService service = new ReactiveDataService();
service.doQuery();
}
private Flux<Integer> process1(final List<Integer> data) {
return Flux.fromIterable(data);
}
private Flux<Integer> process2(final List<Integer> data) {
return Flux.fromIterable(data).map(i -> i * 2);
}
private String process3(List<Integer> downstream1, List<Integer> downstream2) {
System.out.println("downstream 1: " + downstream1);
System.out.println("downstream 2: " + downstream2);
return "Done";
}
private void doQuery() {
final Mono<List<Integer>> mono =
Flux.just(9, 8, 7)
.flatMap(
limit ->
Flux.fromStream(
Stream.generate(() -> new Random().nextInt(100))
.peek(
i -> {
try {
Thread.sleep(500);
} catch (InterruptedException ignored) {
}
})
.limit(limit))
.parallel()
.runOn(Schedulers.boundedElastic()))
.collectList()
.cache();
final Mono<List<Integer>> downstream1 = mono.flatMapMany(this::process1).collectList();
final Mono<List<Integer>> downstream2 = mono.flatMapMany(this::process2).collectList();
Mono.zip(downstream1, downstream2, this::process3).block();
}
}