Kotlin RX .zipWith 函数体对每个观察者执行一次还是一次?
Kotlin RX .zipWith function body executes once or once per observer?
当数据传输到我的应用程序时,它遵循以下顺序:
- 1个带有ID信息的ReadStart数据包
- 1 个或多个组合形成有效负载的数据包
- 1 个 ReadDone 数据包表示传输已完成
我有一个创建 Observable 的 Kotlin RX 函数:
val readStartPublishProcessor: PublishProcessor<ReadStartPacket>
val dataPacketPublishProcessor: PublishProcessor<DataPacket>
val readDonePublishProcessor: PublishProcessor<ReadDonePacket>
...
private fun setupReadListener(): Flowable<ReadEvent> {
val dataFlowable = dataPacketPublishProcessor.buffer(readDonePublishProcessor)
return readStartPublishProcessor
.zipWith(other = dataFlowable) { readStart, dataPackets ->
Log.d(tag, "ReadEvent with ${dataPackets.size} data packets")
ReadEvent(event = readStart, payload = combinePackets(dataPackets))
}
}
通过阅读 .zipWith
的文档,我希望 .zipWith
函数体针对从 readStartPublishProcessor
和 dataFlowable
发出的每对值执行一次,并且然后将该计算结果传递给每个订阅者:
The Zip method returns an Observable that applies a function of your
choosing to the combination of items emitted, in sequence, by two (or
more) other Observables, with the results of this function becoming
the items emitted by the returned Observable. ... It will only emit as
many items as the number of items emitted by the source Observable
that emits the fewest items.
但是如果我有超过 1 个观察者,我会看到 .zipWith
函数体执行的次数与观察者的数量相同,每次都使用相同的一对发射值。这是一个问题,因为从 .zipWith
函数体内调用的函数有副作用。 (注意:观察者中未使用 .share 和 .replay 运算符。)
为什么似乎是 运行 每个观察者的 .zipWith
函数体而不是只执行一次,有没有办法写这个不管有多少只执行一次观察员?
几点...
从 zipWith
中调用的函数不应包含副作用。它应该是一个纯函数。如果您绝对需要那里的副作用,请使用 do
运算符之一。
从 zipWith
返回的 observable 是冷的(默认情况下 Observable 是冷的),这意味着每个观察者都有自己的执行上下文。即,操作员每次订阅时都会订阅其源可观察对象,并为每次订阅调用它所具有的功能块。
如果您希望订阅共享执行上下文,则必须使用 share
或 refCount
运算符。 Learn more about Hot and Cold Observables here.
当数据传输到我的应用程序时,它遵循以下顺序:
- 1个带有ID信息的ReadStart数据包
- 1 个或多个组合形成有效负载的数据包
- 1 个 ReadDone 数据包表示传输已完成
我有一个创建 Observable 的 Kotlin RX 函数:
val readStartPublishProcessor: PublishProcessor<ReadStartPacket>
val dataPacketPublishProcessor: PublishProcessor<DataPacket>
val readDonePublishProcessor: PublishProcessor<ReadDonePacket>
...
private fun setupReadListener(): Flowable<ReadEvent> {
val dataFlowable = dataPacketPublishProcessor.buffer(readDonePublishProcessor)
return readStartPublishProcessor
.zipWith(other = dataFlowable) { readStart, dataPackets ->
Log.d(tag, "ReadEvent with ${dataPackets.size} data packets")
ReadEvent(event = readStart, payload = combinePackets(dataPackets))
}
}
通过阅读 .zipWith
的文档,我希望 .zipWith
函数体针对从 readStartPublishProcessor
和 dataFlowable
发出的每对值执行一次,并且然后将该计算结果传递给每个订阅者:
The Zip method returns an Observable that applies a function of your choosing to the combination of items emitted, in sequence, by two (or more) other Observables, with the results of this function becoming the items emitted by the returned Observable. ... It will only emit as many items as the number of items emitted by the source Observable that emits the fewest items.
但是如果我有超过 1 个观察者,我会看到 .zipWith
函数体执行的次数与观察者的数量相同,每次都使用相同的一对发射值。这是一个问题,因为从 .zipWith
函数体内调用的函数有副作用。 (注意:观察者中未使用 .share 和 .replay 运算符。)
为什么似乎是 运行 每个观察者的 .zipWith
函数体而不是只执行一次,有没有办法写这个不管有多少只执行一次观察员?
几点...
从 zipWith
中调用的函数不应包含副作用。它应该是一个纯函数。如果您绝对需要那里的副作用,请使用 do
运算符之一。
从 zipWith
返回的 observable 是冷的(默认情况下 Observable 是冷的),这意味着每个观察者都有自己的执行上下文。即,操作员每次订阅时都会订阅其源可观察对象,并为每次订阅调用它所具有的功能块。
如果您希望订阅共享执行上下文,则必须使用 share
或 refCount
运算符。 Learn more about Hot and Cold Observables here.