Karafka start_from_beginning 未按预期工作
Karafka start_from_beginning not working as expected
我创建了一个项目来帮助我理解 Kafka。它被设置为三个相同的 Rails 应用程序,它们都在 Docker 中,Karafka 配置为使用消息 - 如果您在一个中创建记录,它会被复制到其他两个。我假设 start_from_beginning
设置意味着每次重新启动 Karafka 服务器时它都会从偏移量 0 开始,但事实似乎确实如此。谁能解释一下我做错了什么或纠正我的理解。
这是karafka.rb
中的两个重要部分
setup do |config|
config.kafka.seed_brokers = %w[kafka://kafka:9092]
config.client_id = "app_#{ ENV['APP_ID'] }"
config.logger = Rails.logger
end
...
consumer_groups.draw do
topic :party do
consumer PartyConsumer
start_from_beginning true
end
end
我已经尝试将 config.kafka.start_from_beginning = true
放入 karafka.rb
的配置部分,但没有成功。
当我在其中一个应用程序中创建记录时,它会同步到其他两个应用程序。这就是我想要做的:
- 清空应用程序 3 中的数据库
- 重启应用程序 3 中的 karafka 服务器 (
with start_from_beginning = true
)
在这一点上,我希望通过倒带到偏移量 0 并重播所有消息来从 Kafka 重新创建数据库。我错过了什么?
问题其实出在我对start_from_beginning
的理解上,目的是决定新消费者的行为方式。现有的消费者总是希望从他们离开的地方继续。
我找到了两种方法来实现我正在寻找的东西:
1 - 使用 Kafka 的搜索功能(推荐):
2 - 更改 karafka.rb
中的 client_id
,例如
class KarafkaApp < Karafka::App
setup do |config|
config.kafka.seed_brokers = %w[kafka://kafka:9092]
config.client_id = "app_#{ ENV['APP_ID'] }-#{ SecureRandom.uuid }"
config.logger = Rails.logger
end
...
使用 SecureRandom.uuid
将确保您的消费者拥有唯一的 client_id
并因此重新处理所有消息。
选项 2 更像是一种 hack,但根据您的用例,可能正是您要寻找的。
我创建了一个项目来帮助我理解 Kafka。它被设置为三个相同的 Rails 应用程序,它们都在 Docker 中,Karafka 配置为使用消息 - 如果您在一个中创建记录,它会被复制到其他两个。我假设 start_from_beginning
设置意味着每次重新启动 Karafka 服务器时它都会从偏移量 0 开始,但事实似乎确实如此。谁能解释一下我做错了什么或纠正我的理解。
这是karafka.rb
setup do |config|
config.kafka.seed_brokers = %w[kafka://kafka:9092]
config.client_id = "app_#{ ENV['APP_ID'] }"
config.logger = Rails.logger
end
...
consumer_groups.draw do
topic :party do
consumer PartyConsumer
start_from_beginning true
end
end
我已经尝试将 config.kafka.start_from_beginning = true
放入 karafka.rb
的配置部分,但没有成功。
当我在其中一个应用程序中创建记录时,它会同步到其他两个应用程序。这就是我想要做的:
- 清空应用程序 3 中的数据库
- 重启应用程序 3 中的 karafka 服务器 (
with start_from_beginning = true
)
在这一点上,我希望通过倒带到偏移量 0 并重播所有消息来从 Kafka 重新创建数据库。我错过了什么?
问题其实出在我对start_from_beginning
的理解上,目的是决定新消费者的行为方式。现有的消费者总是希望从他们离开的地方继续。
我找到了两种方法来实现我正在寻找的东西:
1 - 使用 Kafka 的搜索功能(推荐):
2 - 更改 karafka.rb
中的 client_id
,例如
class KarafkaApp < Karafka::App
setup do |config|
config.kafka.seed_brokers = %w[kafka://kafka:9092]
config.client_id = "app_#{ ENV['APP_ID'] }-#{ SecureRandom.uuid }"
config.logger = Rails.logger
end
...
使用 SecureRandom.uuid
将确保您的消费者拥有唯一的 client_id
并因此重新处理所有消息。
选项 2 更像是一种 hack,但根据您的用例,可能正是您要寻找的。