将多个 Kotlin 流程合并到一个列表中,而无需等待第一个值
Combine multiple Kotlin flows in a list without waiting for a first value
我有一个 List<Flow<T>>
,想生成一个 Flow<List<T>>
。这几乎就是 combine
所做的——除了 combine 等待每个 Flow
发出一个初始值,这不是我想要的。以此代码为例:
val a = flow {
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
combine(flows) {
it.toList()
}.collect { println(it) }
}
使用 combine
(因此按原样),这是输出:
[a2, b1, c]
[a2, b2, c]
而我也对所有中间步骤感兴趣。这就是我想要从这三个流程中得到的:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
现在我有两个变通办法,但 none 个都很棒...第一个很丑陋,不适用于可空类型:
val flows = listOf(a, b, c).map {
flow {
emit(null)
it.collect { emit(it) }
}
}
runBlocking {
combine(flows) {
it.filterNotNull()
}.collect { println(it) }
}
通过强制所有流发出第一个不相关的值,确实调用了 combine
转换器,并让我删除我知道不是实际值的空值。对此进行迭代,更具可读性但更重:
sealed class FlowValueHolder {
object None : FlowValueHolder()
data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
flow {
emit(FlowValueHolder.None)
it.collect { emit(FlowValueHolder.Some(it)) }
}
}
runBlocking {
combine(flows) {
it.filterIsInstance(FlowValueHolder.Some::class.java)
.map { it.value }
}.collect { println(it) }
}
现在这个工作得很好,但我仍然觉得我做得太过分了。协同程序库中是否缺少我的方法?
我仍然想避免映射到中间包装器类型,正如评论中有人提到的那样,行为有点不对(如果还没有参数发出任何东西,这首先发出一个空列表),但这是比我写问题时想到的解决方案稍微好一点(仍然非常相似)并且适用于可空类型:
inline fun <reified T> instantCombine(
flows: Iterable<Flow<T>>
): Flow<List<T>> = combine(flows.map { flow ->
flow.map {
@Suppress("USELESS_CAST") // Required for onStart(null)
Holder(it) as Holder<T>?
}
.onStart { emit(null) }
}) {
it.filterNotNull()
.map { holder -> holder.value }
}
这里是通过此实现的测试套件:
class InstantCombineTest {
@Test
fun `when no flows are merged, nothing is emitted`() = runBlockingTest {
assertThat(instantCombine(emptyList<Flow<String>>()).toList())
.isEmpty()
}
@Test
fun `intermediate steps are emitted`() = runBlockingTest {
val a = flow {
delay(20)
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
assertThat(instantCombine(a, b, c).toList())
.containsExactly(
emptyList<String>(),
listOf("a0"),
listOf("a1"),
listOf("a1", "b0"),
listOf("a2", "b0"),
listOf("a2", "b1"),
listOf("a2", "b1", "c"),
listOf("a2", "b2", "c")
)
.inOrder()
}
@Test
fun `a single flow is mirrored`() = runBlockingTest {
val a = flow {
delay(20)
repeat(3) {
emit("a$it")
delay(100)
}
}
assertThat(instantCombine(a).toList())
.containsExactly(
emptyList<String>(),
listOf("a0"),
listOf("a1"),
listOf("a2")
)
.inOrder()
}
@Test
fun `null values are kept`() = runBlockingTest {
val a = flow {
emit("a")
emit(null)
emit("b")
}
assertThat(instantCombine(a).toList())
.containsExactly(
emptyList<String?>(),
listOf("a"),
listOf(null),
listOf("b")
)
.inOrder()
}
}
这个怎么样:
inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
val array= Array(flows.size) {
false to (null as T?) // first element stands for "present"
}
flows.forEachIndexed { index, flow ->
launch {
flow.collect { emittedElement ->
array[index] = true to emittedElement
send(array.filter { it.first }.map { it.second })
}
}
}
}
它解决了几个问题:
- 无需引入新类型
[]
不在结果流中
- 从调用站点中抽象出空值处理(或无论如何解决),生成的 Flow 自行处理它
因此,您不会注意到任何特定于实现的解决方法,因为您不必在收集期间处理它:
runBlocking {
instantCombine(a, b, c).collect {
println(it)
}
}
输出:
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
编辑: 更新了处理也发出空值的流的答案。
* 使用的低级数组是线程安全的。这就好像你在处理单个变量。
我有一个 List<Flow<T>>
,想生成一个 Flow<List<T>>
。这几乎就是 combine
所做的——除了 combine 等待每个 Flow
发出一个初始值,这不是我想要的。以此代码为例:
val a = flow {
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
val flows = listOf(a, b, c)
runBlocking {
combine(flows) {
it.toList()
}.collect { println(it) }
}
使用 combine
(因此按原样),这是输出:
[a2, b1, c]
[a2, b2, c]
而我也对所有中间步骤感兴趣。这就是我想要从这三个流程中得到的:
[]
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
现在我有两个变通办法,但 none 个都很棒...第一个很丑陋,不适用于可空类型:
val flows = listOf(a, b, c).map {
flow {
emit(null)
it.collect { emit(it) }
}
}
runBlocking {
combine(flows) {
it.filterNotNull()
}.collect { println(it) }
}
通过强制所有流发出第一个不相关的值,确实调用了 combine
转换器,并让我删除我知道不是实际值的空值。对此进行迭代,更具可读性但更重:
sealed class FlowValueHolder {
object None : FlowValueHolder()
data class Some<T>(val value: T) : FlowValueHolder()
}
val flows = listOf(a, b, c).map {
flow {
emit(FlowValueHolder.None)
it.collect { emit(FlowValueHolder.Some(it)) }
}
}
runBlocking {
combine(flows) {
it.filterIsInstance(FlowValueHolder.Some::class.java)
.map { it.value }
}.collect { println(it) }
}
现在这个工作得很好,但我仍然觉得我做得太过分了。协同程序库中是否缺少我的方法?
我仍然想避免映射到中间包装器类型,正如评论中有人提到的那样,行为有点不对(如果还没有参数发出任何东西,这首先发出一个空列表),但这是比我写问题时想到的解决方案稍微好一点(仍然非常相似)并且适用于可空类型:
inline fun <reified T> instantCombine(
flows: Iterable<Flow<T>>
): Flow<List<T>> = combine(flows.map { flow ->
flow.map {
@Suppress("USELESS_CAST") // Required for onStart(null)
Holder(it) as Holder<T>?
}
.onStart { emit(null) }
}) {
it.filterNotNull()
.map { holder -> holder.value }
}
这里是通过此实现的测试套件:
class InstantCombineTest {
@Test
fun `when no flows are merged, nothing is emitted`() = runBlockingTest {
assertThat(instantCombine(emptyList<Flow<String>>()).toList())
.isEmpty()
}
@Test
fun `intermediate steps are emitted`() = runBlockingTest {
val a = flow {
delay(20)
repeat(3) {
emit("a$it")
delay(100)
}
}
val b = flow {
repeat(3) {
delay(150)
emit("b$it")
}
}
val c = flow {
delay(400)
emit("c")
}
assertThat(instantCombine(a, b, c).toList())
.containsExactly(
emptyList<String>(),
listOf("a0"),
listOf("a1"),
listOf("a1", "b0"),
listOf("a2", "b0"),
listOf("a2", "b1"),
listOf("a2", "b1", "c"),
listOf("a2", "b2", "c")
)
.inOrder()
}
@Test
fun `a single flow is mirrored`() = runBlockingTest {
val a = flow {
delay(20)
repeat(3) {
emit("a$it")
delay(100)
}
}
assertThat(instantCombine(a).toList())
.containsExactly(
emptyList<String>(),
listOf("a0"),
listOf("a1"),
listOf("a2")
)
.inOrder()
}
@Test
fun `null values are kept`() = runBlockingTest {
val a = flow {
emit("a")
emit(null)
emit("b")
}
assertThat(instantCombine(a).toList())
.containsExactly(
emptyList<String?>(),
listOf("a"),
listOf(null),
listOf("b")
)
.inOrder()
}
}
这个怎么样:
inline fun <reified T> instantCombine(vararg flows: Flow<T>) = channelFlow {
val array= Array(flows.size) {
false to (null as T?) // first element stands for "present"
}
flows.forEachIndexed { index, flow ->
launch {
flow.collect { emittedElement ->
array[index] = true to emittedElement
send(array.filter { it.first }.map { it.second })
}
}
}
}
它解决了几个问题:
- 无需引入新类型
[]
不在结果流中- 从调用站点中抽象出空值处理(或无论如何解决),生成的 Flow 自行处理它
因此,您不会注意到任何特定于实现的解决方法,因为您不必在收集期间处理它:
runBlocking {
instantCombine(a, b, c).collect {
println(it)
}
}
输出:
[a0]
[a1]
[a1, b0]
[a2, b0]
[a2, b1]
[a2, b1, c]
[a2, b2, c]
编辑: 更新了处理也发出空值的流的答案。
* 使用的低级数组是线程安全的。这就好像你在处理单个变量。