Monix 如何将背压与 flatMap 运算符一起使用?

how does Monix use back-pressure with flatMap operator?

Monix 使用 Ack 来同步发出的消息,但是如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随 source 的背压。

请看这个测试代码:

import java.util.concurrent.TimeUnit

import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test


class MonixBackpressureWithGroupByTest2 {
  @Test
  def test(): Unit = {
    val source = Observable.range(0,130)

    val backPressuredStream = source.map(x => {
        println("simple log first  map - " + x)
        x
      })
      .asyncBoundary(OverflowStrategy.BackPressure(5))
      .map { i =>

        println("after backpressure map, and Rim 3 operation of source - " + ((i % 3) toString) -> i)
        ((i % 3) toString) -> i
      }
      .groupBy{case (k, v) => k}
      .flatMap(x => {
        val mapWithSleep = x.map{case groupedMsg@(key, value) =>
          Thread.sleep(2000)
          println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
          groupedMsg
        }

        mapWithSleep

      })

    backPressuredStream.share.subscribe(
      (keyAndValue: (String, Long)) => Continue
    )

    global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
      println("========sleep 1 second ============")
    })

    Thread.currentThread().join()

  }

}

输出:

...

========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...

其中出现一些背压不匹配:
之后:sleep 2 second for every message ... 背压给出项目 after backpressure map - ...

的三个

在背压方面,sleep 2 second for every message ...after backpressure map - ... 怎么可能存在一对一的关系?

还有一个疑问:为什么 sleep 2 second for every message 的日志输出 (0, 72), (0, 75), (0,78) 而这样的东西 (0, 72), (1, 73), (2,74)

谢谢。

Monix 版本: "io.monix" %% "monix" % "3.0.0-RC1"

您看到的行为正是您所期望的。

为了快速总结您的应用程序的功能,让我用我的话来解释一下:


你有一个 Observable 生成数字,并对每个元素做一些副作用。

接下来,您按 _ % 3 对元素进行分组。

接下来,您在每个组的 Observable.

中执行更多副作用(睡眠和写入控制台)

然后,你 flatMap 每组的 Observable,结果是一个单一的,平坦的 Observable


那么,为什么一开始您只看到第一组(_ % 3 == 0)向控制台打印内容? ***

答案就在flatMap:当查看Observabledocumentation时,您会发现flatMap的描述如下:

final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]

Alias for concatMap.

[...]

像思考 List 一样思考 Observable:当你连接 List 时,你最终会得到一个 List首先包含第一个 List 的元素,然后是第二个 List 的元素,依此类推。

在 Monix 中,通过等待 flatMap(读取:concatMap)操作中产生的第一个 Observable 来发送 Observable 的相同行为"completed" - 信号。然后才会消耗第二个Observable,以此类推。

或者简单地说,flatMap关心的是生成的Observables的顺序。

但是 你的 flatMap 操作 "complete" 中的 Observable 什么时候开始?为此,我们必须了解 groupBy 是如何工作的——因为那是它们的来源。

要使 groupBy 正常工作,尽管 Observable 是延迟计算的,它必须将传入的元素存储在缓冲区中。我不是 100% 确定这一点,但是如果 groupBy 像我想的那样工作,对于任何拉出下一个元素的分组 Observable,它将无限期地遍历原始 Observable直到它找到属于该组的元素,将属于该缓冲区中其他组的所有先前(但尚未需要)的元素保存在该缓冲区中以备后用。

所有这些意味着 groupBy 无法知道是否已找到组的所有元素,直到源 Observable 发出完成信号,然后它将使用所有剩余的缓冲元素,然后发出完成信号分组的 Observables.

简单来说:由 groupBy 生成的 Observable 在源 Observable 完成之前不会完成。

当将所有这些信息放在一起时,您会明白只有当源 Observable(您的 Observable.range(0, 130))完成时,第一个分组 Observable 也会完成,并且由于 flatMap 只有这样所有其他分组的 Observable 才会被使用。

因为我从你的上一个问题中知道你正在尝试构建一个网络套接字,所以使用 flatMap 是一个坏主意 - 你的传入请求的来源 Observable从不 完成,实际上只服务于您将遇到的第一个 IP 地址。

你要做的是使用 mergeMap. 当与 concatMap 比较时 mergeMap 不关心元素的顺序,而不是 "first come first served" - 规则适用。


*** :当你读完我的解释并希望理解 groupByflatMap 的工作原理时,你就会明白我为什么写 "in the beginning"!