Monix 如何将背压与 flatMap 运算符一起使用?
how does Monix use back-pressure with flatMap operator?
Monix 使用 Ack 来同步发出的消息,但是如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随 source
的背压。
请看这个测试代码:
import java.util.concurrent.TimeUnit
import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test
class MonixBackpressureWithGroupByTest2 {
@Test
def test(): Unit = {
val source = Observable.range(0,130)
val backPressuredStream = source.map(x => {
println("simple log first map - " + x)
x
})
.asyncBoundary(OverflowStrategy.BackPressure(5))
.map { i =>
println("after backpressure map, and Rim 3 operation of source - " + ((i % 3) toString) -> i)
((i % 3) toString) -> i
}
.groupBy{case (k, v) => k}
.flatMap(x => {
val mapWithSleep = x.map{case groupedMsg@(key, value) =>
Thread.sleep(2000)
println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
groupedMsg
}
mapWithSleep
})
backPressuredStream.share.subscribe(
(keyAndValue: (String, Long)) => Continue
)
global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
println("========sleep 1 second ============")
})
Thread.currentThread().join()
}
}
输出:
...
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...
其中出现一些背压不匹配:
之后:sleep 2 second for every message ...
背压给出项目 after backpressure map - ...
的三个
在背压方面,sleep 2 second for every message ...
与 after backpressure map - ...
怎么可能存在一对一的关系?
还有一个疑问:为什么 sleep 2 second for every message
的日志输出 (0, 72), (0, 75), (0,78)
而这样的东西 (0, 72), (1, 73), (2,74)
?
谢谢。
Monix 版本:
"io.monix" %% "monix" % "3.0.0-RC1"
您看到的行为正是您所期望的。
为了快速总结您的应用程序的功能,让我用我的话来解释一下:
你有一个 Observable
生成数字,并对每个元素做一些副作用。
接下来,您按 _ % 3
对元素进行分组。
接下来,您在每个组的 Observable
.
中执行更多副作用(睡眠和写入控制台)
然后,你 flatMap
每组的 Observable
,结果是一个单一的,平坦的 Observable
。
那么,为什么一开始您只看到第一组(_ % 3 == 0
)向控制台打印内容? ***
答案就在flatMap
:当查看Observable
的documentation时,您会发现flatMap
的描述如下:
final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]
Alias for concatMap.
[...]
像思考 List
一样思考 Observable
:当你连接 List
时,你最终会得到一个 List
首先包含第一个 List
的元素,然后是第二个 List
的元素,依此类推。
在 Monix 中,通过等待 flatMap
(读取:concatMap
)操作中产生的第一个 Observable
来发送 Observable
的相同行为"completed" - 信号。然后才会消耗第二个Observable
,以此类推。
或者简单地说,flatMap
关心的是生成的Observable
s的顺序。
但是 你的 flatMap
操作 "complete" 中的 Observable
什么时候开始?为此,我们必须了解 groupBy
是如何工作的——因为那是它们的来源。
要使 groupBy
正常工作,尽管 Observable
是延迟计算的,它必须将传入的元素存储在缓冲区中。我不是 100% 确定这一点,但是如果 groupBy
像我想的那样工作,对于任何拉出下一个元素的分组 Observable
,它将无限期地遍历原始 Observable
直到它找到属于该组的元素,将属于该缓冲区中其他组的所有先前(但尚未需要)的元素保存在该缓冲区中以备后用。
所有这些意味着 groupBy
无法知道是否已找到组的所有元素,直到源 Observable
发出完成信号,然后它将使用所有剩余的缓冲元素,然后发出完成信号分组的 Observable
s.
简单来说:由 groupBy
生成的 Observable
在源 Observable
完成之前不会完成。
当将所有这些信息放在一起时,您会明白只有当源 Observable(您的 Observable.range(0, 130)
)完成时,第一个分组 Observable
也会完成,并且由于 flatMap
只有这样所有其他分组的 Observable
才会被使用。
因为我从你的上一个问题中知道你正在尝试构建一个网络套接字,所以使用 flatMap
是一个坏主意 - 你的传入请求的来源 Observable
将 从不 完成,实际上只服务于您将遇到的第一个 IP 地址。
你要做的是使用 mergeMap
. 当与 concatMap
比较时 mergeMap
不关心元素的顺序,而不是 "first come first served" - 规则适用。
*** :当你读完我的解释并希望理解 groupBy
和 flatMap
的工作原理时,你就会明白我为什么写 "in the beginning"!
Monix 使用 Ack 来同步发出的消息,但是如果我使用 groupBy 和 flatMap,内部 Observable 不会跟随 source
的背压。
请看这个测试代码:
import java.util.concurrent.TimeUnit
import monix.execution.Scheduler.Implicits.global
import monix.execution.Ack.Continue
import monix.reactive.{Observable, OverflowStrategy}
import org.junit.Test
class MonixBackpressureWithGroupByTest2 {
@Test
def test(): Unit = {
val source = Observable.range(0,130)
val backPressuredStream = source.map(x => {
println("simple log first map - " + x)
x
})
.asyncBoundary(OverflowStrategy.BackPressure(5))
.map { i =>
println("after backpressure map, and Rim 3 operation of source - " + ((i % 3) toString) -> i)
((i % 3) toString) -> i
}
.groupBy{case (k, v) => k}
.flatMap(x => {
val mapWithSleep = x.map{case groupedMsg@(key, value) =>
Thread.sleep(2000)
println("inner Observable after group by rim 3. sleep 2 second for every message - " + groupedMsg)
groupedMsg
}
mapWithSleep
})
backPressuredStream.share.subscribe(
(keyAndValue: (String, Long)) => Continue
)
global.scheduleWithFixedDelay(0L, 1000L, TimeUnit.MILLISECONDS, () => {
println("========sleep 1 second ============")
})
Thread.currentThread().join()
}
}
输出:
...
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,72)
(after backpressure map, and Rim 3 operation of source - 1,73)
(after backpressure map, and Rim 3 operation of source - 2,74)
(after backpressure map, and Rim 3 operation of source - 0,75)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,75)
(after backpressure map, and Rim 3 operation of source - 1,76)
(after backpressure map, and Rim 3 operation of source - 2,77)
(after backpressure map, and Rim 3 operation of source - 0,78)
========sleep 1 second ============
========sleep 1 second ============
inner Observable after group by rim 3. sleep 2 second for every message - (0,78)
(after backpressure map, and Rim 3 operation of source - 1,79)
...
其中出现一些背压不匹配:
之后:sleep 2 second for every message ...
背压给出项目 after backpressure map - ...
在背压方面,sleep 2 second for every message ...
与 after backpressure map - ...
怎么可能存在一对一的关系?
还有一个疑问:为什么 sleep 2 second for every message
的日志输出 (0, 72), (0, 75), (0,78)
而这样的东西 (0, 72), (1, 73), (2,74)
?
谢谢。
Monix 版本:
"io.monix" %% "monix" % "3.0.0-RC1"
您看到的行为正是您所期望的。
为了快速总结您的应用程序的功能,让我用我的话来解释一下:
你有一个 Observable
生成数字,并对每个元素做一些副作用。
接下来,您按 _ % 3
对元素进行分组。
接下来,您在每个组的 Observable
.
然后,你 flatMap
每组的 Observable
,结果是一个单一的,平坦的 Observable
。
那么,为什么一开始您只看到第一组(_ % 3 == 0
)向控制台打印内容? ***
答案就在flatMap
:当查看Observable
的documentation时,您会发现flatMap
的描述如下:
final def flatMap[B](f: (A) ⇒ Observable[B]): Observable[B]
Alias for concatMap.
[...]
像思考 List
一样思考 Observable
:当你连接 List
时,你最终会得到一个 List
首先包含第一个 List
的元素,然后是第二个 List
的元素,依此类推。
在 Monix 中,通过等待 flatMap
(读取:concatMap
)操作中产生的第一个 Observable
来发送 Observable
的相同行为"completed" - 信号。然后才会消耗第二个Observable
,以此类推。
或者简单地说,flatMap
关心的是生成的Observable
s的顺序。
但是 你的 flatMap
操作 "complete" 中的 Observable
什么时候开始?为此,我们必须了解 groupBy
是如何工作的——因为那是它们的来源。
要使 groupBy
正常工作,尽管 Observable
是延迟计算的,它必须将传入的元素存储在缓冲区中。我不是 100% 确定这一点,但是如果 groupBy
像我想的那样工作,对于任何拉出下一个元素的分组 Observable
,它将无限期地遍历原始 Observable
直到它找到属于该组的元素,将属于该缓冲区中其他组的所有先前(但尚未需要)的元素保存在该缓冲区中以备后用。
所有这些意味着 groupBy
无法知道是否已找到组的所有元素,直到源 Observable
发出完成信号,然后它将使用所有剩余的缓冲元素,然后发出完成信号分组的 Observable
s.
简单来说:由 groupBy
生成的 Observable
在源 Observable
完成之前不会完成。
当将所有这些信息放在一起时,您会明白只有当源 Observable(您的 Observable.range(0, 130)
)完成时,第一个分组 Observable
也会完成,并且由于 flatMap
只有这样所有其他分组的 Observable
才会被使用。
因为我从你的上一个问题中知道你正在尝试构建一个网络套接字,所以使用 flatMap
是一个坏主意 - 你的传入请求的来源 Observable
将 从不 完成,实际上只服务于您将遇到的第一个 IP 地址。
你要做的是使用 mergeMap
. 当与 concatMap
比较时 mergeMap
不关心元素的顺序,而不是 "first come first served" - 规则适用。
*** :当你读完我的解释并希望理解 groupBy
和 flatMap
的工作原理时,你就会明白我为什么写 "in the beginning"!