如何设置 @KafkaListener 以在启动时始终重置?
How do I set up a @KafkaListener to always reset on startup?
我有一个 kafka 主题,它接收像
这样的消息
Kafka key: serverId
value: server host name
我只需要每个 serverId 的最新版本,因此它的段大小非常小并且启用了日志压缩。所以我做的是以下
@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=java.lang.String",
"spring.json.value.default.type=java.lang.String"
})
public void registerServer(
@Payload(required = false) String serverHostName
) {
基本上我为每个侦听器创建一个新的消费者组,因为我假设如果它是同一个消费者组,它将在最后一个停止的地方继续,并且副本集的状态不会是什么期待中
当然,这有点让消费者群体列表变得非常烦人。我想知道是否要创建另一个共享服务(如 Redis)来存储数据,因为我无法真正使用 Redis 进行存储,因为我从消息构建的数据是 GRPC 客户端,我可以将侦听器配置为始终从头开始并忽略组中的其他消费者?
使您的侦听器 bean 扩展 AbstractConsumerSeekAware
并在 onPartitionsAssigned
中调用 seekToBeginning()
。
/**
* Seek all assigned partitions to the beginning.
* @since 2.6
*/
public void seekToBeginning() {
https://docs.spring.io/spring-kafka/docs/current/reference/html/
How to implement ConsumerSeekAware in Spring-kafka?
Is there any way to get the eldest available offset for a kafka topic
AbstractConsumerSeekAware
提供的seekToBeginning()
方法我试的时候没有从头找。但是,此代码确实有效
/**
* Reset to the beginning
*
* <p>{@inheritDoc}
*/
@Override
public void onPartitionsAssigned(
@NotNull Map<TopicPartition, Long> assignments,
@NotNull ConsumerSeekCallback callback) {
assignments.keySet().stream()
.filter(topicPartition -> SERVERS_KAFKA_TOPIC.equals(topicPartition.topic()))
.forEach(
topicPartition ->
callback.seekToBeginning(
topicPartition.topic(), topicPartition.partition()));
}
我还需要它用于特定主题而不是所有主题,因为我发现的方法也将它限制为特定主题。
我有一个 kafka 主题,它接收像
这样的消息Kafka key: serverId
value: server host name
我只需要每个 serverId 的最新版本,因此它的段大小非常小并且启用了日志压缩。所以我做的是以下
@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
properties = {
"spring.json.key.default.type=java.lang.String",
"spring.json.value.default.type=java.lang.String"
})
public void registerServer(
@Payload(required = false) String serverHostName
) {
基本上我为每个侦听器创建一个新的消费者组,因为我假设如果它是同一个消费者组,它将在最后一个停止的地方继续,并且副本集的状态不会是什么期待中
当然,这有点让消费者群体列表变得非常烦人。我想知道是否要创建另一个共享服务(如 Redis)来存储数据,因为我无法真正使用 Redis 进行存储,因为我从消息构建的数据是 GRPC 客户端,我可以将侦听器配置为始终从头开始并忽略组中的其他消费者?
使您的侦听器 bean 扩展 AbstractConsumerSeekAware
并在 onPartitionsAssigned
中调用 seekToBeginning()
。
/**
* Seek all assigned partitions to the beginning.
* @since 2.6
*/
public void seekToBeginning() {
https://docs.spring.io/spring-kafka/docs/current/reference/html/
How to implement ConsumerSeekAware in Spring-kafka?
Is there any way to get the eldest available offset for a kafka topic
AbstractConsumerSeekAware
提供的seekToBeginning()
方法我试的时候没有从头找。但是,此代码确实有效
/**
* Reset to the beginning
*
* <p>{@inheritDoc}
*/
@Override
public void onPartitionsAssigned(
@NotNull Map<TopicPartition, Long> assignments,
@NotNull ConsumerSeekCallback callback) {
assignments.keySet().stream()
.filter(topicPartition -> SERVERS_KAFKA_TOPIC.equals(topicPartition.topic()))
.forEach(
topicPartition ->
callback.seekToBeginning(
topicPartition.topic(), topicPartition.partition()));
}
我还需要它用于特定主题而不是所有主题,因为我发现的方法也将它限制为特定主题。