kafka-node 从最后一个偏移量开始消费

kafka-node start consume from last offset

我正在使用 kafka-node 来消费来自特定 Kafka 主题的消息。当我重新启动我的节点服务器时,它会按预期初始化我的消费者,但它的默认行为是从偏移量 0 开始消费,而我的目标是只接收新消息(也就是从当前偏移量开始消费)。我没有从 API 文档中找到实现该目标的方法。有人知道它是否支持吗?

谢谢!

如果您只想接收新消息,则必须在创建消费者实例之前设置以下内容属性: auto.offset.reset=最新

我在 kafka-node github 问题 (link) 中问了这个问题并得到了答案。它现在可用(从 v0.4.0 开始)。以下代码段对我有用:

consumerClient = new kafka.Client('localhost:2181');

/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);

offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
        var latestOffset = data['myTopic']['0'][0];
        console.log("Consumer current offset: " + latestOffset);
});

var consumer = new kafka.HighLevelConsumer(
        consumerClient,
        [
            { topic: 'myTopic', partition: 0, fromOffset: -1 }
        ],
        {
            autoCommit: false
        }
);

干杯!

相似的回答;这将检索每个分区的所有偏移量,并将偏移量设置为最大值,负 1,以使用给定主题的最后发布的消息。

var offset = new kafka.Offset(client)
offset.fetchLatestOffsets([topic], (err, offsets) => {
    if (err) {
        console.log(`error fetching latest offsets ${err}`)
        return
    }
    var latest = 1
    Object.keys(offsets[topic]).forEach( o => {
        latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
    })
    consumer.setOffset(topic, 0, latest-1)
})