使用Flux.cache重用webflux中的Redis频道订阅

Using Flux.cache to reuse Redis channel subscription in webflux

我正在使用 Spring webflux 构建一个端点,该端点将流式传输从 Redis 频道订阅接收的事件。

是这样的:

class MyService(redisTemplate: ReactiveRedisOperations<String, String>) {

    private val redisChannelFlux = redisTemplate
            .listenToChannel("myChannel")
            .map { it.message }
            .cache(0) // transforms this FLux into a reusable Hot publisher

    fun watch() : Flux<String> {
        return redisChannelFlux
    }

}

class MyController(val svc: MyService) {

    @GetMapping("/api/watch", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
    fun watch() : Flux<String> {
        return svc.watch()
    }

}

有效。当客户端订阅 /api/watch 端点时,它开始从 Redis 通道接收新事件,我可以在 Redis 监视器中确认 "SUBSCRIBE" "myChannel" 只发生一次,无论有多少客户端连接到我的反应端点。太棒了!

我只是不确定在这种情况下使用 Flux.cache() 有多安全。我是在和灾难调情吗?是否有推荐的方法将现有发布者与新订阅者一起使用?

Flux.cache() 通常用于向新订阅者重播 Flux 的最后 N 个元素。由于您在此处将该数字设置为 0,看来您只是对共享资源感兴趣,而不是向新订阅者重播最新事件。

考虑到这一点,您可以改用 Flux.share()。一旦第一个订阅者进来,这个操作员就会订阅你的 redis 实例,并与所有其他人共享资源。一旦所有订阅者都消失,与您的 redis 实例的连接将关闭,直到另一个订阅者到来,等等。