反应式 Redis 主题中的已发布消息不会发送到客户端

Published messages in a reactive Redis topic aren't sent to the client

在实现 GraphQL 订阅的过程中,我想出了一个穷人的解决方案 pub/sub,自定义主题如下所示:

// The map is a custom class, just to handle multiple clients subscribed to the same data
// and we just create one FluxSink per client which is internally stored in a Set
private static final ConcurrentMultiMap<Long, FluxSink<String>> subscribers =
      new ConcurrentMultiMap<>();

public Publisher<String> subscribeToMessages(final Long id) {
  return Flux.create(
      newSubscriber ->
          subscribers.add(
              id,
              newSubscriber.onDispose(() -> subscribers.remove(id, newSubscriber))),
      OverflowStrategy.LATEST);
}

public void publish(final Long id, final String message) {
  Optional.of(id)
      .map(subscribers::get)
      .ifPresent(
          subscribers ->
              subscribers.forEach(
                  subscriber -> subscriber.next(message)));
}

继续前进,我想用 RedissonReactiveClient 的 Redis 支持的解决方案替换此实现,因为我偶然发现了 returns 和 FluxRTopicReactive::getMessages 方法。基本上以前的代码现在看起来如下:

public Publisher<String> subscribeToMessages(final Long id) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  return topic.getMessages(String.class);
}

public void publish(final Long id, final String message) {
  val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
  topic.publish(message);
}

不幸的是,这没有按预期工作。据我所知,更新已正确发布(因为我连接到 redis 实例并 SUBSCRIBEd 到它一次以检查数据)甚至在让我的 GraphQL 客户端订阅服务器后 Mono publish 返回表示有一个订阅者,我的 GraphQL 客户端。但是,它没有更新其数据,并且浏览器中的网络选项卡没有显示来自服务器的已发布数据的 websocket 消息。因此,我假设通过 subscribeToMessages 方法中的 Flux 公开的数据格式不正确。

尝试使用主题的 getMessages 方法时,有什么需要注意的吗?

您忘记对两个发布者调用订阅。应该是这样的:

public void publish(final Long id, final String message) {  
   val topic = redissonReactiveClient.getTopic("messages/" + id.toString());  
   topic.publish(message).doOnSuccess(res -> {  

       // ...   

   }).subscribe();
}

public void subscribeToMessages(final Long id) {
   val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
   return topic.getMessages(String.class).doOnNext(res -> {  

       // ...   

   }).subscribe();
}