CoroutineScope - CompletableDeferred 取消

CoroutineScope - CompletableDeferred cancellation

关于这个话题我有两个问题。我将在 android 中将这些与用例 类 一起使用,并且我尝试实现类似于此 https://www.youtube.com/watch?v=Sy6ZdgqrQp0 的架构,但我需要一些答案。

1) 我有一个异步生成器延迟,当我取消作业时, 其他连锁店也取消了。此代码打印 "Call cancelled"。但是我不确定我做的是否正确。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = GlobalScope.launch {
        println(getUser())
    }
    job.cancelAndJoin()
}

suspend fun getUser() = getUserDeferred().await()


suspend fun getUserDeferred() = coroutineScope {

    val request = Request.Builder()
            .url("https://jsonplaceholder.typicode.com/users")
            .build()

    val call = OkHttpClient().newCall(request)

    val deferred = async(Dispatchers.IO) {
        val body = call.execute()
        body.body()?.string() ?: ""
    }

    deferred.invokeOnCompletion {
        if (deferred.isCancelled) {
            println("Call cancelled")
            call.cancel()
        }
    }
    deferred
}

2) 我找不到取消这个的方法。我想在retrofit2调用适配器中使用这个,有没有更好的方法来处理这种情况。

fun main(args: Array<String>) = runBlocking<Unit> {
    val job = GlobalScope.launch {
        println(getUser1())
    }
    job.cancelAndJoin()
}

suspend fun getUser1() = getUser1Deferred().await()


fun getUser1Deferred(): Deferred<String> {
    val request = Request.Builder()
            .url("https://jsonplaceholder.typicode.com/users")
            .build()

    val call = OkHttpClient().newCall(request)

    val deferred = CompletableDeferred<String>()

    call.enqueue(object : Callback {

        override fun onFailure(call: Call, e: IOException) {
            deferred.complete("Error")
        }

        override fun onResponse(call: Call, response: Response) {
            deferred.complete(response.body()?.string() ?: "Error")
        }

    })

    deferred.invokeOnCompletion {
        if (deferred.isCancelled) {
            println("Call cancelled")
            call.cancel()
        }
    }
    return deferred
}

您应该避免使用第一种方法,因为它会阻塞线程池中的线程。使用第二种方法,您可以双向传播取消。如果您取消 Deferred 它将取消调用,如果调用失败,它将取消 Deferred 并得到异常。

fun getUserAsync(): Deferred<String> {
    val call = OkHttpClient().newCall(Request.Builder()
            .url("https://jsonplaceholder.typicode.com/users")
            .build())
    val deferred = CompletableDeferred<String>().apply {
        invokeOnCompletion {
            if (isCancelled) {
                call.cancel()
            }
        }
    }
    call.enqueue(object : Callback {
        override fun onResponse(call: Call, response: Response) {
            deferred.complete(response.body()?.string() ?: "Error")
        }
        override fun onFailure(call: Call, e: IOException) {
            deferred.cancel(e)
        }

    })
    return deferred
}

但是,走 Deferred 路线可能是转移注意力。如果你要取消它,根本原因是你要从你正在做的整个任务中解脱出来。相反,您应该取消它运行的整个协程。如果您正确实施 structured concurrency,如果您的 activity 被销毁,一切都会自动发生。

所以我的建议是使用此代码:

suspend fun getUser() = suspendCancellableCoroutine<String> { cont ->
    val call = OkHttpClient().newCall(Request.Builder()
            .url("https://jsonplaceholder.typicode.com/users")
            .build())
    cont.invokeOnCancellation {
        call.cancel()
    }
    call.enqueue(object : Callback {
        override fun onResponse(call: Call, response: Response) {
            cont.resume(response.body()?.string() ?: "Error")
        }
        override fun onFailure(call: Call, e: IOException) {
            cont.resumeWithException(e)
        }

    })
}

如果你绝对需要 Deferred 因为你 运行 它同时在后台运行,使用上面的方法很容易做到:

val userDeferred = this.async { getUser() }

我假设 this 是你的 activity,它也是 CoroutineScope

第二种情况未取消的原因是因为您正在使用 CompletableDeferred。它不是作为协程启动的,因此不是您父协程的子级。因此,如果您取消父级,它不会取消延期。

它在第一种情况下有效,因为 async 启动了一个新的子协程,该子协程 linked 到父协程。当您取消其中一个时,它们都会被取消。

为了 link 将 Deferred 传给你的父作业,你需要引用它并使用 invokeOnCompletion

var deferred : Deferred<Void>? = null
launch {        
   deferred = retroService.someDeferredCall()
   deferred.await()
}.invokeOnCompletion {
   //job was cancelled.  Probably activity closing.
   if(it is CancellationException) {
      deferred?.let { it.cancel() }
   }
}

不是很漂亮,但应该完成工作。