为什么我可以在不调用 yield 或确定 Kotlin 中的 isActive() 标识的情况下取消流程?

Why can I cancel a Flow without either invoking yield or determining isActive() identification in Kotlin?

我已阅读article

有两种方法可以使计算代码可取消。第一个是定期调用检查取消的挂起函数。有一个 yield 函数是一个很好的选择。另一种是显式检查取消状态。

我知道 Flow 正在暂停功能。

我 运行 代码 B ,并如我所料得到结果 B。

我想我不能使计算代码A可取消,但实际上我可以在单击“开始”按钮发出流后单击“停止”按钮取消流,为什么?

代码A

class HandleMeter: ViewModel() { 
    var currentInfo by mutableStateOf(2.0)

    private var myJob: Job?=null

    private fun soundDbFlow() = flow {
          while (true) {
             val data = (0..1000).random().toDouble()
             emit(data)
          }
       }

    fun calCurrentAsynNew() {
        myJob?.cancel()
        myJob = viewModelScope.launch(Dispatchers.IO) {
            soundDbFlow().collect {currentInfo=it }
        }
    }

    fun cancelJob(){
        myJob?.cancel()
    }
}

@Composable
fun Greeting(handleMeter: HandleMeter) {
    var currentInfo = handleMeter.currentInfo

    Column(
        modifier = Modifier.fillMaxSize(),
    ) {

        Text(text = "Current ${currentInfo}")
        Button(
            onClick = { handleMeter.calCurrentAsynNew() }
        ) {
            Text("Start")
        }
        Button(
            onClick = { handleMeter.cancelJob() }
        ) {
            Text("Stop")
        }
    }
}

代码B

import kotlinx.coroutines.*

fun main() = runBlocking {
    
    val job = launch(Dispatchers.IO) {
      cal()  
    }
    delay(1300L) // delay a bit
    println("main: I'm tired of waiting!")
    job.cancelAndJoin() 
    println("main: Now I can quit.")
}

suspend fun cal()  {
   val startTime = System.currentTimeMillis()
   var nextPrintTime = startTime
   var i = 0
   while (i < 5) {     
        if ( System.currentTimeMillis() >= nextPrintTime) {
             println("job: I'm sleeping ${i++} ...")
             nextPrintTime += 500L
         }
   }
}

结果 B

job: I'm sleeping 0 ...
job: I'm sleeping 1 ...
job: I'm sleeping 2 ...
main: I'm tired of waiting!
job: I'm sleeping 3 ...
job: I'm sleeping 4 ...
main: Now I can quit.

添加内容:

致 Tenfour04:谢谢!

如果您说的以下内容属实。我觉得Code C可以在系统一次性完成doBigBlockingCalculation()操作的时候取消吧?为什么我需要代码 D?

由于 emit() 是一个挂起函数,您的 Flow 能够在下一次 while 循环中调用 emit() 函数时中断并结束协程。

代码C

private fun complicatedFlow() = flow {
      while (true) {
         val data = (0..1_000_000).doBigBlockingCalculation()
         emit(data)
      }
 }.flowOn(Dispatchers.Default) // since the calculation is blocking

代码D

private fun complicatedFlow() = flow {
      while (true) {
         val data = (0..1_000_000)
             .chunked(100_000)
             .flatMap {
                 it.doBigBlockingCalculation().also { yield() }
             }
         emit(data)
      }
   }.flowOn(Dispatchers.Default) // since the calculation is blocking

它与 CoroutineScopes 和协程的子项有关。 当一个父协程被取消时,它的所有子协程也会被取消。

更多信息在这里: https://kotlinlang.org/docs/coroutine-context-and-dispatchers.html#children-of-a-coroutine

A Flow 本身是冷的。它是一些挂起函数的包装器,当 collect() 或在 Flow 上调用一些其他终端挂起函数时,这些函数将 运行。

在您的代码 A 中,当作业被取消时,它正在取消在 Flow 上调用 collect 的协程。 collect 是一个挂起函数,因此取消将传播到您在 soundDbFlow() 中定义的函数。由于 emit() 是一个挂起函数,您的 Flow 能够在下一次在该 while 循环中调用 emit() 函数时中断并结束协程。

以下是您如何使用这些知识的示例:

假设您的函数必须像这样进行很长的计算:

private fun complicatedFlow() = flow {
      while (true) {
         val data = (0..1_000_000).doBigBlockingCalculation()
         emit(data)
      }
   }.flowOn(Dispatchers.Default) // since the calculation is blocking

现在如果你试图取消这个流程,它会起作用,但由于 data 行是一个非常慢的操作,没有暂停,流程仍然会无缘无故地完成这个很长的计算,消耗资源的时间超过了必要的时间。

要解决此问题,您可以通过 yield() 调用将计算分成更小的部分。这样可以更及时的取消Flow

private fun complicatedFlow() = flow {
      while (true) {
         val data = (0..1_000_000)
             .chunked(100_000)
             .flatMap {
                 it.doBigBlockingCalculation().also { yield() }
             }
         emit(data)
      }
   }.flowOn(Dispatchers.Default) // since the calculation is blocking

不是一个完美的例子。将一个大的 IntRange 分块有点浪费。 IntRange 几乎不占用任何内存,但分块将其转换为包含范围内每个值的列表。