使用 buffer() 的 Kotlin 流程异常处理
Kotlin flow exception handling with buffer()
我有一个在第二个元素上抛出异常的简单流程:
private val log = LoggerFactory.getLogger("test")
fun main() = runBlocking {
flowOf(1, 2, 3)
.onCompletion { log.info("Flow completed${if (it == null) "" else " exceptionally: ${it.message}"}") }
.buffer()
.map { throwingMapper(it) }
.catch { log.error("Exception thrown") }
.collect { log.info(it) }
}
}
fun throwingMapper(it: Int): String {
if (it == 2) {
throw Exception("Test exception")
}
return "$it-mapped"
}
当我执行此代码时,我得到以下输出 - 流程已完成 无一例外:
2021-04-09 12:35:00.875 [main] INFO - test:31 - Flow completed
2021-04-09 12:35:00.904 [main] INFO - test:133 - 1-mapped
2021-04-09 12:35:00.915 [main] ERROR - test:34 - Exception thrown
但是,当我将 map
运算符移动到 buffer
之前时:
flowOf(1, 2, 3)
.onCompletion { log.info("Flow completed${if (it == null) "" else " exceptionally: ${it.message}"}") }
.map { throwingMapper(it) }
.buffer()
.catch { log.error("Exception thrown") }
.collect { log.info(it) }
生成以下输出并且流程完成但例外情况:
2021-04-09 12:38:35.982 [main] INFO - test:31 - Flow completed exceptionally: Test exception
2021-04-09 12:38:36.024 [main] ERROR - test:34 - Exception thrown
为什么第一种情况下流程无异常完成? buffer
是否默默吞下下游异常?或者它是否在内部创建了一个新的流程?如果有,是不是有一些是保留原来的例外?
作为此方法的 documentation:
The buffer
operator creates a separate coroutine during execution for the flow it applies to.
[...]
Code before buffer
will be executed in a separate new coroutine [...] concurrently with [coroutine that calls this code].
第一种情况下流程正常完成,因为抛出异常的方法和捕获它的方法都放在buffer
的一侧,所以它们在同一个协程中执行。
A channel is used between the coroutines to send elements emitted by the coroutine P to the coroutine Q.
我认为不可能通过 Channel
发送异常,因此您可能需要定义两个 catch
处理程序 - 在 buffer
之前和之后(如果您期望异常在两个部分都被抛出)。
我有一个在第二个元素上抛出异常的简单流程:
private val log = LoggerFactory.getLogger("test")
fun main() = runBlocking {
flowOf(1, 2, 3)
.onCompletion { log.info("Flow completed${if (it == null) "" else " exceptionally: ${it.message}"}") }
.buffer()
.map { throwingMapper(it) }
.catch { log.error("Exception thrown") }
.collect { log.info(it) }
}
}
fun throwingMapper(it: Int): String {
if (it == 2) {
throw Exception("Test exception")
}
return "$it-mapped"
}
当我执行此代码时,我得到以下输出 - 流程已完成 无一例外:
2021-04-09 12:35:00.875 [main] INFO - test:31 - Flow completed
2021-04-09 12:35:00.904 [main] INFO - test:133 - 1-mapped
2021-04-09 12:35:00.915 [main] ERROR - test:34 - Exception thrown
但是,当我将 map
运算符移动到 buffer
之前时:
flowOf(1, 2, 3)
.onCompletion { log.info("Flow completed${if (it == null) "" else " exceptionally: ${it.message}"}") }
.map { throwingMapper(it) }
.buffer()
.catch { log.error("Exception thrown") }
.collect { log.info(it) }
生成以下输出并且流程完成但例外情况:
2021-04-09 12:38:35.982 [main] INFO - test:31 - Flow completed exceptionally: Test exception
2021-04-09 12:38:36.024 [main] ERROR - test:34 - Exception thrown
为什么第一种情况下流程无异常完成? buffer
是否默默吞下下游异常?或者它是否在内部创建了一个新的流程?如果有,是不是有一些是保留原来的例外?
作为此方法的 documentation:
The
buffer
operator creates a separate coroutine during execution for the flow it applies to. [...] Code beforebuffer
will be executed in a separate new coroutine [...] concurrently with [coroutine that calls this code].
第一种情况下流程正常完成,因为抛出异常的方法和捕获它的方法都放在buffer
的一侧,所以它们在同一个协程中执行。
A channel is used between the coroutines to send elements emitted by the coroutine P to the coroutine Q.
我认为不可能通过 Channel
发送异常,因此您可能需要定义两个 catch
处理程序 - 在 buffer
之前和之后(如果您期望异常在两个部分都被抛出)。