How to handle race condition with Coroutines in Kotlin?

I have a coroutine/flow problem that I'm trying to solve.

I have this method, getClosestRegion(), which should do the following:

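  1. Try to connect to every region
  2. The first region that connects (I use launch to try connecting to all regions concurrently) should be returned, and the remaining region requests should be cancelled
  3. If all regions fail to connect, or none connects within a 30 second timeout, throw an exception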

This is what I have so far:

override suspend fun getClosestRegion(): Region {
    val regions = regionsRepository.getRegions()
    val firstSuccessResult = MutableSharedFlow<Region>(replay = 1)
    val scope = CoroutineScope(Dispatchers.IO)

    // Attempts to connect to every region until the first success
    scope.launch {
        regions.forEach { region ->
            launch {
                val retrofitClient = buildRetrofitClient(region.backendUrl)
                val regionAuthenticationAPI = retrofitClient.create(AuthenticationAPI::class.java)
                val response = regionAuthenticationAPI.canConnect()
                if (response.isSuccessful && scope.isActive) {
                    scope.cancel()
                    firstSuccessResult.emit(region)
                }
            }
        }
    }

    val result = withTimeoutOrNull(TimeUnit.SECONDS.toMillis(30)) { firstSuccessResult.first() }
    if (result != null)
        return result
    throw Exception("Failed to connect to any region")
}

Problems with the current code:

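  1. If 1 region connects successfully, I expect the rest of the requests to be cancelled (by scope.cancel()), but in reality other regions that connect successfully AFTER the first one also emit values to the flow (scope.isActive returns true)
  2. I don't know how to handle the race condition of throwing an exception if all regions fail to connect or after the 30 second timeout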

Also, I'm quite new to Kotlin Flow and coroutines, so I don't know whether it's actually necessary to create a flow here.

You don't need to create a CoroutineScope and manage it from within a coroutine. You can use the coroutineScope function instead.
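A minimal sketch of what coroutineScope gives you, with delay() as a hypothetical stand-in for the network calls: the function only returns once all of its children have finished or been cancelled, and coroutineContext.cancelChildren() stops the ones that are still running. The function name and timings are made up for illustration.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// Launches three children with different simulated durations, lets only the
// fastest one finish, then cancels the stragglers. coroutineScope does not
// return until every child has completed or been cancelled, so nothing leaks.
suspend fun finishedBeforeCancel(): Int = coroutineScope {
    val finished = AtomicInteger(0)
    for (durationMs in listOf(100L, 10_000L, 20_000L)) {
        launch {
            delay(durationMs) // hypothetical stand-in for a network request
            finished.incrementAndGet()
        }
    }
    delay(1_000) // by now only the 100 ms child has finished
    coroutineContext.cancelChildren() // cancel the two slow children
    finished.get()
}
```

Calling finishedBeforeCancel() returns 1 after roughly one second, not after twenty, because the cancelled delay() calls end immediately.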

I certainly haven't tested any of the following, so please excuse syntax errors and any omitted <types> that the compiler can't infer.

Here's how you could do it with a select clause, but I think it's a bit awkward:

import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.onTimeout
import kotlinx.coroutines.selects.select

override suspend fun getClosestRegion(): Region = coroutineScope {
    val regions = regionsRepository.getRegions()
    val result = select<Region?> {
        onTimeout(30.seconds) { null }
        for (region in regions) {
            launch {
                val retrofitClient = buildRetrofitClient(region.backendUrl)
                val regionAuthenticationAPI = retrofitClient.create(AuthenticationAPI::class.java)
                val response = regionAuthenticationAPI.canConnect()
                if (!response.isSuccessful) {
                    delay(30.seconds) // prevent this one from being selected
                }
            }.onJoin { region }
        }
    }
    coroutineContext.cancelChildren() // Cancel any remaining launched jobs
    requireNotNull(result) { "Failed to connect to any region" }
}

Here's how you could do it with channelFlow:

import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.firstOrNull

override suspend fun getClosestRegion(): Region = coroutineScope {
    val regions = regionsRepository.getRegions()
    val flow = channelFlow {
        for (region in regions) {
            launch {
                val retrofitClient = buildRetrofitClient(region.backendUrl)
                val regionAuthenticationAPI = retrofitClient.create(AuthenticationAPI::class.java)
                val response = regionAuthenticationAPI.canConnect()
                if (response.isSuccessful) {
                    send(region)
                }
            }
        }
    }
    val result = withTimeoutOrNull(30.seconds) {
        flow.firstOrNull()
    }
    coroutineContext.cancelChildren() // Cancel any remaining launched jobs
    requireNotNull(result) { "Failed to connect to any region" }
}
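To try the channelFlow approach without Retrofit, here is a self-contained sketch that assumes a region's connectivity check can be simulated with delay(). SimRegion and simulateConnect() are hypothetical stand-ins for Region and canConnect(). Because a channelFlow completes once every coroutine launched inside it has finished, firstOrNull() returns null as soon as the slowest request fails, so the all-fail case does not have to wait for the 30 second timeout.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.flow.firstOrNull
import kotlin.time.Duration.Companion.seconds

// Hypothetical stand-in for Region: how long the check takes and whether it succeeds.
data class SimRegion(val name: String, val delayMs: Long, val reachable: Boolean)

// Hypothetical stand-in for canConnect(): waits, then reports success or failure.
suspend fun simulateConnect(region: SimRegion): Boolean {
    delay(region.delayMs)
    return region.reachable
}

// Returns the first reachable region, or null if all fail or 30 seconds pass.
suspend fun closestRegionOrNull(regions: List<SimRegion>): SimRegion? =
    withTimeoutOrNull(30.seconds) {
        channelFlow {
            for (region in regions) {
                // One concurrent check per region; only successes are sent.
                launch {
                    if (simulateConnect(region)) send(region)
                }
            }
        }.firstOrNull() // cancels the remaining checks after the first success
    }
```

With regions taking 300 ms, 100 ms and 200 ms, the 100 ms region is returned; with only failing regions, null comes back after the slowest one fails.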

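I think your MutableSharedFlow technique could also work if you dropped the isActive check and used coroutineScope { } and cancelChildren() as I did above. But creating a shared flow that isn't shared with anything seems awkward (it's only used by the same coroutine that created it).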
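For comparison, a sketch of the question's MutableSharedFlow approach with the changes suggested above: no isActive check, coroutineScope instead of a hand-made scope, and cancelChildren() afterwards. FakeRegion and fakeCanConnect() are hypothetical stand-ins for Region and the Retrofit call. One limitation remains: a shared flow never completes on its own, so if every region fails, first() keeps waiting until the timeout expires.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first
import kotlin.time.Duration.Companion.seconds

// Hypothetical stand-in for Region.
data class FakeRegion(val name: String, val delayMs: Long, val reachable: Boolean)

// Hypothetical stand-in for the Retrofit canConnect() call.
suspend fun fakeCanConnect(region: FakeRegion): Boolean {
    delay(region.delayMs)
    return region.reachable
}

suspend fun closestRegionViaSharedFlow(regions: List<FakeRegion>): FakeRegion = coroutineScope {
    val firstSuccess = MutableSharedFlow<FakeRegion>(replay = 1)
    regions.forEach { region ->
        // Children of coroutineScope, so they can be cancelled together below.
        launch {
            if (fakeCanConnect(region)) firstSuccess.emit(region)
        }
    }
    // first() waits for a success; the shared flow itself never completes.
    val result = withTimeoutOrNull(30.seconds) { firstSuccess.first() }
    coroutineContext.cancelChildren() // stop the requests that are still running
    requireNotNull(result) { "Failed to connect to any region" }
}
```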

  1. If 1 region was successfully connected, I expect that the rest of the requests will be cancelled (by scope.cancel()), but in reality other regions that have successfully connected AFTER the first one are also emitting values to the flow (scope.isActive returns true)

Quoting the documentation...

Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable.

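Once a client request has started, it can't simply be cancelled: the client itself has to cooperate by interrupting what it's doing. That probably doesn't happen inside Retrofit.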
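The cooperative-cancellation rule can be illustrated without Retrofit. In this hypothetical sketch, Thread.sleep() stands in for a call that never checks for cancellation and delay() for one that does; the function reports whether the work ran to the end even though it was cancelled right after it started.

```kotlin
import java.util.concurrent.atomic.AtomicBoolean
import kotlinx.coroutines.*

// Starts 300 ms of simulated work, cancels it as soon as it has started, and
// reports whether the work still ran to the end.
suspend fun ranToEndDespiteCancel(blocking: Boolean): Boolean = coroutineScope {
    val started = CompletableDeferred<Unit>()
    val ranToEnd = AtomicBoolean(false)
    val job = launch(Dispatchers.Default) {
        started.complete(Unit)
        if (blocking) {
            Thread.sleep(300) // never checks for cancellation
        } else {
            delay(300) // cancellable: throws CancellationException when cancelled
        }
        ranToEnd.set(true)
    }
    started.await()
    job.cancelAndJoin()
    ranToEnd.get()
}
```

ranToEndDespiteCancel(blocking = true) returns true and ranToEndDespiteCancel(blocking = false) returns false: cancelling only sets a flag that the running code has to notice.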

I'm assuming it isn't a problem that you send more requests than you need; otherwise you couldn't make the requests concurrently.


  2. I don't know how to handle the race condition of throwing an exception if all regions failed to connect or after the 30 second timeout

As I understand it, there are three scenarios:

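  1. One response is successful - the other responses should be ignored
  2. All responses are unsuccessful - an error should be thrown
  3. All responses take longer than 30 seconds - again, an error should be thrown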

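Additionally, I don't want to keep track of how many requests are active/failed/successful. That would require shared state, which is complicated and fragile. Instead, I want to use parent-child relationships to manage it.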

Timeout

The timeout is already handled by withTimeoutOrNull() - easy!
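A minimal sketch of the pattern, with delay() as a hypothetical stand-in for waiting on the first successful region: withTimeoutOrNull() returns the block's value if it finishes in time and null otherwise, and the elvis operator turns that null into an exception, the same way getClosestRegion() further down ends with ?: error(...).

```kotlin
import kotlinx.coroutines.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds

// Returns "done" if the simulated work finishes within the timeout,
// otherwise throws IllegalStateException via error().
suspend fun workWithin(work: Duration, timeout: Duration): String =
    withTimeoutOrNull(timeout) {
        delay(work) // hypothetical stand-in for waiting on the first success
        "done"
    } ?: error("Timeout error")
```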

First success

Selects might be useful here, and I see @Tenfour04 has already provided an answer using one. I'll give an alternative.

Using suspendCancellableCoroutine() provides a way to

  1. return as soon as there's a success - resume(...)
  2. throw an error when all requests fail - resumeWithException

suspend fun getClosestRegion(
  regions: List<Region>
): Region = withTimeoutOrNull(10.seconds) {

  // don't give the supervisor a parent, because if one response is successful
  // the parent would have to wait for the other children to complete
  val supervisorJob = SupervisorJob()

  // suspend the current coroutine. We'll use cont to continue when 
  // there's a definite outcome
  suspendCancellableCoroutine<Region> { cont ->

    launch(supervisorJob) {
      regions
        .map { region ->
          // note: use async instead of launch so we can do awaitAll()
          // to track when all tasks have completed, but none have resumed
          async(supervisorJob) {

            coroutineContext.job.invokeOnCompletion {
              log("cancelling async job for $region")
            }

            val retrofitClient = buildRetrofitClient(region)
            val response = retrofitClient.connect()
            
            // if there's a success, then try to complete the supervisor.
            // complete() prevents multiple jobs from continuing the suspended
            // coroutine
            if (response.isSuccess && supervisorJob.complete()) {
              log("got success for $region - resuming")
              // happy flow - we can return
              cont.resume(region)
            }
          }
        }.awaitAll()

      // uh-oh, nothing was a success
      if (supervisorJob.complete()) {
        log("no successful regions - throwing exception & resuming")
        cont.resumeWithException(Exception("no region response was successful"))
      }
    }
  }
} ?: error("Timeout error - unable to get region")
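The complete() guard works because CompletableJob.complete() returns true only for the first call; every later call returns false. This hypothetical sketch races several coroutines on one Job() and counts how many of them win, which should be exactly one.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*

// Launches `racers` coroutines that all try to complete the same job at once
// and returns how many of them saw complete() return true.
suspend fun countWinners(racers: Int): Int = coroutineScope {
    val gate = Job() // CompletableJob with no parent, like the answer's SupervisorJob()
    val winners = AtomicInteger(0)
    val jobs = List(racers) {
        launch(Dispatchers.Default) {
            // Only the first caller completes the job; later calls return false.
            if (gate.complete()) winners.incrementAndGet()
        }
    }
    jobs.joinAll() // make sure every racer has tried before reading the count
    winners.get()
}
```

This is why only one coroutine ever calls cont.resume(...) or cont.resumeWithException(...), even when several regions succeed at nearly the same moment.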

Examples

All responses succeed

If all tasks succeed, it takes the least time to return: the result comes back as soon as the fastest region responds.

getClosestRegion(
  List(5) {
    Region("attempt1-region$it", success = true)
  }
)

...

log("result for all success: $regionSuccess, time $time")
 got success for Region(name=attempt1-region1, success=true, delay=2s) - resuming
 cancelling async job for Region(name=attempt1-region3, success=true, delay=2s)
 result for all success: Region(name=attempt1-region1, success=true, delay=2s), time 2.131312600s
 cancelling async job for Region(name=attempt1-region1, success=true, delay=2s)

All responses fail

When all responses fail, it should take only as long as the slowest response, rather than the full timeout.

getClosestRegion(
  List(5) {
    Region("attempt2-region$it", success = false)
  }
)

...

log("failure: $allFailEx, time $time")
[DefaultDispatcher-worker-6 @all-fail#6] cancelling async job for Region(name=attempt2-region4, success=false, delay=1s)
[DefaultDispatcher-worker-4 @all-fail#4] cancelling async job for Region(name=attempt2-region2, success=false, delay=4s)
[DefaultDispatcher-worker-3 @all-fail#3] cancelling async job for Region(name=attempt2-region1, success=false, delay=4s)
[DefaultDispatcher-worker-6 @all-fail#5] cancelling async job for Region(name=attempt2-region3, success=false, delay=4s)
[DefaultDispatcher-worker-6 @all-fail#2] cancelling async job for Region(name=attempt2-region0, success=false, delay=5s)
[DefaultDispatcher-worker-6 @all-fail#1] no successful regions - throwing exception resuming
[DefaultDispatcher-worker-6 @all-fail#1] failure: java.lang.Exception: no region response was successful, time 5.225431500s

All responses time out

If all responses take longer than the timeout (which I reduced to 10 seconds in the example), an exception is thrown.

getClosestRegion(
  List(5) {
    Region("attempt3-region$it", false, 100.seconds)
  }
)

...

log("timeout: $timeoutEx, time $time")
[kotlinx.coroutines.DefaultExecutor] timeout: java.lang.IllegalStateException: Timeout error - unable to get region, time 10.070052700s

Full demo code

import kotlin.coroutines.*
import kotlin.random.*
import kotlin.time.Duration.Companion.seconds
import kotlin.time.*
import kotlinx.coroutines.*


suspend fun main() {
  System.getProperties().setProperty("kotlinx.coroutines.debug", "")

  withContext(CoroutineName("all-success")) {
    val (regionSuccess, time) = measureTimedValue {
      getClosestRegion(
        List(5) {
          Region("attempt1-region$it", true)
        }
      )
    }
    log("result for all success: $regionSuccess, time $time")
  }

  log("\n------\n")

  withContext(CoroutineName("all-fail")) {
    val (allFailEx, time) = measureTimedValue {
      try {
        getClosestRegion(
          List(5) {
            Region("attempt2-region$it", false)
          }
        )
      } catch (exception: Exception) {
        exception
      }
    }
    log("failure: $allFailEx, time $time")
  }

  log("\n------\n")

  withContext(CoroutineName("timeout")) {
    val (timeoutEx, time) = measureTimedValue {
      try {
        getClosestRegion(
          List(5) {
            Region("attempt3-region$it", false, 100.seconds)
          }
        )
      } catch (exception: Exception) {
        exception
      }
    }
    log("timeout: $timeoutEx, time $time")
  }
}


suspend fun getClosestRegion(
  regions: List<Region>
): Region = withTimeoutOrNull(10.seconds) {

  val supervisorJob = SupervisorJob()

  suspendCancellableCoroutine<Region> { cont ->

    launch(supervisorJob) {
      regions
        .map { region ->
          async(supervisorJob) {

            coroutineContext.job.invokeOnCompletion {
              log("cancelling async job for $region")
            }

            val retrofitClient = buildRetrofitClient(region)
            val response = retrofitClient.connect()
            if (response.isSuccess && supervisorJob.complete()) {
              log("got success for $region - resuming")
              cont.resume(region)
            }
          }
        }.awaitAll()

      // uh-oh, nothing was a success
      if (supervisorJob.complete()) {
        log("no successful regions - throwing exception resuming")
        cont.resumeWithException(Exception("no region response was successful"))
      }
    }
  }
} ?: error("Timeout error - unable to get region")


////////////////////////////////////////////////////////////////////////////////////////////////////


data class Region(
  val name: String,
  val success: Boolean,
  val delay: Duration = Random(name.hashCode()).nextInt(1..5).seconds,
) {
  val backendUrl = "http://localhost/$name"
}


fun buildRetrofitClient(region: Region) = RetrofitClient(region)


class RetrofitClient(private val region: Region) {

  suspend fun connect(): ClientResponse {
    delay(region.delay)
    return ClientResponse(region.backendUrl, region.success)
  }
}

data class ClientResponse(
  val url: String,
  val isSuccess: Boolean,
)

fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")