在 Kotlin 序列中的嵌套对象中使用 yield

Using yield in nested object in Kotlin sequence

我想通过 Kotlin Sequence.

流式传输 Spring JDBC RowCallbackHandler 捕获的结果对象

代码基本上是这样的:

fun findManyObjects(): Sequence<Thing> = sequence {
    val rowHandler = object : RowCallbackHandler {
        override fun processRow(resultSet: ResultSet) {
            val thing = // create from resultSet
            yield(thing) // ERROR! No coroutine scope
        }
    }
    jdbcTemplate.query("select * from ...", rowHandler)
}
    

但是我得到了编译错误:

Suspension functions can be called only within coroutine body.

然而,正是这个“协程体”应该存在,因为整个块都包裹在一个 sequence 构建器中。但它似乎不适用于嵌套对象。

显示它不使用嵌套对象编译的最小示例:

// compiles
sequence {
    yield(1)
}

// doesn't compile
sequence {
   object {
        fun doit() {
            yield(1) // Suspension functions can be called only within coroutine body.
        }
    }
}

如何将对象从 ResultSet 传递到 Sequence

对异步数据流使用Flow

无法在 RowCallbackHandler 对象中调用 yield 的原因有两个。

  1. processRow 函数不是挂起函数(也不可能,因为它是在 Java 中声明并由 Java 调用的)。像yield这样的挂起函数只能被另一个挂起函数调用。
  2. 序列总是在 sequence { ... } 生成器 return 时结束。即使你我都知道 query 方法会在 returning 之前从序列中调用 RowCallbackHandler,Kotlin 编译器也无法知道这一点。从序列本身以外的函数和对象产生序列值是永远不允许的,因为无法知道它们将在何时何地 运行.

为了解决这个问题,我们需要引入一种不同类型的协程:一种可以在等待 RowCallbackHandler 被调用时自行挂起的协程。

不幸的是,因为我们在这里谈论 JDBC,所以引入 full-blown 协程可能不会有太多好处。在幕后,对数据库的调用将始终以阻塞方式进行,从而消除了很多好处。不尝试 'stream' 结果,而是以无聊的 old-fashioned 方式迭代它们可能会更简单。但是,让我们一起探索各种可能性。

序列问题

序列是为 on-demand 计算而设计的,不是异步的。他们不能等待其他异步操作,例如回调。序列生成器的 yield 函数只是在等待调用者检索下一个项目时挂起,它是序列中唯一允许调用的挂起函数。如果您尝试在序列中使用像 delay 这样的简单挂起调用,则可以证明这一点。你会收到一个编译错误,让你知道你在一个受限的协程范围内操作。

sequence<String> { delay(1000) } // doesn't compile

没有调用挂起函数的能力,就没有办法等待调用回调。认识到这一局限性,Kotlin 为 on-demand 值流提供了一种替代机制,该机制确实以异步方式提供数据。它被称为 Flow.

回调流程

Roman Elizarov 在他的 Medium 文章 Callbacks and Kotlin Flows.

中很好地描述了使用流从回调接口提供值的机制

如果您确实想使用回调流程,只需将 sequence 替换为 callbackFlow,并将 yield 替换为 sendBlocking

您的代码可能如下所示:

fun findManyObjects(): Flow<Thing> = callbackFlow {
    val rowHandler = object : RowCallbackHandler {
        override fun processRow(resultSet: ResultSet) {
            val thing = // create from resultSet
            sendBlocking(thing)
        }
    }
    jdbcTemplate.query("select * from ...", rowHandler)
    close() // the query is finished, so there are no more rows
}

更简单的流程

虽然这是流式传输回调提供的值的惯用方法,但它可能不是解决此问题的最简单方法。通过完全避免回调,您可以使用更常见的 flow 构建器,将每个值传递给它的 emit 函数。但是现在你有了协程形式的异步,你不能只 return 一个流然后允许 Spring 立即关闭结果集。您需要能够延迟结果集的关闭,直到流被实际消耗掉。这意味着剥离 RowCallbackHandlerResultSetExtractor 提供的抽象,它们期望以阻塞方式处理所有结果,而不是提供您自己的实现。

fun Connection.findManyObjects(): Flow<Thing> = flow {
    prepareStatement("select * from ...").use { statement ->
        statement.executeQuery().use { resultSet ->        
            while (resultSet.next()) {
                val thing = // create from resultSet
                emit(thing)
            }
        }
    }
}

注意 use 块,它将处理关闭语句和结果集。因为在 while 循环完成并且所有值都已发出之前我们不会到达 use 块的末尾,所以在结果集保持打开状态时流可以自由挂起。

那么为什么要使用流呢?

您可能会注意到,如果这样做,您 可以 实际上将 flowemit 替换为 sequenceyield。那么我们绕了一圈了吗?好吧,有点。不同之处在于,flow 只能从协程中使用,而使用 sequence,您可以在根本不挂起的情况下迭代结果值。在这种特殊情况下,很难做出调用,因为 JDBC 操作总是阻塞 .

  • 如果您使用序列,调用线程将在等待接收数据时阻塞。序列中的值总是由使用序列的事物计算,因此如果序列调用阻塞函数,消费者的线程将阻塞等待值。在 non-coroutine 应用程序中,这可能没问题,但如果您使用协程,您 确实希望避免在 innocuous-looking 序列中隐藏阻塞调用
  • 如果您使用流程,您至少可以通过在特定调度程序上设置流程 运行 来隔离阻塞调用。例如,您可以使用 built-in IO 调度程序执行 JDBC 调用,然后切换回默认调度程序以进行任何进一步处理。如果您确实想要流值,我认为这是比使用序列更好的方法。

考虑到所有这些,您需要小心如果您确实选择其中一种解决方案,我们将使用协程和调度程序。如果您不想担心这一点,使用常规 ResultSetExtractor 并暂时忘记序列和流程也没有错。