Kotlin - 协程作用域,为什么我的异步不被执行?

Kotlin - coroutine scopes, why doesn't my async get executed?

CoroutineScopes 是如何工作的?

假设我有一个

enum class ConceptualPosition{
    INVALID,
    A,B
}

假设我有一个 UI,用户可以从中点击任一位置,AB

我现在想要一个接收用户输入但在实际请求输入之前忽略它的 Actor。为了简单起见,假设只有一种申请职位的方式。

sealed class PositionRequest{
    /**report the next position offered*/
    object ForwardNext:PositionRequest()
}

所以我们可以构建这样的东西:

fun CoroutineScope.positionActor(
        offeredPosition:ReceiveChannel<ConceptualPosition>,
        requests:ReceiveChannel<PositionRequest>,
        output:SendChannel<ConceptualPosition>
) = launch{
    var lastReceivedPosition = INVALID
    var forwardNextReceived = 0

    println("ACTOR: entering while loop")
    while(true) {
        select<Unit> {
            requests.onReceive {
                println("ACTOR: requests.onReceive($it)")
                when (it) {
                    is PositionRequest.ForwardNext -> ++forwardNextReceived
                }
            }

            offeredPosition.onReceive {
                println("ACTOR: offeredPosition.onReceive($it)")
                lastReceivedPosition = it
                if (forwardNextReceived > 0) {
                    --forwardNextReceived
                    output.send(it)
                }
            }
        }
    }
}

然后建立一个门面与之交互:

class BasicUI{
    private val dispatcher = Dispatchers.IO

    /*start a Position Actor that receives input from the UI and forwards them on demand*/
    private val requests = Channel<PositionRequest>()
    private val offeredPositions = Channel<ConceptualPosition>()
    private val nextPosition = Channel<ConceptualPosition>()
    init {
        runBlocking(dispatcher){
            positionActor(offeredPositions,requests,nextPosition)
        }
    }

    /** Receives a [ConceptualPosition] that may or may not get accepted and acted upon.*/
    fun offerPosition(conceptualPosition: ConceptualPosition) = runBlocking(dispatcher) {
        offeredPositions.send(conceptualPosition)
    }

    /** waits for a [ConceptualPosition] to be offered via [offerPosition], then accepts it*/
    fun getPosition(): ConceptualPosition = runBlocking(dispatcher){
        requests.send(PositionRequest.ForwardNext)
        nextPosition.receive()
    }
}

这当然行不通,因为 runBlockingCoroutineScopeinit 不会 return 直到 positionActor(offeredPositions,requests,nextPosition) 启动协程结束...这永远不会因为里面有一个while(true)

那么如果我们让 BasicUI 实现 CoroutineScope 呢?毕竟 what Roman Elizarov said we should do at the KotlinConf,如果我理解正确的话,应该将 positionActor(...) 创建的协程绑定到 BasicUI 实例,而不是runBlocking-块。

让我们看看...

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlin.coroutines.CoroutineContext

class BasicUI:CoroutineScope{

    private val dispatcher = Dispatchers.IO

    private val job = Job()
    override val coroutineContext: CoroutineContext
        get() = job

    /*start a Position Actor that receives input from the UI and forwards them on demand*/
    private val requests = Channel<PositionRequest>()
    private val offeredPositions = Channel<ConceptualPosition>()
    private val nextPosition = Channel<ConceptualPosition>()
    init {
        positionActor(offeredPositions,requests,nextPosition)
    }

    /** Receives a [ConceptualPosition] that may or may not get accepted and acted upon.*/
    fun offerPosition(conceptualPosition: ConceptualPosition) = runBlocking(dispatcher) {
        offeredPositions.send(conceptualPosition)
    }

    /** waits for a [ConceptualPosition] to be offered via [offerPosition], then accepts it*/
    fun getPosition(): ConceptualPosition = runBlocking(dispatcher){
        requests.send(PositionRequest.ForwardNext)
        nextPosition.receive()
    }
}

让我们构建一个小测试用例:我将向演员提供一些他应该忽略的 As,然后启动一个持续提供 Bs 的协程,其中之一将是 return当我向演员征求职位时给我的。

import ConceptualPosition.*
import kotlinx.coroutines.async
import kotlinx.coroutines.runBlocking

fun main(args: Array<String>) = runBlocking{
    val ui = BasicUI()
    println("actor engaged")

    //these should all be ignored
    repeat(5){ui.offerPosition(A)}
    println("offered some 'A's")

    //keep offering 'B' so that eventually, one will be offered after we request a position
    async { while(true){ui.offerPosition(B)} }

    //now get a 'B'
    println("requesting a position")
    val pos = ui.getPosition()
    println("received '$pos'")
}

这导致

actor engaged
ACTOR: entering while loop
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
ACTOR: offeredPosition.onReceive(A)
offered some 'A's
ACTOR: offeredPosition.onReceive(A)
requesting a position
ACTOR: requests.onReceive(PositionRequest$ForwardNext@558da0e9)

...什么都没有。

显然,B 从未被提供——因此,也从未被转发——这导致了主线程的阻塞(在那种情况下应该如此)。

我扔了一个

if(conceptualPosition == ConceptualPosition.B) throw RuntimeException("B offered?!")

进入BasicUI.offerPosition,没有例外,所以...

在这一点上,我可能不得不承认我还不了解 Kotlin CoroutineScope

为什么这个例子不起作用?

这里似乎有两个问题:

  1. offerPosition/getPosition 不是挂起函数。在大多数情况下,使用 runBlocking 是错误的解决方案,应该在必须与同步代码或主函数交互时使用。
  2. async不带任何参数在当前CoroutineScope执行。对于您的主要功能,这是 runBlocking。该文档实际上描述了行为:

The default CoroutineDispatcher for this builder in an internal implementation of event loop that processes continuations in this blocked thread until the completion of this coroutine. See CoroutineDispatcher for the other implementations that are provided by kotlinx.coroutines.

简而言之,当其他延续正在使用它时,async 块将不会在事件循环中轮流执行。由于 getPosition 正在阻止您阻止事件循环。

用挂起函数和 withContext(dispatcher) 替换阻塞函数以在不同的执行器上分派将允许异步函数 运行 和状态最终解决。