Kafka Consumer:如何以编程方式使用 Go Sarama 中的特定偏移量
Kafka Consumer: How to programatically consume from specific offset in Go Sarama
最近开始研究使用kafka。我正在处理的项目使用 sarama.
为了阅读消息,我使用 ConsumerGroup
。
如果foo
returns false
,我需要过段时间再读一遍邮件。如何做到这一点?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}
}
return nil
}
您可以通过在消费者组的 Setup()
回调中包含以下内容,将消费者组的偏移量重置为旧偏移量:
func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
sess.ResetOffset(topic, partition, offset, "")
return nil
}
您也可以通过控制台实现相同的目的:
kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--topic myTopicName \
--reset-offsets \
--to-offfset 100 \
--execute
最近开始研究使用kafka。我正在处理的项目使用 sarama.
为了阅读消息,我使用 ConsumerGroup
。
如果foo
returns false
,我需要过段时间再读一遍邮件。如何做到这一点?
func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for message := range claim.Messages() {
if ok := foo(message); ok {
session.MarkMessage(message, "")
} else {
// ???
}
}
return nil
}
您可以通过在消费者组的 Setup()
回调中包含以下内容,将消费者组的偏移量重置为旧偏移量:
func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
sess.ResetOffset(topic, partition, offset, "")
return nil
}
您也可以通过控制台实现相同的目的:
kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--group my-consumer-group \
--topic myTopicName \
--reset-offsets \
--to-offfset 100 \
--execute