如何在 Kotlin 中创建带有几个订阅的流?
How to create a flow with a few subscribtions in Kotlin?
我需要 运行 一个发出一些数据的任务。我想像 PublishSubject 一样订阅这些数据。但是我解决不了一个实例流的问题。如果我尝试再次调用它,它将创建另一个实例并且该作业将完成两次。
我尝试 运行 内部流和 post 值到 BroadcastChannel,但这个解决方案似乎不正确。
此类任务的最佳实践是什么?
这会很神奇:
fun <T> Flow<T>.refCount(capacity: Int = Channel.CONFLATED, dispatcher: CoroutineDispatcher = Dispatchers.Default): Flow<T> {
class Context(var counter: Int) {
lateinit var job: Job
lateinit var channel: BroadcastChannel<T>
}
val context = Context(0)
fun lock() = synchronized(context) {
if (++context.counter > 1) {
return@synchronized
}
context.channel = BroadcastChannel(capacity)
context.job = GlobalScope.async(dispatcher) {
try {
collect { context.channel.offer(it) }
} catch (e: Exception) {
context.channel.close(e)
}
}
}
fun unlock() = synchronized(context) {
if (--context.counter == 0) {
context.job.cancel()
}
}
return flow {
lock()
try {
emitAll(context.channel.openSubscription())
} finally {
unlock()
}
}
}
我需要 运行 一个发出一些数据的任务。我想像 PublishSubject 一样订阅这些数据。但是我解决不了一个实例流的问题。如果我尝试再次调用它,它将创建另一个实例并且该作业将完成两次。 我尝试 运行 内部流和 post 值到 BroadcastChannel,但这个解决方案似乎不正确。 此类任务的最佳实践是什么?
这会很神奇:
fun <T> Flow<T>.refCount(capacity: Int = Channel.CONFLATED, dispatcher: CoroutineDispatcher = Dispatchers.Default): Flow<T> {
class Context(var counter: Int) {
lateinit var job: Job
lateinit var channel: BroadcastChannel<T>
}
val context = Context(0)
fun lock() = synchronized(context) {
if (++context.counter > 1) {
return@synchronized
}
context.channel = BroadcastChannel(capacity)
context.job = GlobalScope.async(dispatcher) {
try {
collect { context.channel.offer(it) }
} catch (e: Exception) {
context.channel.close(e)
}
}
}
fun unlock() = synchronized(context) {
if (--context.counter == 0) {
context.job.cancel()
}
}
return flow {
lock()
try {
emitAll(context.channel.openSubscription())
} finally {
unlock()
}
}
}