如何设置消费者从Golang Kafka 10中的特定偏移量开始
How to set consumer to start from a specific offset in Golang Kafka 10
我的需要是让生产者从崩溃前处理的最后一条消息开始。还好我是只有一个主题,一个分区,一个消费者。
为此,我尝试了 https://github.com/Shopify/sarama,但它似乎还不可用。
我现在正在使用 https://godoc.org/github.com/bsm/sarama-cluster,它允许我提交每个消息偏移量。
我无法检索最后提交的偏移量
我不知道如何制作一个 sarama consumer 从所述偏移量开始。到目前为止我发现的唯一参数是 Config.Producer.Offsets.Initial
.
- 如何检索最后提交的偏移量?
- 如何让消费者从最后一条offset已经提交的消息开始?
OffsetNewest
将使它从最后生成的消息开始,而不是消费者最后处理的消息。
- 是否可以仅使用 Shopify/sarama 而不是 bsm/sarama-cluster 来做到这一点?
提前致谢
P.S。我使用的是 Kafka 10.0,所以偏移量存储在 kafka 而不是 zookeeper 中。
编辑 1:
部分解决方案:获取自 sarama.OffsetOldest 以来的所有消息并跳过所有消息,直到我们找到未处理的消息。
如果已经为分区保存了偏移量,sarama-cluster 将从该偏移量恢复使用。 Config.Producer.Offsets.Initial
选项仅在不存在保存的偏移量时使用(第一个 运行 用于消费者组)。
您可以通过在 main()
函数的开头添加以下行来验证这一点:
sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)
然后你会在输出中看到如下内容:
cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12
其中的 12 是 Sarama 将从该分区开始的偏移量 (sample/0)。
我的需要是让生产者从崩溃前处理的最后一条消息开始。还好我是只有一个主题,一个分区,一个消费者。
为此,我尝试了 https://github.com/Shopify/sarama,但它似乎还不可用。 我现在正在使用 https://godoc.org/github.com/bsm/sarama-cluster,它允许我提交每个消息偏移量。
我无法检索最后提交的偏移量
我不知道如何制作一个 sarama consumer 从所述偏移量开始。到目前为止我发现的唯一参数是 Config.Producer.Offsets.Initial
.
- 如何检索最后提交的偏移量?
- 如何让消费者从最后一条offset已经提交的消息开始?
OffsetNewest
将使它从最后生成的消息开始,而不是消费者最后处理的消息。 - 是否可以仅使用 Shopify/sarama 而不是 bsm/sarama-cluster 来做到这一点?
提前致谢
P.S。我使用的是 Kafka 10.0,所以偏移量存储在 kafka 而不是 zookeeper 中。
编辑 1: 部分解决方案:获取自 sarama.OffsetOldest 以来的所有消息并跳过所有消息,直到我们找到未处理的消息。
如果已经为分区保存了偏移量,sarama-cluster 将从该偏移量恢复使用。 Config.Producer.Offsets.Initial
选项仅在不存在保存的偏移量时使用(第一个 运行 用于消费者组)。
您可以通过在 main()
函数的开头添加以下行来验证这一点:
sarama.Logger = log.New(os.Stdout, "sarama: ", log.LstdFlags)
然后你会在输出中看到如下内容:
cluster/consumer CID-17db1be4-a162-411c-a106-4d198191176a consume sample/0 from 12
其中的 12 是 Sarama 将从该分区开始的偏移量 (sample/0)。