Kotlin SharedFlow 组合操作。在特定情况下有 zip 行为
Kotlin SharedFlow combine operation. Have zip behaviour in a specific situation
我正在合并两个 SharedFlows
,然后执行长时间的工作操作。
一开始,我知道状态,所以我为两个流发出一个“起始值”。之后用户可以发送到任一流。
这两个流大多是独立的,但在特定情况下,用户可以同时向两个流发送。这样做的目的是组合被触发两次并且长时间的工作被执行两次,实际上,在这种情况下,我只对接收两个值感兴趣但只执行一次工作。
这是我的:
val _numbers = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val numbers: SharedFlow<Int> = _numbers
val _strings = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val strings: SharedFlow<String> = _strings
combine(numbers, strings) { (number, strings) ->
println("values $number - $strings. Starting to perform a long working job")
}
.launchIn(CoroutineScope(Dispatchers.IO))
runBlocking {
delay(500)
// This is the initial values. I always know this at start.
_numbers.emit(0)
_strings.emit("a")
// Depending of user action, number or string is emitted.
delay(100)
_numbers.emit(1)
delay(100)
_numbers.emit(2)
delay(100)
_numbers.emit(3)
delay(100)
_numbers.emit(4)
delay(100)
_strings.emit("b")
delay(100)
_strings.emit("c")
delay(100)
_strings.emit("d")
delay(100)
_strings.emit("e")
delay(100)
// In a specific situation both values need to change but I only want to trigger the long working job once
_numbers.emit(10)
_strings.emit("Z")
}
这可以产生这个:
values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job
或者这样:
values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job
由于缓冲区溢出,有时我可以实现我想要的(最新的)但在其他情况下,我有我不感兴趣的values 10 - e. Starting to perform a long working job
。
有什么办法可以强制执行,当向两者发射时,只开始长时间工作一次?
如果你想保留2个流程,那么单双事件的区别就必须是基于时间的。您将无法区分字符串然后数字的快速更新与“双重更新”。
如果基于时间的适合您,在长处理之前使用 debounce
应该是可行的方法:
combine(numbers, strings) { (number, string) -> number to string }
.debounce(50)
.onEach { (number, string) ->
println("values $number - $string. Starting to perform a long working job")
}
.launchIn(CoroutineScope(Dispatchers.IO))
此处,combine
仅从 2 个流构建对,但仍获取所有事件,然后 debounce
忽略事件的快速连续,仅发送快速系列中的最新事件。这也会引入轻微的延迟,但这完全取决于您想要实现的目标。
如果基于时间的区分对您来说不合适,您需要一种方法让生产者以不同于 2 个单一事件的方式发送双重事件。为此,您可以使用单个事件流,例如,您可以像这样定义事件:
sealed class Event {
data class SingleNumberUpdate(val value: Int): Event()
data class SingleStringUpdate(val value: String): Event()
data class DoubleUpdate(val num: Int, val str: String): Event()
}
但是你必须自己编写“组合”逻辑(保持最新数字和字符串的状态):
flow {
var num = 0
var str = "a"
emit(num to str)
events.collect { e ->
when (e) {
is Event.SingleNumberUpdate -> {
num = e.value
}
is Event.SingleStringUpdate -> {
str = e.value
}
is Event.DoubleUpdate -> {
num = e.num
str = e.str
}
}
emit(num to str)
}
}
.onEach { (number, strings) ->
println("values $number - $strings. Starting to perform a long working job")
}
.launchIn(CoroutineScope(Dispatchers.IO))
我正在合并两个 SharedFlows
,然后执行长时间的工作操作。
一开始,我知道状态,所以我为两个流发出一个“起始值”。之后用户可以发送到任一流。
这两个流大多是独立的,但在特定情况下,用户可以同时向两个流发送。这样做的目的是组合被触发两次并且长时间的工作被执行两次,实际上,在这种情况下,我只对接收两个值感兴趣但只执行一次工作。
这是我的:
val _numbers = MutableSharedFlow<Int>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val numbers: SharedFlow<Int> = _numbers
val _strings = MutableSharedFlow<String>(replay = 0, extraBufferCapacity = 1, onBufferOverflow = BufferOverflow.DROP_OLDEST)
val strings: SharedFlow<String> = _strings
combine(numbers, strings) { (number, strings) ->
println("values $number - $strings. Starting to perform a long working job")
}
.launchIn(CoroutineScope(Dispatchers.IO))
runBlocking {
delay(500)
// This is the initial values. I always know this at start.
_numbers.emit(0)
_strings.emit("a")
// Depending of user action, number or string is emitted.
delay(100)
_numbers.emit(1)
delay(100)
_numbers.emit(2)
delay(100)
_numbers.emit(3)
delay(100)
_numbers.emit(4)
delay(100)
_strings.emit("b")
delay(100)
_strings.emit("c")
delay(100)
_strings.emit("d")
delay(100)
_strings.emit("e")
delay(100)
// In a specific situation both values need to change but I only want to trigger the long working job once
_numbers.emit(10)
_strings.emit("Z")
}
这可以产生这个:
values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job
或者这样:
values 0 - a. Starting to perform a long working job
values 1 - a. Starting to perform a long working job
values 2 - a. Starting to perform a long working job
values 3 - a. Starting to perform a long working job
values 4 - a. Starting to perform a long working job
values 4 - b. Starting to perform a long working job
values 4 - c. Starting to perform a long working job
values 4 - d. Starting to perform a long working job
values 4 - e. Starting to perform a long working job
values 10 - Z. Starting to perform a long working job
由于缓冲区溢出,有时我可以实现我想要的(最新的)但在其他情况下,我有我不感兴趣的values 10 - e. Starting to perform a long working job
。
有什么办法可以强制执行,当向两者发射时,只开始长时间工作一次?
如果你想保留2个流程,那么单双事件的区别就必须是基于时间的。您将无法区分字符串然后数字的快速更新与“双重更新”。
如果基于时间的适合您,在长处理之前使用 debounce
应该是可行的方法:
combine(numbers, strings) { (number, string) -> number to string }
.debounce(50)
.onEach { (number, string) ->
println("values $number - $string. Starting to perform a long working job")
}
.launchIn(CoroutineScope(Dispatchers.IO))
此处,combine
仅从 2 个流构建对,但仍获取所有事件,然后 debounce
忽略事件的快速连续,仅发送快速系列中的最新事件。这也会引入轻微的延迟,但这完全取决于您想要实现的目标。
如果基于时间的区分对您来说不合适,您需要一种方法让生产者以不同于 2 个单一事件的方式发送双重事件。为此,您可以使用单个事件流,例如,您可以像这样定义事件:
sealed class Event {
data class SingleNumberUpdate(val value: Int): Event()
data class SingleStringUpdate(val value: String): Event()
data class DoubleUpdate(val num: Int, val str: String): Event()
}
但是你必须自己编写“组合”逻辑(保持最新数字和字符串的状态):
flow {
var num = 0
var str = "a"
emit(num to str)
events.collect { e ->
when (e) {
is Event.SingleNumberUpdate -> {
num = e.value
}
is Event.SingleStringUpdate -> {
str = e.value
}
is Event.DoubleUpdate -> {
num = e.num
str = e.str
}
}
emit(num to str)
}
}
.onEach { (number, strings) ->
println("values $number - $strings. Starting to perform a long working job")
}
.launchIn(CoroutineScope(Dispatchers.IO))