从反应式读取时如何保持 Redis 连接打开 API
How to keep redis connection open when reading from reactive API
我一直在使用 spring 反应式 api(使用 lettuce 驱动程序)监听 redis 流。我正在使用独立连接。似乎反应堆的事件循环每次读取消息时都会打开一个新连接,而不是保持连接打开。当我 运行 我的程序时,我在我的机器上看到很多 TIME_WAIT 端口。这是正常的吗?有没有办法让生菜知道重新使用连接而不是每次都重新连接?
这是我的代码:
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory);
return receiver
.receive(Consumer.from(keyCacheStreamsConfig.getConsumerGroup(), keyCacheStreamsConfig.getConsumer()),
StreamOffset.create(keyCacheStreamsConfig.getStreamName(), ReadOffset.lastConsumed()))//
// flatMap reads 256 messages by default and processes them in the given scheduler
.flatMap(record -> Mono.fromCallable(() -> consumer.consume(record)).subscribeOn(Schedulers.boundedElastic()))//
.doOnError(t -> {
log.error("Error processing.", t);
streamConnections.get(nodeName).setDirty(true);
})//
.onErrorContinue((err, elem) -> log.error("Error processing message. Continue listening."))//
.subscribe();
看起来 spring-data-redis 库仅在流接收器选项中将轮询超时设置为“0”时才重新使用连接,并将其作为第二个参数传递给 StreamReceiver.create(factory, options)
.通过查看 spring-data-redis 的源代码得出结论。
我一直在使用 spring 反应式 api(使用 lettuce 驱动程序)监听 redis 流。我正在使用独立连接。似乎反应堆的事件循环每次读取消息时都会打开一个新连接,而不是保持连接打开。当我 运行 我的程序时,我在我的机器上看到很多 TIME_WAIT 端口。这是正常的吗?有没有办法让生菜知道重新使用连接而不是每次都重新连接?
这是我的代码:
StreamReceiver<String, MapRecord<String, String, String>> receiver = StreamReceiver.create(factory);
return receiver
.receive(Consumer.from(keyCacheStreamsConfig.getConsumerGroup(), keyCacheStreamsConfig.getConsumer()),
StreamOffset.create(keyCacheStreamsConfig.getStreamName(), ReadOffset.lastConsumed()))//
// flatMap reads 256 messages by default and processes them in the given scheduler
.flatMap(record -> Mono.fromCallable(() -> consumer.consume(record)).subscribeOn(Schedulers.boundedElastic()))//
.doOnError(t -> {
log.error("Error processing.", t);
streamConnections.get(nodeName).setDirty(true);
})//
.onErrorContinue((err, elem) -> log.error("Error processing message. Continue listening."))//
.subscribe();
看起来 spring-data-redis 库仅在流接收器选项中将轮询超时设置为“0”时才重新使用连接,并将其作为第二个参数传递给 StreamReceiver.create(factory, options)
.通过查看 spring-data-redis 的源代码得出结论。