使用Flux.cache重用webflux中的Redis频道订阅
Using Flux.cache to reuse Redis channel subscription in webflux
我正在使用 Spring webflux 构建一个端点,该端点将流式传输从 Redis 频道订阅接收的事件。
是这样的:
class MyService(redisTemplate: ReactiveRedisOperations<String, String>) {
private val redisChannelFlux = redisTemplate
.listenToChannel("myChannel")
.map { it.message }
.cache(0) // transforms this FLux into a reusable Hot publisher
fun watch() : Flux<String> {
return redisChannelFlux
}
}
class MyController(val svc: MyService) {
@GetMapping("/api/watch", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun watch() : Flux<String> {
return svc.watch()
}
}
有效。当客户端订阅 /api/watch
端点时,它开始从 Redis 通道接收新事件,我可以在 Redis 监视器中确认 "SUBSCRIBE" "myChannel"
只发生一次,无论有多少客户端连接到我的反应端点。太棒了!
我只是不确定在这种情况下使用 Flux.cache()
有多安全。我是在和灾难调情吗?是否有推荐的方法将现有发布者与新订阅者一起使用?
Flux.cache()
通常用于向新订阅者重播 Flux
的最后 N
个元素。由于您在此处将该数字设置为 0
,看来您只是对共享资源感兴趣,而不是向新订阅者重播最新事件。
考虑到这一点,您可以改用 Flux.share()
。一旦第一个订阅者进来,这个操作员就会订阅你的 redis 实例,并与所有其他人共享资源。一旦所有订阅者都消失,与您的 redis 实例的连接将关闭,直到另一个订阅者到来,等等。
我正在使用 Spring webflux 构建一个端点,该端点将流式传输从 Redis 频道订阅接收的事件。
是这样的:
class MyService(redisTemplate: ReactiveRedisOperations<String, String>) {
private val redisChannelFlux = redisTemplate
.listenToChannel("myChannel")
.map { it.message }
.cache(0) // transforms this FLux into a reusable Hot publisher
fun watch() : Flux<String> {
return redisChannelFlux
}
}
class MyController(val svc: MyService) {
@GetMapping("/api/watch", produces = [MediaType.TEXT_EVENT_STREAM_VALUE])
fun watch() : Flux<String> {
return svc.watch()
}
}
有效。当客户端订阅 /api/watch
端点时,它开始从 Redis 通道接收新事件,我可以在 Redis 监视器中确认 "SUBSCRIBE" "myChannel"
只发生一次,无论有多少客户端连接到我的反应端点。太棒了!
我只是不确定在这种情况下使用 Flux.cache()
有多安全。我是在和灾难调情吗?是否有推荐的方法将现有发布者与新订阅者一起使用?
Flux.cache()
通常用于向新订阅者重播 Flux
的最后 N
个元素。由于您在此处将该数字设置为 0
,看来您只是对共享资源感兴趣,而不是向新订阅者重播最新事件。
考虑到这一点,您可以改用 Flux.share()
。一旦第一个订阅者进来,这个操作员就会订阅你的 redis 实例,并与所有其他人共享资源。一旦所有订阅者都消失,与您的 redis 实例的连接将关闭,直到另一个订阅者到来,等等。