使用 buffer() 的 Kotlin 流程异常处理

Kotlin flow exception handling with buffer()

我有一个在第二个元素上抛出异常的简单流程:

private val log = LoggerFactory.getLogger("test")

fun main() = runBlocking {
    flowOf(1, 2, 3)
        .onCompletion { log.info("Flow completed${if (it == null) "" else " exceptionally: ${it.message}"}") }
        .buffer()
        .map { throwingMapper(it) }
        .catch { log.error("Exception thrown") }
        .collect { log.info(it) }
    }
}

fun throwingMapper(it: Int): String {
    if (it == 2) {
        throw Exception("Test exception")
    }
    return "$it-mapped"
}

当我执行此代码时,我得到以下输出 - 流程已完成 无一例外

2021-04-09 12:35:00.875 [main] INFO  - test:31 - Flow completed
2021-04-09 12:35:00.904 [main] INFO  - test:133 - 1-mapped
2021-04-09 12:35:00.915 [main] ERROR - test:34 - Exception thrown

但是,当我将 map 运算符移动到 buffer 之前时:

flowOf(1, 2, 3)
        .onCompletion { log.info("Flow completed${if (it == null) "" else " exceptionally: ${it.message}"}") }
        .map { throwingMapper(it) }
        .buffer()
        .catch { log.error("Exception thrown") }
        .collect { log.info(it) }

生成以下输出并且流程完成但例外情况

2021-04-09 12:38:35.982 [main] INFO  - test:31 - Flow completed exceptionally: Test exception
2021-04-09 12:38:36.024 [main] ERROR - test:34 - Exception thrown

为什么第一种情况下流程无异常完成? buffer 是否默默吞下下游异常?或者它是否在内部创建了一个新的流程?如果有,是不是有一些是保留原来的例外?

作为此方法的 documentation

The buffer operator creates a separate coroutine during execution for the flow it applies to. [...] Code before buffer will be executed in a separate new coroutine [...] concurrently with [coroutine that calls this code].

第一种情况下流程正常完成,因为抛出异常的方法和捕获它的方法都放在buffer的一侧,所以它们在同一个协程中执行。

A channel is used between the coroutines to send elements emitted by the coroutine P to the coroutine Q.

我认为不可能通过 Channel 发送异常,因此您可能需要定义两个 catch 处理程序 - 在 buffer 之前和之后(如果您期望异常在两个部分都被抛出)。