如何在 Android 上使用协程取消有中断和无中断,包括根据生命周期自动取消?
How to cancel with and without interruption using Coroutines on Android, including auto-cancellation according to lifecycle?
背景
我在从 Android
上的简单(已弃用)AsyncTask
和 Executors
迁移到 Kotlin Coroutines
时遇到问题
问题
我找不到如何使用 Kotlin Coroutines
.
执行我在 AsyncTask
甚至 Executors
上可以完成的基本操作
过去,我可以选择在线程中断和不中断的情况下取消任务。现在出于某种原因,鉴于我在协程上创建的任务,它只是没有中断,这意味着如果我 运行 一些代码甚至在其中“睡眠”(并不总是由我),它不会被打断。
我还记得有人告诉我协程在 Android 上非常好,因为如果您在 Activity 中,它会自动取消所有任务。不过,我找不到如何操作的解释。
我已经尝试并发现了什么
对于 Coroutines 任务(据我所知称为 Deferred
)我想我已经读过,当我创建它时,我必须选择它支持的取消,并且出于某种原因我不能同时拥有它们。不确定这是不是真的,但我仍然想知道,因为我想同时拥有两者以获得最佳迁移。使用 AsyncTask,我过去常常将它们添加到一个集合中(并在取消时删除),以便在 Activity 完成后,我可以遍历所有并取消它们。我什至做了一个很好的 class 来为我做这件事。
这是我创建来测试的:
class MainActivity : AppCompatActivity() {
val uiScope = CoroutineScope(Dispatchers.Main)
val bgDispatcher: CoroutineDispatcher = Dispatchers.IO
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
loadData()
}
private fun loadData(): Job = uiScope.launch {
Log.d("AppLog", "loadData")
val task = async(bgDispatcher) {
Log.d("AppLog", "bg start")
try {
Thread.sleep(5000L) //this could be any executing of code, including things not editable
} catch (e: Exception) {
Log.d("AppLog", "$e")
}
Log.d("AppLog", "bg done this.isActive?${this.isActive}")
return@async 123
}
//simulation of cancellation for any reason, sadly without the ability to cancel with interruption
Handler(mainLooper).postDelayed({
task.cancel()
}, 2000L)
val result: Int = task.await()
Log.d("AppLog", "got result:$result") // this is called even if you change orientation, which I might not want when done in Activity
}
}
构建gradle文件:
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1"
问题
- 协程是否可以有一个任务,我可以在有和没有线程中断的情况下取消?
- 应该添加什么来使它工作,以便当 Activity 结束时(例如方向改变),它会自动取消(选择有或没有中断)?我想我可以使用与 AsyncTask 类似的解决方案,但我记得有人告诉我,对于协程也有一种很好的方法。
默认情况下,协程不执行线程中断 - 根据 Making computation code cancellable documentation, using yield()
或检查 isActive
允许协程感知代码参与取消。
但是,当您 想要线程中断与阻塞代码交互时,这正是 runInterruptible() 的用例,这将导致包含的代码当协程范围被取消时线程被中断。
这与 lifecycle-aware coroutine scopes 完美配合,后者在 Lifecycle
被销毁时自动取消:
class MainActivity : AppCompatActivity() {
val bgDispatcher: CoroutineDispatcher = Dispatchers.IO
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
loadData()
}
private fun loadData(): Job = lifecycleScope.launch {
Log.d("AppLog", "loadData")
val result = runInterruptible(bgDispatcher) {
Log.d("AppLog", "bg start")
try {
Thread.sleep(5000L) //this could be any executing of code, including things not editable
} catch (e: Exception) {
Log.d("AppLog", "$e")
}
Log.d("AppLog", "bg done this.isActive?${this.isActive}")
return@runInterruptible 123
}
Log.d("AppLog", "got result:$result")
}
}
协同程序并不神奇。它们是使用状态机实现的,并且可能有许多挂起点。这个在原文里都有说明Coroutines KEEP.
取消发生在那些暂停点。协同程序不能在挂起点以外的任何其他点取消(至少在正常执行中)。如果您使用 Thread.sleep
,则没有暂停点。您应该使用 delay
而不是 sleep
,这会引入一个挂点。如果你在做一个长操作,你可以添加几个yield()
来添加挂起点并使你的协程可取消。
来自 the docs:
Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled. However, if a coroutine is working in a computation and does not check for cancellation, then it cannot be cancelled
调用 suspend
函数会自动引入挂起点。
正如@CommonsWare 所指出的,很少有理由创建协程范围。在活动和片段中,或任何与生命周期相关的组件中,您应该使用 lifecycleScope
。在 ViewModels 中,有 viewModelScope
.
编辑:
我已经尝试将 runInterruptible
的源调整为 在特定条件下不 中断:传递自定义异常的实例 class InterruptionException
作为取消原因将跳过线程中断。我已经用 AtomicInteger
替换了 atomicfu 结构,我假设你的目标只是 JVM。您需要通过添加 -Xopt-in=kotlinx.coroutines.InternalCoroutinesApi
编译器标志来选择加入内部协程 API。
suspend fun <T> runInterruptibleCancellable(
context: CoroutineContext = EmptyCoroutineContext,
block: () -> T
): T = withContext(context) {
try {
val threadState = ThreadState(coroutineContext.job)
threadState.setup()
try {
block()
} finally {
threadState.clearInterrupt()
}
} catch (e: InterruptedException) {
throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
}
}
private const val WORKING = 0
private const val FINISHED = 1
private const val INTERRUPTING = 2
private const val INTERRUPTED = 3
private class ThreadState(private val job: Job) : CompletionHandler {
/*
=== States ===
WORKING: running normally
FINISH: complete normally
INTERRUPTING: canceled, going to interrupt this thread
INTERRUPTED: this thread is interrupted
=== Possible Transitions ===
+----------------+ register job +-------------------------+
| WORKING | cancellation listener | WORKING |
| (thread, null) | -------------------------> | (thread, cancel handle) |
+----------------+ +-------------------------+
| | |
| cancel cancel | | complete
| | |
V | |
+---------------+ | |
| INTERRUPTING | <--------------------------------------+ |
+---------------+ |
| |
| interrupt |
| |
V V
+---------------+ +-------------------------+
| INTERRUPTED | | FINISHED |
+---------------+ +-------------------------+
*/
private val _state = AtomicInteger(WORKING)
private val targetThread = Thread.currentThread()
// Registered cancellation handler
private var cancelHandle: DisposableHandle? = null
fun setup() {
cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
// Either we successfully stored it or it was immediately cancelled
while (true) {
when (val state = _state.get()) {
// Happy-path, move forward
WORKING -> if (_state.compareAndSet(state, WORKING)) return
// Immediately cancelled, just continue
INTERRUPTING, INTERRUPTED -> return
else -> invalidState(state)
}
}
}
fun clearInterrupt() {
/*
* Do not allow to untriggered interrupt to leak
*/
while (true) {
when (val state = _state.get()) {
WORKING -> if (_state.compareAndSet(state, FINISHED)) {
cancelHandle?.dispose()
return
}
INTERRUPTING -> {
/*
* Spin, cancellation mechanism is interrupting our thread right now
* and we have to wait it and then clear interrupt status
*/
}
INTERRUPTED -> {
// Clear it and bail out
Thread.interrupted()
return
}
else -> invalidState(state)
}
}
}
// Cancellation handler
override fun invoke(cause: Throwable?) {
if (cause is InterruptionException) {
while (true) {
when (val state = _state.get()) {
// Working -> try to transite state and interrupt the thread
WORKING -> {
if (_state.compareAndSet(state, INTERRUPTING)) {
targetThread.interrupt()
_state.set(INTERRUPTED)
return
}
}
// Finished -- runInterruptible is already complete, INTERRUPTING - ignore
FINISHED, INTERRUPTING, INTERRUPTED -> return
else -> invalidState(state)
}
}
}
}
private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}
class InterruptionException(cause: Throwable?) : CancellationException() {
init {
initCause(cause)
}
}
fun Job.interrupt(cause: Throwable? = null) {
this.cancel(InterruptionException(cause))
}
suspend fun Job.interruptAndJoin() {
interrupt()
return join()
}
可以使用interrupt
和interruptAndJoin
扩展函数来触发线程中断,否则使用cancel
进行非中断取消。一个例子:
val scope = CoroutineScope(Dispatchers.IO)
val job = scope.launch {
runInterruptibleCancellable {
// some blocking code
Thread.sleep(1000)
if (!isActive) {
println("cancelled")
} else {
println("completed")
}
}
}
job.invokeOnCompletion {
if (it is InterruptionException) {
print("interrupted")
}
}
runBlocking {
// job.interruptAndJoin() // prints "interrupted"
// job.cancelAndJoin() // prints "cancelled"
job.join() // prints "completed"
}
这个例子是我所做的唯一测试。 似乎 有效。我不知道它是否泄漏,我不知道它是否是线程安全的。老实说,我真的远远超出了我的专业知识。请不要在没有进一步确认它有效的情况下在生产中使用它。
背景
我在从 Android
上的简单(已弃用)AsyncTask
和 Executors
迁移到 Kotlin Coroutines
时遇到问题
问题
我找不到如何使用 Kotlin Coroutines
.
AsyncTask
甚至 Executors
上可以完成的基本操作
过去,我可以选择在线程中断和不中断的情况下取消任务。现在出于某种原因,鉴于我在协程上创建的任务,它只是没有中断,这意味着如果我 运行 一些代码甚至在其中“睡眠”(并不总是由我),它不会被打断。
我还记得有人告诉我协程在 Android 上非常好,因为如果您在 Activity 中,它会自动取消所有任务。不过,我找不到如何操作的解释。
我已经尝试并发现了什么
对于 Coroutines 任务(据我所知称为 Deferred
)我想我已经读过,当我创建它时,我必须选择它支持的取消,并且出于某种原因我不能同时拥有它们。不确定这是不是真的,但我仍然想知道,因为我想同时拥有两者以获得最佳迁移。使用 AsyncTask,我过去常常将它们添加到一个集合中(并在取消时删除),以便在 Activity 完成后,我可以遍历所有并取消它们。我什至做了一个很好的 class 来为我做这件事。
这是我创建来测试的:
class MainActivity : AppCompatActivity() {
val uiScope = CoroutineScope(Dispatchers.Main)
val bgDispatcher: CoroutineDispatcher = Dispatchers.IO
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
loadData()
}
private fun loadData(): Job = uiScope.launch {
Log.d("AppLog", "loadData")
val task = async(bgDispatcher) {
Log.d("AppLog", "bg start")
try {
Thread.sleep(5000L) //this could be any executing of code, including things not editable
} catch (e: Exception) {
Log.d("AppLog", "$e")
}
Log.d("AppLog", "bg done this.isActive?${this.isActive}")
return@async 123
}
//simulation of cancellation for any reason, sadly without the ability to cancel with interruption
Handler(mainLooper).postDelayed({
task.cancel()
}, 2000L)
val result: Int = task.await()
Log.d("AppLog", "got result:$result") // this is called even if you change orientation, which I might not want when done in Activity
}
}
构建gradle文件:
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.1"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.4.1"
问题
- 协程是否可以有一个任务,我可以在有和没有线程中断的情况下取消?
- 应该添加什么来使它工作,以便当 Activity 结束时(例如方向改变),它会自动取消(选择有或没有中断)?我想我可以使用与 AsyncTask 类似的解决方案,但我记得有人告诉我,对于协程也有一种很好的方法。
默认情况下,协程不执行线程中断 - 根据 Making computation code cancellable documentation, using yield()
或检查 isActive
允许协程感知代码参与取消。
但是,当您 想要线程中断与阻塞代码交互时,这正是 runInterruptible() 的用例,这将导致包含的代码当协程范围被取消时线程被中断。
这与 lifecycle-aware coroutine scopes 完美配合,后者在 Lifecycle
被销毁时自动取消:
class MainActivity : AppCompatActivity() {
val bgDispatcher: CoroutineDispatcher = Dispatchers.IO
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
loadData()
}
private fun loadData(): Job = lifecycleScope.launch {
Log.d("AppLog", "loadData")
val result = runInterruptible(bgDispatcher) {
Log.d("AppLog", "bg start")
try {
Thread.sleep(5000L) //this could be any executing of code, including things not editable
} catch (e: Exception) {
Log.d("AppLog", "$e")
}
Log.d("AppLog", "bg done this.isActive?${this.isActive}")
return@runInterruptible 123
}
Log.d("AppLog", "got result:$result")
}
}
协同程序并不神奇。它们是使用状态机实现的,并且可能有许多挂起点。这个在原文里都有说明Coroutines KEEP.
取消发生在那些暂停点。协同程序不能在挂起点以外的任何其他点取消(至少在正常执行中)。如果您使用 Thread.sleep
,则没有暂停点。您应该使用 delay
而不是 sleep
,这会引入一个挂点。如果你在做一个长操作,你可以添加几个yield()
来添加挂起点并使你的协程可取消。
来自 the docs:
Coroutine cancellation is cooperative. A coroutine code has to cooperate to be cancellable. All the suspending functions in kotlinx.coroutines are cancellable. They check for cancellation of coroutine and throw CancellationException when cancelled. However, if a coroutine is working in a computation and does not check for cancellation, then it cannot be cancelled
调用 suspend
函数会自动引入挂起点。
正如@CommonsWare 所指出的,很少有理由创建协程范围。在活动和片段中,或任何与生命周期相关的组件中,您应该使用 lifecycleScope
。在 ViewModels 中,有 viewModelScope
.
编辑:
我已经尝试将 runInterruptible
的源调整为 在特定条件下不 中断:传递自定义异常的实例 class InterruptionException
作为取消原因将跳过线程中断。我已经用 AtomicInteger
替换了 atomicfu 结构,我假设你的目标只是 JVM。您需要通过添加 -Xopt-in=kotlinx.coroutines.InternalCoroutinesApi
编译器标志来选择加入内部协程 API。
suspend fun <T> runInterruptibleCancellable(
context: CoroutineContext = EmptyCoroutineContext,
block: () -> T
): T = withContext(context) {
try {
val threadState = ThreadState(coroutineContext.job)
threadState.setup()
try {
block()
} finally {
threadState.clearInterrupt()
}
} catch (e: InterruptedException) {
throw CancellationException("Blocking call was interrupted due to parent cancellation").initCause(e)
}
}
private const val WORKING = 0
private const val FINISHED = 1
private const val INTERRUPTING = 2
private const val INTERRUPTED = 3
private class ThreadState(private val job: Job) : CompletionHandler {
/*
=== States ===
WORKING: running normally
FINISH: complete normally
INTERRUPTING: canceled, going to interrupt this thread
INTERRUPTED: this thread is interrupted
=== Possible Transitions ===
+----------------+ register job +-------------------------+
| WORKING | cancellation listener | WORKING |
| (thread, null) | -------------------------> | (thread, cancel handle) |
+----------------+ +-------------------------+
| | |
| cancel cancel | | complete
| | |
V | |
+---------------+ | |
| INTERRUPTING | <--------------------------------------+ |
+---------------+ |
| |
| interrupt |
| |
V V
+---------------+ +-------------------------+
| INTERRUPTED | | FINISHED |
+---------------+ +-------------------------+
*/
private val _state = AtomicInteger(WORKING)
private val targetThread = Thread.currentThread()
// Registered cancellation handler
private var cancelHandle: DisposableHandle? = null
fun setup() {
cancelHandle = job.invokeOnCompletion(onCancelling = true, invokeImmediately = true, handler = this)
// Either we successfully stored it or it was immediately cancelled
while (true) {
when (val state = _state.get()) {
// Happy-path, move forward
WORKING -> if (_state.compareAndSet(state, WORKING)) return
// Immediately cancelled, just continue
INTERRUPTING, INTERRUPTED -> return
else -> invalidState(state)
}
}
}
fun clearInterrupt() {
/*
* Do not allow to untriggered interrupt to leak
*/
while (true) {
when (val state = _state.get()) {
WORKING -> if (_state.compareAndSet(state, FINISHED)) {
cancelHandle?.dispose()
return
}
INTERRUPTING -> {
/*
* Spin, cancellation mechanism is interrupting our thread right now
* and we have to wait it and then clear interrupt status
*/
}
INTERRUPTED -> {
// Clear it and bail out
Thread.interrupted()
return
}
else -> invalidState(state)
}
}
}
// Cancellation handler
override fun invoke(cause: Throwable?) {
if (cause is InterruptionException) {
while (true) {
when (val state = _state.get()) {
// Working -> try to transite state and interrupt the thread
WORKING -> {
if (_state.compareAndSet(state, INTERRUPTING)) {
targetThread.interrupt()
_state.set(INTERRUPTED)
return
}
}
// Finished -- runInterruptible is already complete, INTERRUPTING - ignore
FINISHED, INTERRUPTING, INTERRUPTED -> return
else -> invalidState(state)
}
}
}
}
private fun invalidState(state: Int): Nothing = error("Illegal state $state")
}
class InterruptionException(cause: Throwable?) : CancellationException() {
init {
initCause(cause)
}
}
fun Job.interrupt(cause: Throwable? = null) {
this.cancel(InterruptionException(cause))
}
suspend fun Job.interruptAndJoin() {
interrupt()
return join()
}
可以使用interrupt
和interruptAndJoin
扩展函数来触发线程中断,否则使用cancel
进行非中断取消。一个例子:
val scope = CoroutineScope(Dispatchers.IO)
val job = scope.launch {
runInterruptibleCancellable {
// some blocking code
Thread.sleep(1000)
if (!isActive) {
println("cancelled")
} else {
println("completed")
}
}
}
job.invokeOnCompletion {
if (it is InterruptionException) {
print("interrupted")
}
}
runBlocking {
// job.interruptAndJoin() // prints "interrupted"
// job.cancelAndJoin() // prints "cancelled"
job.join() // prints "completed"
}
这个例子是我所做的唯一测试。 似乎 有效。我不知道它是否泄漏,我不知道它是否是线程安全的。老实说,我真的远远超出了我的专业知识。请不要在没有进一步确认它有效的情况下在生产中使用它。