Kafka Consumer:如何以编程方式使用 Go Sarama 中的特定偏移量

Kafka Consumer: How to programatically consume from specific offset in Go Sarama

最近开始研究使用kafka。我正在处理的项目使用 sarama.

为了阅读消息,我使用 ConsumerGroup

如果foo returns false,我需要过段时间再读一遍邮件。如何做到这一点?

func (consumer *Consumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {

    for message := range claim.Messages() {

            if ok := foo(message); ok {
                session.MarkMessage(message, "")
            } else {
                // ???
            }

    }

    return nil
}

您可以通过在消费者组的 Setup() 回调中包含以下内容,将消费者组的偏移量重置为旧偏移量:

func (e myConsumerGroup) Setup(sess sarama.ConsumerGroupSession) error {
    sess.ResetOffset(topic, partition, offset, "")

    return nil
}

您也可以通过控制台实现相同的目的:

kafka-consumer-groups \
    --bootstrap-server localhost:9092 \
    --group my-consumer-group \
    --topic myTopicName \
    --reset-offsets \
    --to-offfset 100 \
    --execute