生产者内部的听众

Listener inside producer

我正在尝试为听众创建 producer。 我的代码看起来像这样

suspend fun foo() = produce{
    someEvent.addListener {
        this.send(it)
    }
}

但我收到错误 Suspension functions can be called only within coroutine,这是有道理的。我的问题是。有没有办法使用协程来实现这种模式?

有多种实现方法,具体取决于您要实现的目标:

如果您只想接收最近的事件,那么您应该使用合并渠道和offer方法成功了:

fun foo() = produce<T>(capacity = Channel.CONFLATED) {
    someEvent.addListener {
        offer(it)
    }
}

如果接收 所有 事件至关重要,那么您的选择取决于事件生产者的行为。这里要思考的关键问题是,如果您的事件生成器开始生成大量事件 "non-stop",会发生什么情况。根据经验,大多数 "synchronous" 事件生成器不支持显式背压信号,但它们仍然支持隐式背压信号——如果它们的侦听器速度慢或阻塞线程,它们将减慢速度.因此,通常,以下解决方案非常适合同步事件生成器:

fun foo() = produce<T>() {
    someEvent.addListener {
        runBlocking { send(it) }
    }
}

您还可以指定一些正 capacity = xxx 作为 produce 构建器的参数作为性能优化,如果您遇到一次性生成一批事件并且您不想阻塞的情况生产者,但让消费者按照自己的节奏处理它们。

在极少数情况下,当您的生产者不理解隐式阻塞背压信号时(当它作为某种多线程装置在没有内部同步的情况下猛烈地产生事件时),那么您可以使用无限容量的通道offer,但请注意,如果生产者超过消费者,您可能会面临 运行 内存不足的风险:

fun foo() = produce<T>(capacity = Channel.UNLIMITED) {
    someEvent.addListener {
        offer(it)
    }
}

如果您的生产者支持明确的背压信号(如功能反应流),那么您应该使用特殊的适配器来正确传输他们的背压信号to/from协程。 kotlinx.coroutines 库有许多开箱即用的集成模块,带有用于此目的的各种响应式库。参见 here

注意:不应该suspend修饰符标记你的foo函数。 foo 的调用不会以任何方式挂起调用者。它立即(同步)启动生产者协程。

要了解有关协程和不同类型通道的更多信息,我强烈建议学习 the guide on kotlinx.coroutines