Karafka start_from_beginning 未按预期工作

Karafka start_from_beginning not working as expected

我创建了一个项目来帮助我理解 Kafka。它被设置为三个相同的 Rails 应用程序,它们都在 Docker 中,Karafka 配置为使用消息 - 如果您在一个中创建记录,它会被复制到其他两个。我假设 start_from_beginning 设置意味着每次重新启动 Karafka 服务器时它都会从偏移量 0 开始,但事实似乎确实如此。谁能解释一下我做错了什么或纠正我的理解。

这是karafka.rb

中的两个重要部分
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://kafka:9092]
    config.client_id = "app_#{ ENV['APP_ID'] }"
    config.logger = Rails.logger
  end

...

  consumer_groups.draw do
    topic :party do
      consumer PartyConsumer
      start_from_beginning true
    end
  end

我已经尝试将 config.kafka.start_from_beginning = true 放入 karafka.rb 的配置部分,但没有成功。

当我在其中一个应用程序中创建记录时,它会同步到其他两个应用程序。这就是我想要做的:

在这一点上,我希望通过倒带到偏移量 0 并重播所有消息来从 Kafka 重新创建数据库。我错过了什么?

完整项目在这里:https://github.com/jcleary/kafka-demo

问题其实出在我对start_from_beginning的理解上,目的是决定消费者的行为方式。现有的消费者总是希望从他们离开的地方继续。

我找到了两种方法来实现我正在寻找的东西:

1 - 使用 Kafka 的搜索功能(推荐):

来自维基:https://github.com/karafka/karafka/wiki/Events-monitoring-and-logging#using-the-connectionlistenerbefore_fetch_loop-for-topic-seeking

2 - 更改 karafka.rb 中的 client_id,例如

class KarafkaApp < Karafka::App
  setup do |config|
    config.kafka.seed_brokers = %w[kafka://kafka:9092]
    config.client_id = "app_#{ ENV['APP_ID'] }-#{ SecureRandom.uuid }"
    config.logger = Rails.logger
  end
  ... 

使用 SecureRandom.uuid 将确保您的消费者拥有唯一的 client_id 并因此重新处理所有消息。

选项 2 更像是一种 hack,但根据您的用例,可能正是您要寻找的。