如何将 Kotlin 流转换为可变流?
How can you convert a kotlin flow to a mutable flow?
我试图在我的 class 中保持可变状态流,但是当我对其应用任何方法时,它将转换为不可变状态 Flow<T>
:
class MyClass : Listener<String> {
private val source = Source()
val flow: Flow<String?>
get() = _flow
// region listener
override fun onUpdate(value: String?) {
if (value!= null) {
// emit object changes to the flow
// not possible, because the builder operators on the flow below convert it to a `Flow` and it doesn't stay as a `MutableSharedFlow` :(
_flow.tryEmit(value)
}
}
// end-region
@OptIn(ExperimentalCoroutinesApi::class)
private val _flow by lazy {
MutableStateFlow<String?>(null).onStart {
emitAll(
flow<String?> {
val initialValue = source.getInitialValue()
emit(initialValue)
}.flowOn(MyDispatchers.background)
)
}.onCompletion { error ->
// when the flow is cancelled, stop listening to changes
if (error is CancellationException) {
// is was cancelled
source.removeListener(this@MyClass)
}
}.apply {
// listen to changes and send them to the flow
source.addListener(this@MyClass)
}
}
}
有没有办法在我应用 onCompletion/onStart
方法后仍保持 MutableStateFlow
的流程?
如果将转换应用于可变状态流,则生成的流将变为只读,因为原始流充当其源。如果您想手动发出事件,则需要将它们发出到初始源流。
话虽如此,您在这里想要实现的目标似乎很简单:将基于 API 的回调桥接到 Flow
API。 Kotlin 协程中有一个内置函数可以执行此操作,称为 callbackFlow.
我不确定你的来源 API 如何处理背压,但它看起来像这样:
@OptIn(ExperimentalCoroutinesApi::class)
fun Source.asFlow(): Flow<String?> = callbackFlow {
send(getInitialValue())
val listener = object : Listener<String> {
override fun onUpdate(value: String?) {
if (value != null) {
trySend(value)
}
}
}
addListener(listener)
awaitClose {
removeListener(listener)
}
}
或者可能使用 runBlocking { send(value) }
而不是 trySend()
,这取决于 Source
如何处理背压和阻塞在它自己的线程池中。
请注意,flowOn
可能会在此流程之上使用,但它只对 getInitialValue()
真正重要,因为执行回调的线程由 Source
控制无论如何。
如果 Source
添加许多侦听器的成本很高,您还可以考虑使用 shareIn()
运算符共享此流程,以便多个订阅者共享相同的侦听器订阅。
我试图在我的 class 中保持可变状态流,但是当我对其应用任何方法时,它将转换为不可变状态 Flow<T>
:
class MyClass : Listener<String> {
private val source = Source()
val flow: Flow<String?>
get() = _flow
// region listener
override fun onUpdate(value: String?) {
if (value!= null) {
// emit object changes to the flow
// not possible, because the builder operators on the flow below convert it to a `Flow` and it doesn't stay as a `MutableSharedFlow` :(
_flow.tryEmit(value)
}
}
// end-region
@OptIn(ExperimentalCoroutinesApi::class)
private val _flow by lazy {
MutableStateFlow<String?>(null).onStart {
emitAll(
flow<String?> {
val initialValue = source.getInitialValue()
emit(initialValue)
}.flowOn(MyDispatchers.background)
)
}.onCompletion { error ->
// when the flow is cancelled, stop listening to changes
if (error is CancellationException) {
// is was cancelled
source.removeListener(this@MyClass)
}
}.apply {
// listen to changes and send them to the flow
source.addListener(this@MyClass)
}
}
}
有没有办法在我应用 onCompletion/onStart
方法后仍保持 MutableStateFlow
的流程?
如果将转换应用于可变状态流,则生成的流将变为只读,因为原始流充当其源。如果您想手动发出事件,则需要将它们发出到初始源流。
话虽如此,您在这里想要实现的目标似乎很简单:将基于 API 的回调桥接到 Flow
API。 Kotlin 协程中有一个内置函数可以执行此操作,称为 callbackFlow.
我不确定你的来源 API 如何处理背压,但它看起来像这样:
@OptIn(ExperimentalCoroutinesApi::class)
fun Source.asFlow(): Flow<String?> = callbackFlow {
send(getInitialValue())
val listener = object : Listener<String> {
override fun onUpdate(value: String?) {
if (value != null) {
trySend(value)
}
}
}
addListener(listener)
awaitClose {
removeListener(listener)
}
}
或者可能使用 runBlocking { send(value) }
而不是 trySend()
,这取决于 Source
如何处理背压和阻塞在它自己的线程池中。
请注意,flowOn
可能会在此流程之上使用,但它只对 getInitialValue()
真正重要,因为执行回调的线程由 Source
控制无论如何。
如果 Source
添加许多侦听器的成本很高,您还可以考虑使用 shareIn()
运算符共享此流程,以便多个订阅者共享相同的侦听器订阅。