反应式 Redis 主题中的已发布消息不会发送到客户端
Published messages in a reactive Redis topic aren't sent to the client
在实现 GraphQL 订阅的过程中,我想出了一个穷人的解决方案 pub/sub,自定义主题如下所示:
// The map is a custom class, just to handle multiple clients subscribed to the same data
// and we just create one FluxSink per client which is internally stored in a Set
private static final ConcurrentMultiMap<Long, FluxSink<String>> subscribers =
new ConcurrentMultiMap<>();
public Publisher<String> subscribeToMessages(final Long id) {
return Flux.create(
newSubscriber ->
subscribers.add(
id,
newSubscriber.onDispose(() -> subscribers.remove(id, newSubscriber))),
OverflowStrategy.LATEST);
}
public void publish(final Long id, final String message) {
Optional.of(id)
.map(subscribers::get)
.ifPresent(
subscribers ->
subscribers.forEach(
subscriber -> subscriber.next(message)));
}
继续前进,我想用 RedissonReactiveClient
的 Redis 支持的解决方案替换此实现,因为我偶然发现了 returns 和 Flux
的 RTopicReactive::getMessages
方法。基本上以前的代码现在看起来如下:
public Publisher<String> subscribeToMessages(final Long id) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
return topic.getMessages(String.class);
}
public void publish(final Long id, final String message) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
topic.publish(message);
}
不幸的是,这没有按预期工作。据我所知,更新已正确发布(因为我连接到 redis 实例并 SUBSCRIBE
d 到它一次以检查数据)甚至在让我的 GraphQL 客户端订阅服务器后 Mono
publish
返回表示有一个订阅者,我的 GraphQL 客户端。但是,它没有更新其数据,并且浏览器中的网络选项卡没有显示来自服务器的已发布数据的 websocket 消息。因此,我假设通过 subscribeToMessages
方法中的 Flux
公开的数据格式不正确。
尝试使用主题的 getMessages
方法时,有什么需要注意的吗?
您忘记对两个发布者调用订阅。应该是这样的:
public void publish(final Long id, final String message) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
topic.publish(message).doOnSuccess(res -> {
// ...
}).subscribe();
}
public void subscribeToMessages(final Long id) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
return topic.getMessages(String.class).doOnNext(res -> {
// ...
}).subscribe();
}
在实现 GraphQL 订阅的过程中,我想出了一个穷人的解决方案 pub/sub,自定义主题如下所示:
// The map is a custom class, just to handle multiple clients subscribed to the same data
// and we just create one FluxSink per client which is internally stored in a Set
private static final ConcurrentMultiMap<Long, FluxSink<String>> subscribers =
new ConcurrentMultiMap<>();
public Publisher<String> subscribeToMessages(final Long id) {
return Flux.create(
newSubscriber ->
subscribers.add(
id,
newSubscriber.onDispose(() -> subscribers.remove(id, newSubscriber))),
OverflowStrategy.LATEST);
}
public void publish(final Long id, final String message) {
Optional.of(id)
.map(subscribers::get)
.ifPresent(
subscribers ->
subscribers.forEach(
subscriber -> subscriber.next(message)));
}
继续前进,我想用 RedissonReactiveClient
的 Redis 支持的解决方案替换此实现,因为我偶然发现了 returns 和 Flux
的 RTopicReactive::getMessages
方法。基本上以前的代码现在看起来如下:
public Publisher<String> subscribeToMessages(final Long id) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
return topic.getMessages(String.class);
}
public void publish(final Long id, final String message) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
topic.publish(message);
}
不幸的是,这没有按预期工作。据我所知,更新已正确发布(因为我连接到 redis 实例并 SUBSCRIBE
d 到它一次以检查数据)甚至在让我的 GraphQL 客户端订阅服务器后 Mono
publish
返回表示有一个订阅者,我的 GraphQL 客户端。但是,它没有更新其数据,并且浏览器中的网络选项卡没有显示来自服务器的已发布数据的 websocket 消息。因此,我假设通过 subscribeToMessages
方法中的 Flux
公开的数据格式不正确。
尝试使用主题的 getMessages
方法时,有什么需要注意的吗?
您忘记对两个发布者调用订阅。应该是这样的:
public void publish(final Long id, final String message) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
topic.publish(message).doOnSuccess(res -> {
// ...
}).subscribe();
}
public void subscribeToMessages(final Long id) {
val topic = redissonReactiveClient.getTopic("messages/" + id.toString());
return topic.getMessages(String.class).doOnNext(res -> {
// ...
}).subscribe();
}