我怎样才能将项目发送到 Kotlin.Flow(如 Behaviorsubject)
How can I send items to a Kotlin.Flow (like a Behaviorsubject)
我想知道如何 send/emit 项目到 Kotlin.Flow
,所以我的用例是:
在 consumer/ViewModel/Presenter 中,我可以 订阅 与 collect
功能:
fun observe() {
coroutineScope.launch {
// 1. Send event
reopsitory.observe().collect {
println(it)
}
}
}
但问题出在 Repository
方面,使用 RxJava 我们可以使用 Behaviorsubject 将其公开为 Observable/Flowable
并像这样发出新项目:
behaviourSubject.onNext(true)
但是每当我建立一个新流程时:
flow {
}
我只能收集。如何将值发送到流?
如果您想获得 subscription/collection 上的 最新 值,您应该使用 ConflatedBroadcastChannel:
private val channel = ConflatedBroadcastChannel<Boolean>()
这将复制 BehaviourSubject
,以将频道公开为流:
// Repository
fun observe() {
return channel.asFlow()
}
现在向公开的 Flow
发送 event/value 简单发送到此频道。
// Repository
fun someLogicalOp() {
channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}
控制台:
false
如果您希望在开始收集 之后仅接收值 ,您应该改用 BroadcastChannel
。
说清楚:
表现为 Rx 的 PublishedSubject
private val channel = BroadcastChannel<Boolean>(1)
fun broadcastChannelTest() {
// 1. Send event
channel.send(true)
// 2. Start collecting
channel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
channel.send(false)
}
false
只有 false
在发送第一个事件时被打印 在 collect { }
.[=29= 之前]
表现为 Rx 的 BehaviourSubject
private val confChannel = ConflatedBroadcastChannel<Boolean>()
fun conflatedBroadcastChannelTest() {
// 1. Send event
confChannel.send(true)
// 2. Start collecting
confChannel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
confChannel.send(false)
}
true
false
两个事件都被打印出来,你总是得到最新的值(如果存在的话)。
此外,想在 DataFlow
上提及 Kotlin 的团队开发(名称待定):
这似乎更适合这个用例(因为它将是 cold stream)。
更新:
Kotlin 协程 1.4.0
现在可用于 MutableSharedFlow
,这取代了对 Channel
的需求。 MutableSharedFlow
清理也是内置的,因此您无需手动打开和关闭它,这与 Channel
不同。如果 Flow
需要类似 api 的主题,请使用 MutableSharedFlow
原始答案
因为你的问题有 android
标签,我将添加一个 Android 实现,允许你轻松创建一个 BehaviorSubject
或 PublishSubject
来处理它自己的生命周期。
这与 Android 相关,因为您不想忘记关闭通道和泄漏内存。此实现通过将反应流与 Fragment/Activity 的创建和销毁联系起来,避免了显式“处置”反应流的需要。类似于 LiveData
interface EventReceiver<Message> {
val eventFlow: Flow<Message>
}
interface EventSender<Message> {
fun postEvent(message: Message)
val initialMessage: Message?
}
class LifecycleEventSender<Message>(
lifecycle: Lifecycle,
private val coroutineScope: CoroutineScope,
private val channel: BroadcastChannel<Message>,
override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {
init {
lifecycle.addObserver(this)
}
override fun postEvent(message: Message) {
if (!channel.isClosedForSend) {
coroutineScope.launch { channel.send(message) }
} else {
Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
}
}
@OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
fun create() {
channel.openSubscription()
initialMessage?.let { postEvent(it) }
}
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun destroy() {
channel.close()
}
}
class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
EventReceiver<Message> {
override val eventFlow: Flow<Message> = channel.asFlow()
}
abstract class EventRelay<Message>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
channel: BroadcastChannel<Message>,
initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
EventSender<Message> by LifecycleEventSender<Message>(
lifecycle,
coroutineScope,
channel,
initialMessage
)
通过使用 Android 中的 Lifecycle
库,我现在可以创建一个 BehaviorSubject
,它可以在 activity/fragment 被销毁后自行清理
class BehaviorSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
ConflatedBroadcastChannel(),
initialMessage
)
或者我可以使用缓冲 BroadcastChannel
创建一个 PublishSubject
class PublishSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
BroadcastChannel(Channel.BUFFERED),
initialMessage
)
现在我可以做这样的事情了
class MyActivity: Activity() {
val behaviorSubject = BehaviorSubject(
this@MyActivity.lifecycle,
this@MyActivity.lifecycleScope
)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
if (savedInstanceState == null) {
behaviorSubject.eventFlow
.onEach { stringEvent ->
Log.d("BehaviorSubjectFlow", stringEvent)
// "BehaviorSubjectFlow: Initial Message"
// "BehaviorSubjectFlow: Next Message"
}
.flowOn(Dispatchers.Main)
.launchIn(this@MyActivity.lifecycleScope)
}
}
override fun onResume() {
super.onResume()
behaviorSubject.postEvent("Next Message")
}
}
查看 MutableStateFlow 文档,因为它是即将弃用的 ConflatedBroadcastChannel
的替代品。
要获得更好的上下文,请查看 whole discussion on the original issue on Kotlin's repository on Github。
我想知道如何 send/emit 项目到 Kotlin.Flow
,所以我的用例是:
在 consumer/ViewModel/Presenter 中,我可以 订阅 与 collect
功能:
fun observe() {
coroutineScope.launch {
// 1. Send event
reopsitory.observe().collect {
println(it)
}
}
}
但问题出在 Repository
方面,使用 RxJava 我们可以使用 Behaviorsubject 将其公开为 Observable/Flowable
并像这样发出新项目:
behaviourSubject.onNext(true)
但是每当我建立一个新流程时:
flow {
}
我只能收集。如何将值发送到流?
如果您想获得 subscription/collection 上的 最新 值,您应该使用 ConflatedBroadcastChannel:
private val channel = ConflatedBroadcastChannel<Boolean>()
这将复制 BehaviourSubject
,以将频道公开为流:
// Repository
fun observe() {
return channel.asFlow()
}
现在向公开的 Flow
发送 event/value 简单发送到此频道。
// Repository
fun someLogicalOp() {
channel.send(false) // This gets sent to the ViewModel/Presenter and printed.
}
控制台:
false
如果您希望在开始收集 之后仅接收值 ,您应该改用 BroadcastChannel
。
说清楚:
表现为 Rx 的 PublishedSubject
private val channel = BroadcastChannel<Boolean>(1)
fun broadcastChannelTest() {
// 1. Send event
channel.send(true)
// 2. Start collecting
channel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
channel.send(false)
}
false
只有 false
在发送第一个事件时被打印 在 collect { }
.[=29= 之前]
表现为 Rx 的 BehaviourSubject
private val confChannel = ConflatedBroadcastChannel<Boolean>()
fun conflatedBroadcastChannelTest() {
// 1. Send event
confChannel.send(true)
// 2. Start collecting
confChannel
.asFlow()
.collect {
println(it)
}
// 3. Send another event
confChannel.send(false)
}
true
false
两个事件都被打印出来,你总是得到最新的值(如果存在的话)。
此外,想在 DataFlow
上提及 Kotlin 的团队开发(名称待定):
这似乎更适合这个用例(因为它将是 cold stream)。
更新:
Kotlin 协程 1.4.0
现在可用于 MutableSharedFlow
,这取代了对 Channel
的需求。 MutableSharedFlow
清理也是内置的,因此您无需手动打开和关闭它,这与 Channel
不同。如果 Flow
MutableSharedFlow
原始答案
因为你的问题有 android
标签,我将添加一个 Android 实现,允许你轻松创建一个 BehaviorSubject
或 PublishSubject
来处理它自己的生命周期。
这与 Android 相关,因为您不想忘记关闭通道和泄漏内存。此实现通过将反应流与 Fragment/Activity 的创建和销毁联系起来,避免了显式“处置”反应流的需要。类似于 LiveData
interface EventReceiver<Message> {
val eventFlow: Flow<Message>
}
interface EventSender<Message> {
fun postEvent(message: Message)
val initialMessage: Message?
}
class LifecycleEventSender<Message>(
lifecycle: Lifecycle,
private val coroutineScope: CoroutineScope,
private val channel: BroadcastChannel<Message>,
override val initialMessage: Message?
) : EventSender<Message>, LifecycleObserver {
init {
lifecycle.addObserver(this)
}
override fun postEvent(message: Message) {
if (!channel.isClosedForSend) {
coroutineScope.launch { channel.send(message) }
} else {
Log.e("LifecycleEventSender","Channel is closed. Cannot send message: $message")
}
}
@OnLifecycleEvent(Lifecycle.Event.ON_CREATE)
fun create() {
channel.openSubscription()
initialMessage?.let { postEvent(it) }
}
@OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
fun destroy() {
channel.close()
}
}
class ChannelEventReceiver<Message>(channel: BroadcastChannel<Message>) :
EventReceiver<Message> {
override val eventFlow: Flow<Message> = channel.asFlow()
}
abstract class EventRelay<Message>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
channel: BroadcastChannel<Message>,
initialMessage: Message? = null
) : EventReceiver<Message> by ChannelEventReceiver<Message>(channel),
EventSender<Message> by LifecycleEventSender<Message>(
lifecycle,
coroutineScope,
channel,
initialMessage
)
通过使用 Android 中的 Lifecycle
库,我现在可以创建一个 BehaviorSubject
,它可以在 activity/fragment 被销毁后自行清理
class BehaviorSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
ConflatedBroadcastChannel(),
initialMessage
)
或者我可以使用缓冲 BroadcastChannel
PublishSubject
class PublishSubject<String>(
lifecycle: Lifecycle,
coroutineScope: CoroutineScope,
initialMessage = "Initial Message"
) : EventRelay<String>(
lifecycle,
coroutineScope,
BroadcastChannel(Channel.BUFFERED),
initialMessage
)
现在我可以做这样的事情了
class MyActivity: Activity() {
val behaviorSubject = BehaviorSubject(
this@MyActivity.lifecycle,
this@MyActivity.lifecycleScope
)
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
if (savedInstanceState == null) {
behaviorSubject.eventFlow
.onEach { stringEvent ->
Log.d("BehaviorSubjectFlow", stringEvent)
// "BehaviorSubjectFlow: Initial Message"
// "BehaviorSubjectFlow: Next Message"
}
.flowOn(Dispatchers.Main)
.launchIn(this@MyActivity.lifecycleScope)
}
}
override fun onResume() {
super.onResume()
behaviorSubject.postEvent("Next Message")
}
}
查看 MutableStateFlow 文档,因为它是即将弃用的 ConflatedBroadcastChannel
的替代品。
要获得更好的上下文,请查看 whole discussion on the original issue on Kotlin's repository on Github。