使用 kotlin 协程进行长轮询

Long polling with kotlin coroutines

我在 GitHub Long Polling Redis

找到了这个存储库

所以在spring启动时,我们可以使用延迟请求将客户端请求保持几秒钟(AppMsgController.java#L72)

并且它会发送回客户端,直到 延迟请求 填满结果 (AppMsgHandler.java#L74) 或直到超时。

我还注意到这种机制也可以通过 CompetableFuture 在 java 中使用 completeOnTimeout 来实现。

但我想知道我们可以在 Kotlin Coroutines 中使用类似的东西吗?

在 Kotlin 协程中有 Deferred type, which is similar to CompletableFuture in the sense that it represents a value that is not yet available but probably will be in the future (if no error occurs/exception is thrown). @Joffrey pointed out that there is also a CompletableDeferred,它更接近 ComplatableFuture 允许用户手动调用 completeexceptionallyComplete

可以使用 async extension function on CoroutineScope. If you want to set a timeout, Kotlin has you covered with the withTimeout 函数在给定时间后取消代码块来轻松创建延迟。

注意 withTimeout 应该在 async 里面,而不是反过来。

看看这个例子:https://pl.kotl.in/uYe12ds7g

正如@Spitzbueb 所说,您可以使用 CompletableDeferred 做类似的事情。

但是,如果您不需要支持 clear()count() 方法,您也可以通过将 ConcurrentHashMap 替换为简单的 MutableSharedFlow<Unit> 来简化从 redis 广播“ping”。

onMessage 中,您可以将 Unit 发送到可变共享流中以通知订阅者,然后您可以通过等待共享流上的第一个元素并使readSubset 请求:

class AppMsgHandler(@Autowired private val appMsgRepo: AppMsgRepo) : MessageListener {

    private val events = MutableSharedFlow<Unit>()

    suspend fun requestMessages(start: Int, timeoutMillis: Long): List<AppMsg> {
        val currentMsgs = appMsgRepo.readSubset(start)
        if (currentMsgs.isNotEmpty()) {
            return currentMsgs
        }
        val newMessages = withTimeoutOrNull(timeoutMillis) {
            events.first()
            appMsgRepo.readSubset(start)
        }
        return newMessages ?: emptyList()
    }

    override fun onMessage(message: Message, pattern: ByteArray?) {
        LOG.info("RedisPub: {} on Channel: {}", String(message.body, UTF8), String(message.channel, UTF8))
        events.tryEmit(Unit)
    }

    companion object {
        private val LOG: Logger = LoggerFactory.getLogger(AppMsgHandler::class.java)
        private val UTF8: Charset = StandardCharsets.UTF_8
    }
}

控制器然后可以简单地调用 requestMessages(前提是您让控制器使用 suspend 函数和 Spring WebFlux)。