使用 kotlin 协程进行长轮询
Long polling with kotlin coroutines
我在 GitHub Long Polling Redis
找到了这个存储库
所以在spring启动时,我们可以使用延迟请求将客户端请求保持几秒钟(AppMsgController.java#L72)
并且它会发送回客户端,直到 延迟请求 填满结果 (AppMsgHandler.java#L74) 或直到超时。
我还注意到这种机制也可以通过 CompetableFuture 在 java 中使用 completeOnTimeout 来实现。
但我想知道我们可以在 Kotlin Coroutines 中使用类似的东西吗?
在 Kotlin 协程中有 Deferred
type, which is similar to CompletableFuture
in the sense that it represents a value that is not yet available but probably will be in the future (if no error occurs/exception is thrown). @Joffrey pointed out that there is also a CompletableDeferred
,它更接近 ComplatableFuture
允许用户手动调用 complete
或 exceptionallyComplete
。
可以使用 async
extension function on CoroutineScope
. If you want to set a timeout, Kotlin has you covered with the withTimeout
函数在给定时间后取消代码块来轻松创建延迟。
注意 withTimeout
应该在 async
里面,而不是反过来。
看看这个例子:https://pl.kotl.in/uYe12ds7g
正如@Spitzbueb 所说,您可以使用 CompletableDeferred
做类似的事情。
但是,如果您不需要支持 clear()
和 count()
方法,您也可以通过将 ConcurrentHashMap
替换为简单的 MutableSharedFlow<Unit>
来简化从 redis 广播“ping”。
在 onMessage
中,您可以将 Unit
发送到可变共享流中以通知订阅者,然后您可以通过等待共享流上的第一个元素并使readSubset
请求:
class AppMsgHandler(@Autowired private val appMsgRepo: AppMsgRepo) : MessageListener {
private val events = MutableSharedFlow<Unit>()
suspend fun requestMessages(start: Int, timeoutMillis: Long): List<AppMsg> {
val currentMsgs = appMsgRepo.readSubset(start)
if (currentMsgs.isNotEmpty()) {
return currentMsgs
}
val newMessages = withTimeoutOrNull(timeoutMillis) {
events.first()
appMsgRepo.readSubset(start)
}
return newMessages ?: emptyList()
}
override fun onMessage(message: Message, pattern: ByteArray?) {
LOG.info("RedisPub: {} on Channel: {}", String(message.body, UTF8), String(message.channel, UTF8))
events.tryEmit(Unit)
}
companion object {
private val LOG: Logger = LoggerFactory.getLogger(AppMsgHandler::class.java)
private val UTF8: Charset = StandardCharsets.UTF_8
}
}
控制器然后可以简单地调用 requestMessages
(前提是您让控制器使用 suspend
函数和 Spring WebFlux)。
我在 GitHub Long Polling Redis
找到了这个存储库所以在spring启动时,我们可以使用延迟请求将客户端请求保持几秒钟(AppMsgController.java#L72)
并且它会发送回客户端,直到 延迟请求 填满结果 (AppMsgHandler.java#L74) 或直到超时。
我还注意到这种机制也可以通过 CompetableFuture 在 java 中使用 completeOnTimeout 来实现。
但我想知道我们可以在 Kotlin Coroutines 中使用类似的东西吗?
在 Kotlin 协程中有 Deferred
type, which is similar to CompletableFuture
in the sense that it represents a value that is not yet available but probably will be in the future (if no error occurs/exception is thrown). @Joffrey pointed out that there is also a CompletableDeferred
,它更接近 ComplatableFuture
允许用户手动调用 complete
或 exceptionallyComplete
。
可以使用 async
extension function on CoroutineScope
. If you want to set a timeout, Kotlin has you covered with the withTimeout
函数在给定时间后取消代码块来轻松创建延迟。
注意 withTimeout
应该在 async
里面,而不是反过来。
看看这个例子:https://pl.kotl.in/uYe12ds7g
正如@Spitzbueb 所说,您可以使用 CompletableDeferred
做类似的事情。
但是,如果您不需要支持 clear()
和 count()
方法,您也可以通过将 ConcurrentHashMap
替换为简单的 MutableSharedFlow<Unit>
来简化从 redis 广播“ping”。
在 onMessage
中,您可以将 Unit
发送到可变共享流中以通知订阅者,然后您可以通过等待共享流上的第一个元素并使readSubset
请求:
class AppMsgHandler(@Autowired private val appMsgRepo: AppMsgRepo) : MessageListener {
private val events = MutableSharedFlow<Unit>()
suspend fun requestMessages(start: Int, timeoutMillis: Long): List<AppMsg> {
val currentMsgs = appMsgRepo.readSubset(start)
if (currentMsgs.isNotEmpty()) {
return currentMsgs
}
val newMessages = withTimeoutOrNull(timeoutMillis) {
events.first()
appMsgRepo.readSubset(start)
}
return newMessages ?: emptyList()
}
override fun onMessage(message: Message, pattern: ByteArray?) {
LOG.info("RedisPub: {} on Channel: {}", String(message.body, UTF8), String(message.channel, UTF8))
events.tryEmit(Unit)
}
companion object {
private val LOG: Logger = LoggerFactory.getLogger(AppMsgHandler::class.java)
private val UTF8: Charset = StandardCharsets.UTF_8
}
}
控制器然后可以简单地调用 requestMessages
(前提是您让控制器使用 suspend
函数和 Spring WebFlux)。