redis spring 数据中是否支持 XClaim / claim - ReactiveRedisOperations.opsForStream()
Is XClaim / claim supported in redis spring data - ReactiveRedisOperations.opsForStream()
为了使用 redis 流构建可靠的消息队列,我正在使用 spring-boot-starter-data-redis-reactive 和 lettuce 依赖项来处理来自 redis 流的消息。虽然我可以通过 ReactiveRedisOperations.opsForStream()
中以消费者组形式提供的 api 添加、读取、确认和删除消息,但我找不到 api 来声明未决消息尽管在 this.reactiveRedisConnectionFactory
.getReactiveConnection()
.streamCommands()
.xClaim()
下可用,但 5 分钟内未确认。但我不想使用样板代码来管理异常、序列化等。有没有办法使用 ReactiveRedisOperations.opsForStream()
来声明消息
没有 spring 数据 redis,直接使用 lettuce 客户端库我能够获取待处理的消息以及如下声明消息
public Flux<PendingMessage> getPendingMessages(PollMessage pollMessage, String queueName) {
Predicate<PendingMessage> poisonMessage = pendingMessage -> (pendingMessage.getTotalDeliveryCount()<=maxRetries);
Predicate<PendingMessage> nackMessage = pendingMessage -> (pendingMessage.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofMillis(ackTimeout)) > 0 );
return statefulRedisClusterConnection.reactive()
.xpending(queueName, pollMessage.getConsumerGroupName(), Range.unbounded(), Limit.from(1000))
.collectList()
.map((it) -> ((PendingMessages)PENDING_MESSAGES_CONVERTER
.apply(it, pollMessage.getConsumerGroupName()))
.withinRange(org.springframework.data.domain.Range.unbounded()))
.flatMapMany(Flux::fromIterable)
.filter(nackMessage)
.filter(poisonMessage)
.limitRequest(pollMessage.getBatchSize());
}
为了声明消息,我再次使用了 lettuce 库中可用的 api
public Flux<StreamMessage<String, String>> claimMessage(PendingMessage pendingMessage, String queueName, String groupName, String serviceName) {
return statefulRedisClusterConnection.reactive()
.xclaim(queueName, Consumer.from(groupName, serviceName), 0, pendingMessage.getIdAsString());
}
目前,通过 spring-data 从 redis 获取待处理消息有问题,因此我直接使用 lettuce 库来获取待处理消息并声明它。
为了使用 redis 流构建可靠的消息队列,我正在使用 spring-boot-starter-data-redis-reactive 和 lettuce 依赖项来处理来自 redis 流的消息。虽然我可以通过 ReactiveRedisOperations.opsForStream()
中以消费者组形式提供的 api 添加、读取、确认和删除消息,但我找不到 api 来声明未决消息尽管在 this.reactiveRedisConnectionFactory
.getReactiveConnection()
.streamCommands()
.xClaim()
下可用,但 5 分钟内未确认。但我不想使用样板代码来管理异常、序列化等。有没有办法使用 ReactiveRedisOperations.opsForStream()
没有 spring 数据 redis,直接使用 lettuce 客户端库我能够获取待处理的消息以及如下声明消息
public Flux<PendingMessage> getPendingMessages(PollMessage pollMessage, String queueName) {
Predicate<PendingMessage> poisonMessage = pendingMessage -> (pendingMessage.getTotalDeliveryCount()<=maxRetries);
Predicate<PendingMessage> nackMessage = pendingMessage -> (pendingMessage.getElapsedTimeSinceLastDelivery().compareTo(Duration.ofMillis(ackTimeout)) > 0 );
return statefulRedisClusterConnection.reactive()
.xpending(queueName, pollMessage.getConsumerGroupName(), Range.unbounded(), Limit.from(1000))
.collectList()
.map((it) -> ((PendingMessages)PENDING_MESSAGES_CONVERTER
.apply(it, pollMessage.getConsumerGroupName()))
.withinRange(org.springframework.data.domain.Range.unbounded()))
.flatMapMany(Flux::fromIterable)
.filter(nackMessage)
.filter(poisonMessage)
.limitRequest(pollMessage.getBatchSize());
}
为了声明消息,我再次使用了 lettuce 库中可用的 api
public Flux<StreamMessage<String, String>> claimMessage(PendingMessage pendingMessage, String queueName, String groupName, String serviceName) {
return statefulRedisClusterConnection.reactive()
.xclaim(queueName, Consumer.from(groupName, serviceName), 0, pendingMessage.getIdAsString());
}
目前,通过 spring-data 从 redis 获取待处理消息有问题,因此我直接使用 lettuce 库来获取待处理消息并声明它。