Flow 等待一段时间,然后将所有发出的元素收集到一个列表中,并保持这个过程 运行

Flow wait some time, then gather all emitted elements into a list, and keep this process running

我有一个流量生成器,它会以随机周期发射元素,我不想一旦发射就处理这些元素,我宁愿将它们收集到一个列表中,然后处理它们,并保持这个过程运行.

例如,如果我想将 2s 作为一个句点,我想要得到的以下流量是

val flow = flow{
  for(i in 1..5){
    emit(i)
    delay(1100)
  }
}
//#1 handle [1,2]
//#2 handle [3,4]
//#3 handle [5]

以下是我尝试但失败的方法:

有更好的主意吗?

简短的回答是目前 AFAIK 没有 built-in 功能。

我认为您要查找的是 time-based chunked 运算符,has been discussed quite a bit,但仍未进入图书馆。不确定这是否在他们近期的计划中。

也许您可以查看 this merge request 中建议的实现并根据您的需要进行调整(您可能不需要那么多的灵活性,也不需要额外的分块策略)。不好意思我afk了不能自己改编了

我想你可以通过

解决这个问题
  1. 每 2 秒使用 flow 到 'tick',
  2. 在共享可变状态中存储输入值 (MutableSharedFlow),
  3. 然后,每次 flow 'ticks',您都可以获取 windowed 值,然后重新设置值。

我还在学习协程,所以反馈对大家有帮助。

定时器流程

每 2 秒创建一个 flow 'ticks'

val windowDuration: Duration = 2.seconds

val timerFlow: Flow<Unit> = flow {
  while (true) {
    println("---------tick---------")
    emit(Unit) // the value is unimportant
    delay(windowDuration)
    yield()
  }
}

我们将使用此 timerFlow 来确定每个 window 的开始和结束。当它 'ticks' 时,我们将 获取我们 window 编辑的值,并重置 window.

共享window状态

将 window 的状态存储在 MutableStateFlow 中。

val state: MutableStateFlow<List<Int>> = MutableStateFlow(listOf())

(使用 MutableStateFlow 可能不是最好的解决方案。我只是将它用作存储桶 共享状态。参见 the docs 用于共享可变状态的替代方案)

窗口化输入值

现在,对于每个输入值,更新 MutableStateFlow...

// launch a new coroutine, so the inputs are collected in the 'background'
launch {
  inputValues
    .onEach { value: Int -> state.update { it + value } }
    .collect()
}

获取并重置 window

现在每次 timerFlow 滴答,我们都可以获取值,并重置共享状态,所以 接下来window从头开始。

val windowedValuesFlow: Flow<List<Int>> =
  timerFlow.map { _: Unit ->
    // tick!
    // emit the values from the previous window, and reset for the next window
    state.getAndUpdate {
      println("-----reset-window-----")
      emptyList()
    }
  }

windowedValuesFlow
  .takeWhile { it.isNotEmpty() }
  .onEach { windowedValues: List<Int> ->
    println(">>> window: $windowedValues")
  }
  .collect()

防止无限运行

因为timerFlow是无限的,所以这个例子会无限期地运行。你可能只想收集 windows 而 windows 有值。

windowedValuesFlow
  .takeWhile { it.isNotEmpty() } // limit the length of the flow
  .onEach { windowedValues: List<Int> ->
    println(">>> window: $windowedValues")
  }
  .collect()

示例输出

1: emitting
1: delaying 100ms
2: emitting
2: delaying 200ms
3: emitting
3: delaying 300ms
4: emitting
4: delaying 400ms
5: emitting
5: delaying 500ms
6: emitting
6: delaying 600ms
---------tick---------
-----reset-window-----
>>> window: [1, 2, 3, 4, 5, 6]
7: emitting
7: delaying 700ms
8: emitting
8: delaying 800ms
9: emitting
9: delaying 900ms
---------tick---------
-----reset-window-----
>>> window: [7, 8, 9]
10: emitting
10: delaying 1s
---------tick---------
-----reset-window-----
>>> window: [10]
---------tick---------
-----reset-window-----

Process finished with exit code 0

完整示例

  • 科特林 1.6.10
  • Kotlinx 协程 1.6.0
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.getAndUpdate
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.takeWhile
import kotlinx.coroutines.flow.update
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield


fun main() {

  val inputValues: Flow<Int> = flow {
    for (i in 1..10) {
      println("$i: emitting")
      emit(i)
      val delay = i.seconds / 10
      println("$i: delaying $delay")
      delay(delay)
    }
  }

  val windowDuration: Duration = 2.seconds

  val timerFlow: Flow<Unit> = flow {
    while (true) {
      delay(windowDuration)
      println("---------tick---------")
      emit(Unit) // the value is unimportant
      yield()
    }
  }

  val state: MutableStateFlow<List<Int>> = MutableStateFlow(listOf())

  runBlocking {

    // launch a new coroutine, so the inputs are collected in the 'background'
    launch {
      inputValues
        .onEach { value: Int -> state.update { it + value } }
        .collect()
    }

    val windowedValuesFlow: Flow<List<Int>> =
      timerFlow.map { _: Unit ->
        // tick!
        // emit the values from the previous window, and reset for the next window
        state.getAndUpdate {
          println("-----reset-window-----")
          emptyList()
        }
      }
    
    windowedValuesFlow
      .takeWhile { it.isNotEmpty() } // limit the length of the flow
      .onEach { windowedValues: List<Int> ->
        println(">>> window: $windowedValues")
      }
      .collect()

  }
}