Project Reactor flux conCat、flux mergeSequential、flux mergeOrdered 之间有什么区别

Project Reactor what are differences between flux conCat, flux mergeSequential, flux mergeOrdered

如果我们提供相同的数据源,所有这些方法都会产生相同的结果。那么它们之间有什么区别呢?

采用以下(设计的)concat() 示例,其中两个发布者以 100 毫秒的间隔发出 3 个元素:

Flux<Integer> a = Flux.range(0, 3).delayElements(Duration.ofMillis(100));
Flux<Integer> b = Flux.range(0, 3).delayElements(Duration.ofMillis(100));

Flux.concat(a, b)
        .timed()
        .doOnNext(x -> System.out.println(x.get() + ": " + x.elapsed().toMillis()))
        .blockLast();

在这里你会看到类似下面的输出:

0: 138
1: 107
2: 108
0: 111
1: 102
2: 107

所以我们以 100 毫秒的间隔发射了 6 个元素。第一个发布者被订阅,以 100 毫秒的间隔发出 3 个元素,然后完成。然后订阅第二个发布者,以 100 毫秒的间隔发出它的 3 个元素,然后它完成。

如果我们将 concat() 替换为 mergeSequential(),您将看到如下内容:

0: 118
1: 107
2: 107
0: 0
1: 0
2: 0

元素以相同的顺序发出 - 但请注意最后 3 个的时间!这是因为行为略有不同 - 在这种情况下,两个 发布者都订阅了,因此开始以 100 毫秒的间隔发射元素。来自第一个发布者的元素在收到时发出,但来自第二个发布者的元素被缓存直到第一个发布者完成。

当第一个发布者完成时,第二个发布者接管 - 我们缓存的所有元素都会立即发出,因此没有延迟(所以时间为零)。我们发出了相同的元素,但是快多了。

这似乎是有利的,但您可能不想跳转到依赖 mergeSequential() 而不是 concat() 的主要原因有两个:

  • 缓存幕后的所有元素会占用内存。在此示例中,只有 3 个元素几乎没有任何内存占用,但如果您开始处理数百万个元素(甚至可能永远不会完成的发布者),您可能很快 运行 内存不足。
  • 立即订阅可能会改变行为。以两个发布者为例——一个改变数据库中的值,另一个读取它。如果将它们连接起来,写入将始终发生在读取之前。如果您同时订阅两者,则情况并非如此,您可能会在它被写入之前阅读它(但不一定。)

出于以上两个原因,根据我的经验,您通常在实际使用中只想使用concat()而不是mergeSequential()

至于 mergeOrdered(),在上面的示例中使用它,您会看到 元素 的实际顺序不同:

0: 127
0: 105
1: 17
1: 90
2: 15
2: 0

这里 mergeSequential() 的热切订阅部分是相同的,但有一个不同之处 - 每个发布者发出的值在发出时进行比较,最小的先发出。因此,您会看到(在本例中)有序的数字流:0,0,1,1,2,2。请注意,时间显示与 mergeSequential() 不同,因为它在您的最终输出中交错了来自两个发布者的值,而不仅仅是按顺序合并它们。