Kotlin Flow onBackpressureDrop RxJava2 模拟

Kotlin Flow onBackpressureDrop RxJava2 analog

在 RxJava 2 Flowable 中有不同的背压策略,其中最有趣的是:

在整个 Rx 链中都受到尊重。

在 Kotlin 中有 Flow,它声明它具有开箱即用的背压支持。 我能够使用以下方法使 Flow 具有 BUFFER 和 LATEST 策略:

对于缓冲区:

observeFlow()
    .buffer(10)
    .collect { ... }

最新:

observeFlow()
    .conflate()
    .collect { ... }

这只是同一个缓冲区运算符的快捷方式。

但是我找不到任何可以像 DROP 一样工作的东西。 简而言之,当先前的值尚未处理时,DROP 将丢弃流中出现的任何值。 对于 Flow,我什至不确定它是否可行。

考虑案例:

observeFlow()
    .backpressureDrop() // non-existent operator, just for illustrative purposes
    .map { ... }
    .flatMapMerge { ... }
    .collect { ... }

所以 backpressureDrop 应该尊重在流下面完成的任何工作,而该操作员对下面发生的事情一无所知(没有来自底部的显式回调 - 就像 RxJava Subscriber 中的 "request" 方法).因此,这似乎是不可能的。并且该操作员不应在收集上一个项目之前通过任何事件。

是否有任何我想念的现成运算符,或者是否有一种直接的方法可以用现有的 API 实现类似的东西?

is there a straightforward way to implement something like this

取决于你的直截了当的程度。这是我的做法。

背压转化为协程世界中的编程暂停和恢复。对于 onBackpressureDrop,下游必须指示它已准备好处理一项并暂停,而上游永远不应等待下游准备就绪。

你必须无限制地消费上游,并将物品和终端事件交给下游等待这些信号。

package hu.akarnokd.kotlin.flow.impl

import hu.akarnokd.kotlin.flow.Resumable
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.AbstractFlow
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.FlowCollector
import kotlinx.coroutines.flow.collect
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference

@FlowPreview
internal class FlowOnBackpressureDrop<T>(private val source: Flow<T>)
 : AbstractFlow<T>() {
    @ExperimentalCoroutinesApi
    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        coroutineScope {
            val consumerReady = AtomicBoolean()
            val producerReady = Resumable()
            val value = AtomicReference<T>()
            val done = AtomicBoolean()
            val error = AtomicReference<Throwable>();

            launch {
                try {
                    source.collect {
                        if (consumerReady.get()) {
                            value.set(it);
                            consumerReady.set(false);
                            producerReady.resume();
                        }
                    }
                    done.set(true)
                } catch (ex: Throwable) {
                    error.set(ex)
                }
                producerReady.resume()
            }

            while (true) {
                consumerReady.set(true)
                producerReady.await()

                val d = done.get()
                val ex = error.get()
                val v = value.getAndSet(null)

                if (ex != null) {
                    throw ex;
                }
                if (d) {
                    break;
                }

                collector.emit(v)
            }
        }
    }
}

注:Resumable实施。

让我们来看看实现。

首先,需要5个变量来在上游采集器和下游采集器之间传递信息: - consumerReady 表示下游已为下一项做好准备, - producerReady 表示生产者已经存储了下一个项目(或终端信号)并且下游可以恢复 - value 上游项目准备消费 - done 上游结束 - error 上游失败

接下来,我们必须为上游启动收集器,因为收集正在暂停并且在完成之前根本不会让下游消费者循环 运行。在这个收集器中,我们检查下游消费者是否就绪(通过 consumerReady),如果是,则存储当前项目,清除就绪标志并通过 producerReady 发出其可用性信号。清除 consumerReady 将阻止存储后续上游项目,直到下游本身指示新的准备就绪。

当上游结束或崩溃时,我们设置 doneerror 变量并指示生产者已发言。

launch { } 部分之后,我们现在将继续代表下游收集器使用共享变量。

每一轮的第一件事是表明我们已准备好获取下一个值,然后等待生产者端发出信号,它已将下一个事件放入共享变量中。

接下来,我们从这些变量中收集值。我们急于完成或抛出错误,只有作为最后的手段才将上游项目重新发送到下游收集器。

根据 Anton Spaans 的评论 here,有一种方法可以使用 channelFlow 来模拟掉落。
但问题是默认情况下 channelFlow 构建器使用 BUFFER 策略并且不允许参数化容量。
有一种方法可以在 ChannelFlowBuilder 中参数化容量,但问题是 API 是内部的而 ChannelFlowBuilder 是私有的。
但本质上,如果复制粘贴 ChannelFlowBuilder 实现并像这样创建 class:

class BackPressureDropFlow<T>(private val source: Flow<T>) : AbstractFlow<T>() {

    @InternalCoroutinesApi
    override suspend fun collectSafely(collector: FlowCollector<T>) {
        ChannelFlowBuilder<T>({ source.collect { offer(it) } }, capacity = 0)
            .collect { collector.emit(it) }
    }
}

(或直接应用与变换类似的解决方案)。
然后它似乎起作用了。
这里的主要关键是使用capacity = 0,这表示下游将在收到每个项目时暂停(因为没有缓冲容量)。

我们可以使用 Rendezvous Channel 支持的流来构建它。

When capacity is 0 – it creates RendezvousChannel. This channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendezvous), so send suspends until another coroutine invokes receive and receive suspends until another coroutine invokes send.

集合点通道没有缓冲区。因此,该通道的消费者需要暂停并等待下一个元素,以便将元素发送到该通道。我们可以利用这种特性来丢弃在不使用 Channel.offer 暂停通道的情况下无法接受的值,这是一个正常的非暂停功能。

Channel.offer

Adds element into this queue if it is possible to do so immediately without violating capacity restrictions and returns true. Otherwise, it returns false immediately or throws exception if the channel isClosedForSend (see close for details).

因为channelFlow被缓冲了,我们需要将Flow<T>.buffer下游应用到0。

/**
 * Consume this [Flow] using a channelFlow with no buffer. Elements emitted from [this] flow
 * are offered to the underlying [channelFlow]. If the consumer is not currently suspended and 
 * waiting for the next element, the element is dropped. 
 * 
 * @return a flow that only emits elements when the downstream [Flow.collect] is waiting for the next element
 */
fun <T> Flow<T>.drop(): Flow<T> = channelFlow {
    collect { offer(it) }
}.buffer(capacity = 0)

这是一个慢速消费者如何使用它来删除元素的示例。

fun main() = runBlocking {
    flow {
        (0..100).forEach {
            emit(it)
            delay(100)
        }
    }.drop().collect {
        delay(1000)
        println(it)
    }
}

对应输出:

0
11
21
31
41
51
61
71
81
91