如何设置 @KafkaListener 以在启动时始终重置?

How do I set up a @KafkaListener to always reset on startup?

我有一个 kafka 主题,它接收像

这样的消息
Kafka key: serverId
value: server host name

我只需要每个 serverId 的最新版本,因此它的段大小非常小并且启用了日志压缩。所以我做的是以下

@KafkaListener(topics = SERVERS_KAFKA_TOPIC, id = "#{T(java.util.UUID).randomUUID().toString()}",
  properties = {
    "spring.json.key.default.type=java.lang.String",
    "spring.json.value.default.type=java.lang.String"
  })
  public void registerServer(
    @Payload(required = false) String serverHostName
    ) {

基本上我为每个侦听器创建一个新的消费者组,因为我假设如果它是同一个消费者组,它将在最后一个停止的地方继续,并且副本集的状态不会是什么期待中

当然,这有点让消费者群体列表变得非常烦人。我想知道是否要创建另一个共享服务(如 Redis)来存储数据,因为我无法真正使用 Redis 进行存储,因为我从消息构建的数据是 GRPC 客户端,我可以将侦听器配置为始终从头开始并忽略组中的其他消费者?

使您的侦听器 bean 扩展 AbstractConsumerSeekAware 并在 onPartitionsAssigned 中调用 seekToBeginning()

    /**
     * Seek all assigned partitions to the beginning.
     * @since 2.6
     */
    public void seekToBeginning() {

https://docs.spring.io/spring-kafka/docs/current/reference/html/

How to implement ConsumerSeekAware in Spring-kafka?

Is there any way to get the eldest available offset for a kafka topic

AbstractConsumerSeekAware提供的seekToBeginning()方法我试的时候没有从头找。但是,此代码确实有效

/**
 * Reset to the beginning
 *
 * <p>{@inheritDoc}
 */
@Override
public void onPartitionsAssigned(
        @NotNull Map<TopicPartition, Long> assignments,
        @NotNull ConsumerSeekCallback callback) {

    assignments.keySet().stream()
            .filter(topicPartition -> SERVERS_KAFKA_TOPIC.equals(topicPartition.topic()))
            .forEach(
                    topicPartition ->
                            callback.seekToBeginning(
                                    topicPartition.topic(), topicPartition.partition()));
}

我还需要它用于特定主题而不是所有主题,因为我发现的方法也将它限制为特定主题。