延迟 Kotlin 的 buildSequence 的推荐方法是什么?
What's the recommended way to delay Kotlin's buildSequence?
我正在尝试轮询分页 API 并在新项目出现时向用户提供新项目。
fun connect(): Sequence<T> = buildSequence {
while (true) {
// result is a List<T>
val result = dataSource.getFirstPage()
yieldAll(/* the new data in `result` */)
// Block the thread for a little bit
}
}
这是示例用法:
for (item in connect()) {
// do something as each item is made available
}
我的第一个想法是使用 delay
函数,但我收到以下消息:
Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope
这是buildSequence
的签名:
public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>
我认为此消息意味着我只能在 SequenceBuilder 中使用 suspend
函数:yield
和 yieldAll
并且不能使用任意 suspend
函数调用允许。
现在,我正在使用它在每次 API 被轮询后将序列构建延迟一秒:
val resumeTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1)
while (resumeTime > System.nanoTime()) {
// do nothing
}
这可行,但它看起来确实不是一个好的解决方案。以前有人遇到过这个问题吗?
为什么不起作用?一些研究
当我们查看 buildSequence
时,我们可以看到它以 builderAction: suspend SequenceBuilder<T>.() -> Unit
作为参数。作为该方法的客户端,您将能够提交一个 suspend
lambda,它以 SequenceBuilder
作为其接收者(阅读关于接收者 的 lambda)。
SequenceBuilder
本身用 RestrictSuspension
:
注释
@RestrictsSuspension
@SinceKotlin("1.1")
public abstract class SequenceBuilder<in T> ...
注解是这样定义和注释的:
/**
* Classes and interfaces marked with this annotation are restricted
* when used as receivers for extension `suspend` functions.
* These `suspend` extensions can only invoke other member or extension
* `suspend` functions on this particular receiver only
* and are restricted from calling arbitrary suspension functions.
*/
@SinceKotlin("1.1") @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension
正如 RestrictSuspension
文档所述,在 buildSequence
的情况下,您可以传递一个以 SequenceBuilder
作为接收者但 受限 [=55] 的 lambda =] 的可能性,因为您只能调用 "other member or extension suspend
functions on this particular receiver"。这意味着,传递给 buildSequence
的块可以调用在 SequenceBuilder
上定义的任何方法(如 yield
、yieldAll
)。另一方面,因为块是 "restricted from calling arbitrary suspension functions",所以使用 delay
不起作用。生成的编译器错误验证了它:
Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope.
最后,您需要注意 buildSequence
创建的协程是 同步 协程的一个示例。在您的示例中,序列代码将在通过调用 connect()
.
使用序列的同一线程中执行
如何延迟序列?
据我们了解,buildSequence
创建了一个 同步 序列。在这里使用常规线程阻塞就可以了:
fun connect(): Sequence<T> = buildSequence {
while (true) {
val result = dataSource.getFirstPage()
yieldAll(result)
Thread.sleep(1000)
}
}
但是,您真的要阻塞整个线程吗?或者,您可以按照 here 所述实现异步序列。因此,使用 delay
和其他挂起函数将有效。
我正在尝试轮询分页 API 并在新项目出现时向用户提供新项目。
fun connect(): Sequence<T> = buildSequence {
while (true) {
// result is a List<T>
val result = dataSource.getFirstPage()
yieldAll(/* the new data in `result` */)
// Block the thread for a little bit
}
}
这是示例用法:
for (item in connect()) {
// do something as each item is made available
}
我的第一个想法是使用 delay
函数,但我收到以下消息:
Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope
这是buildSequence
的签名:
public fun <T> buildSequence(builderAction: suspend SequenceBuilder<T>.() -> Unit): Sequence<T>
我认为此消息意味着我只能在 SequenceBuilder 中使用 suspend
函数:yield
和 yieldAll
并且不能使用任意 suspend
函数调用允许。
现在,我正在使用它在每次 API 被轮询后将序列构建延迟一秒:
val resumeTime = System.nanoTime() + TimeUnit.SECONDS.toNanos(1)
while (resumeTime > System.nanoTime()) {
// do nothing
}
这可行,但它看起来确实不是一个好的解决方案。以前有人遇到过这个问题吗?
为什么不起作用?一些研究
当我们查看 buildSequence
时,我们可以看到它以 builderAction: suspend SequenceBuilder<T>.() -> Unit
作为参数。作为该方法的客户端,您将能够提交一个 suspend
lambda,它以 SequenceBuilder
作为其接收者(阅读关于接收者
SequenceBuilder
本身用 RestrictSuspension
:
@RestrictsSuspension
@SinceKotlin("1.1")
public abstract class SequenceBuilder<in T> ...
注解是这样定义和注释的:
/**
* Classes and interfaces marked with this annotation are restricted
* when used as receivers for extension `suspend` functions.
* These `suspend` extensions can only invoke other member or extension
* `suspend` functions on this particular receiver only
* and are restricted from calling arbitrary suspension functions.
*/
@SinceKotlin("1.1") @Target(AnnotationTarget.CLASS) @Retention(AnnotationRetention.BINARY)
public annotation class RestrictsSuspension
正如 RestrictSuspension
文档所述,在 buildSequence
的情况下,您可以传递一个以 SequenceBuilder
作为接收者但 受限 [=55] 的 lambda =] 的可能性,因为您只能调用 "other member or extension suspend
functions on this particular receiver"。这意味着,传递给 buildSequence
的块可以调用在 SequenceBuilder
上定义的任何方法(如 yield
、yieldAll
)。另一方面,因为块是 "restricted from calling arbitrary suspension functions",所以使用 delay
不起作用。生成的编译器错误验证了它:
Restricted suspended functions can only invoke member or extension suspending functions on their restricted coroutine scope.
最后,您需要注意 buildSequence
创建的协程是 同步 协程的一个示例。在您的示例中,序列代码将在通过调用 connect()
.
如何延迟序列?
据我们了解,buildSequence
创建了一个 同步 序列。在这里使用常规线程阻塞就可以了:
fun connect(): Sequence<T> = buildSequence {
while (true) {
val result = dataSource.getFirstPage()
yieldAll(result)
Thread.sleep(1000)
}
}
但是,您真的要阻塞整个线程吗?或者,您可以按照 here 所述实现异步序列。因此,使用 delay
和其他挂起函数将有效。