我怎样才能将项目发送到 Kotlin.Flow(如 Behaviorsubject)

How can I send items to a Kotlin.Flow (like a Behaviorsubject)

我想知道如何 send/emit 项目到 Kotlin.Flow,所以我的用例是:

在 consumer/ViewModel/Presenter 中,我可以 订阅 collect 功能:

fun observe() {
 coroutineScope.launch {
    // 1. Send event
    reopsitory.observe().collect {
      println(it)
    }
  }
}

但问题出在 Repository 方面,使用 RxJava 我们可以使用 Behaviorsubject 将其公开为 Observable/Flowable 并像这样发出新项目:

behaviourSubject.onNext(true)

但是每当我建立一个新流程时:

flow {

}

我只能收集。如何将值发送到流?

如果您想获得 subscription/collection 上的 最新 值,您应该使用 ConflatedBroadcastChannel:

private val channel = ConflatedBroadcastChannel<Boolean>()

这将复制 BehaviourSubject,以将频道公开为流:

// Repository
fun observe() {
  return channel.asFlow()
}

现在向公开的 Flow 发送 event/value 简单发送到此频道。

// Repository
fun someLogicalOp() {
  channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}

控制台:

false

如果您希望在开始收集 之后仅接收值 ,您应该改用 BroadcastChannel

说清楚:

表现为 Rx 的 PublishedSubject

private val channel = BroadcastChannel<Boolean>(1)

fun broadcastChannelTest() {
  // 1. Send event
  channel.send(true)

  // 2. Start collecting
  channel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  channel.send(false)
}

false

只有 false 在发送第一个事件时被打印 collect { }.[=29= 之前]


表现为 Rx 的 BehaviourSubject

private val confChannel = ConflatedBroadcastChannel<Boolean>()

fun conflatedBroadcastChannelTest() {
  // 1. Send event
  confChannel.send(true)

  // 2. Start collecting
  confChannel
    .asFlow()
    .collect {
      println(it)
    }

  // 3. Send another event
  confChannel.send(false)
}

true

false

两个事件都被打印出来,你总是得到最新的值(如果存在的话)。

此外,想在 DataFlow 上提及 Kotlin 的团队开发(名称待定):

这似乎更适合这个用例(因为它将是 cold stream)。

更新:

Kotlin 协程 1.4.0 现在可用于 MutableSharedFlow,这取代了对 Channel 的需求。 MutableSharedFlow 清理也是内置的,因此您无需手动打开和关闭它,这与 Channel 不同。如果 Flow

需要类似 api 的主题,请使用 MutableSharedFlow

原始答案

因为你的问题有 android 标签,我将添加一个 Android 实现,允许你轻松创建一个 BehaviorSubjectPublishSubject 来处理它自己的生命周期。

这与 Android 相关,因为您不想忘记关闭通道和泄漏内存。此实现通过将反应流与 Fragment/Activity 的创建和销毁联系起来,避免了显式“处置”反应流的需要。类似于 LiveData

interface EventReceiver<Message> {
    val eventFlow: Flow<Message>
}

interface EventSender<Message> {
    fun postEvent(message: Message)
    val initialMessage: Message?
}

class LifecycleEventSender<Message>(
    lifecycle: Lifecycle,
    private val coroutineScope: CoroutineScope,
    private val channel: BroadcastChannel<Message>,
    override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {

    init {
        lifecycle.addObserver(this)
    }

    override fun postEvent(message: Message) {
        if (!channel.isClosedForSend) {
            coroutineScope.launch { channel.send(message) }
        } else {
            Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
        }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
    fun create() {
        channel.openSubscription()
        initialMessage?.let { postEvent(it) }
    }

    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun destroy() {
        channel.close()
    }
}

class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
    EventReceiver<Message> {
    override val eventFlow: Flow<Message> = channel.asFlow()
}

abstract class EventRelay<Message>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    channel: BroadcastChannel<Message>,
    initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
    EventSender<Message> by LifecycleEventSender<Message>(
        lifecycle,
        coroutineScope,
        channel,
        initialMessage
    )

通过使用 Android 中的 Lifecycle 库,我现在可以创建一个 BehaviorSubject,它可以在 activity/fragment 被销毁后自行清理

class BehaviorSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    ConflatedBroadcastChannel(),
    initialMessage
)

或者我可以使用缓冲 BroadcastChannel

创建一个 PublishSubject
class PublishSubject<String>(
    lifecycle: Lifecycle,
    coroutineScope: CoroutineScope,
    initialMessage = "Initial Message"
) : EventRelay<String>(
    lifecycle,
    coroutineScope,
    BroadcastChannel(Channel.BUFFERED),
    initialMessage
)

现在我可以做这样的事情了

class MyActivity: Activity() {

    val behaviorSubject = BehaviorSubject(
        this@MyActivity.lifecycle,
        this@MyActivity.lifecycleScope
    )

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)

        if (savedInstanceState == null) { 

            behaviorSubject.eventFlow
                .onEach { stringEvent ->
                    Log.d("BehaviorSubjectFlow", stringEvent)
                    // "BehaviorSubjectFlow: Initial Message"
                    // "BehaviorSubjectFlow: Next Message"
                }
                .flowOn(Dispatchers.Main)
                .launchIn(this@MyActivity.lifecycleScope)

        }
    }

    override fun onResume() {
        super.onResume()

        behaviorSubject.postEvent("Next Message")
    }
}

查看 MutableStateFlow 文档,因为它是即将弃用的 ConflatedBroadcastChannel 的替代品。

要获得更好的上下文,请查看 whole discussion on the original issue on Kotlin's repository on Github