使用 Redis 流和 Spring 数据获取待处理消息
Get pending messages with Redis Streams and Spring Data
我在 Spring 引导应用程序中使用 Redis Streams。在调度程序中,我经常想要获取所有待处理的消息并检查它们已经处理了多长时间,并在必要时重新触发它们。
我现在的问题是我可以获取待处理的消息,但我不确定如何获取有效负载。
我的第一个方法是使用 pending
和 range
操作。这里的缺点是 totalDeliveryCount 不会随着 range
增加 - 所以我不能使用范围方法
val pendingMessages = stringRedisTemplate.opsForStream<String, Any>().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
return@filter true
} else {
...
return@filter false
}
}.map { //map from PendingMessage::class to a MapRecord with the content
val map = stringRedisTemplate.opsForStream().range(redisStreamName, Range.just(it.idAsString)) // does not increase totalDeliveryCount !!!
if (map != null && map.size > 0) {
return@map map[0]
} else {
return@map null
}
}.filterNotNull().toList()
我的第二种方法使用了 pending
和 read
操作。对于读取操作,我可以用当前 ID 指定一个偏移量。问题是我只得到比指定的 ID 更高的 ID。
val pendingMessages = stringRedisTemplate.opsForStream().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
return@filter true
} else {
...
return@filter false
}
}.map { //map from PendingMessage::class to a MapRecord with the content
val map = stringRedisTemplate.opsForStream<String, Any>()
.read(it.consumer, StreamReadOptions.empty().count(1),
StreamOffset.create(redisStreamName, ReadOffset.from(it.id)))
if (map != null && map.size > 0 && map[0].id.value == it.idAsString) { // map[0].id.value == it.idAsString does not match
return@map map[0]
} else {
return@map null
}
}.filterNotNull().toList()
所以当我使用 ReadOffset.from('1234-0')
时,我没有收到带有 1234-0
的消息,而是收到该消息之后的所有内容。有没有办法获得准确的消息并遵守 totalDeliveryCount
和 elapsedTimeSinceLastDelivery
统计数据?
我正在使用 spring-data-redis 2.3.1.RELEASE
我现在正在使用以下解决方法,这对大多数情况应该都适用:
return if (id.sequence > 0) {
"${id.timestamp}-${id.sequence - 1}"
} else {
"${id.timestamp - 1}-99999"
}
它依赖于每毫秒插入的消息不超过 99999 条的事实。
我在 Spring 引导应用程序中使用 Redis Streams。在调度程序中,我经常想要获取所有待处理的消息并检查它们已经处理了多长时间,并在必要时重新触发它们。
我现在的问题是我可以获取待处理的消息,但我不确定如何获取有效负载。
我的第一个方法是使用 pending
和 range
操作。这里的缺点是 totalDeliveryCount 不会随着 range
增加 - 所以我不能使用范围方法
val pendingMessages = stringRedisTemplate.opsForStream<String, Any>().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
return@filter true
} else {
...
return@filter false
}
}.map { //map from PendingMessage::class to a MapRecord with the content
val map = stringRedisTemplate.opsForStream().range(redisStreamName, Range.just(it.idAsString)) // does not increase totalDeliveryCount !!!
if (map != null && map.size > 0) {
return@map map[0]
} else {
return@map null
}
}.filterNotNull().toList()
我的第二种方法使用了 pending
和 read
操作。对于读取操作,我可以用当前 ID 指定一个偏移量。问题是我只得到比指定的 ID 更高的 ID。
val pendingMessages = stringRedisTemplate.opsForStream().pending(redisStreamName, Consumer.from(redisConsumerGroup, instanceName))
return pendingMessages.filter { pendingMessage ->
if (pendingMessage.totalDeliveryCount < maxDeliveryAttempts && pendingMessage.elapsedTimeSinceLastDelivery > Duration.ofMillis(pendingTimeout.toLong())) {
return@filter true
} else {
...
return@filter false
}
}.map { //map from PendingMessage::class to a MapRecord with the content
val map = stringRedisTemplate.opsForStream<String, Any>()
.read(it.consumer, StreamReadOptions.empty().count(1),
StreamOffset.create(redisStreamName, ReadOffset.from(it.id)))
if (map != null && map.size > 0 && map[0].id.value == it.idAsString) { // map[0].id.value == it.idAsString does not match
return@map map[0]
} else {
return@map null
}
}.filterNotNull().toList()
所以当我使用 ReadOffset.from('1234-0')
时,我没有收到带有 1234-0
的消息,而是收到该消息之后的所有内容。有没有办法获得准确的消息并遵守 totalDeliveryCount
和 elapsedTimeSinceLastDelivery
统计数据?
我正在使用 spring-data-redis 2.3.1.RELEASE
我现在正在使用以下解决方法,这对大多数情况应该都适用:
return if (id.sequence > 0) {
"${id.timestamp}-${id.sequence - 1}"
} else {
"${id.timestamp - 1}-99999"
}
它依赖于每毫秒插入的消息不超过 99999 条的事实。