kafka-node 从最后一个偏移量开始消费
kafka-node start consume from last offset
我正在使用 kafka-node 来消费来自特定 Kafka 主题的消息。当我重新启动我的节点服务器时,它会按预期初始化我的消费者,但它的默认行为是从偏移量 0 开始消费,而我的目标是只接收新消息(也就是从当前偏移量开始消费)。我没有从 API 文档中找到实现该目标的方法。有人知道它是否支持吗?
谢谢!
如果您只想接收新消息,则必须在创建消费者实例之前设置以下内容属性:
auto.offset.reset=最新
我在 kafka-node github 问题 (link) 中问了这个问题并得到了答案。它现在可用(从 v0.4.0 开始)。以下代码段对我有用:
consumerClient = new kafka.Client('localhost:2181');
/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);
offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
var latestOffset = data['myTopic']['0'][0];
console.log("Consumer current offset: " + latestOffset);
});
var consumer = new kafka.HighLevelConsumer(
consumerClient,
[
{ topic: 'myTopic', partition: 0, fromOffset: -1 }
],
{
autoCommit: false
}
);
干杯!
相似的回答;这将检索每个分区的所有偏移量,并将偏移量设置为最大值,负 1,以使用给定主题的最后发布的消息。
var offset = new kafka.Offset(client)
offset.fetchLatestOffsets([topic], (err, offsets) => {
if (err) {
console.log(`error fetching latest offsets ${err}`)
return
}
var latest = 1
Object.keys(offsets[topic]).forEach( o => {
latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
})
consumer.setOffset(topic, 0, latest-1)
})
我正在使用 kafka-node 来消费来自特定 Kafka 主题的消息。当我重新启动我的节点服务器时,它会按预期初始化我的消费者,但它的默认行为是从偏移量 0 开始消费,而我的目标是只接收新消息(也就是从当前偏移量开始消费)。我没有从 API 文档中找到实现该目标的方法。有人知道它是否支持吗?
谢谢!
如果您只想接收新消息,则必须在创建消费者实例之前设置以下内容属性: auto.offset.reset=最新
我在 kafka-node github 问题 (link) 中问了这个问题并得到了答案。它现在可用(从 v0.4.0 开始)。以下代码段对我有用:
consumerClient = new kafka.Client('localhost:2181');
/* Print latest offset. */
var offset = new kafka.Offset(consumerClient);
offset.fetch([{ topic: 'myTopic', partition: 0, time: -1 }], function (err, data) {
var latestOffset = data['myTopic']['0'][0];
console.log("Consumer current offset: " + latestOffset);
});
var consumer = new kafka.HighLevelConsumer(
consumerClient,
[
{ topic: 'myTopic', partition: 0, fromOffset: -1 }
],
{
autoCommit: false
}
);
干杯!
相似的回答;这将检索每个分区的所有偏移量,并将偏移量设置为最大值,负 1,以使用给定主题的最后发布的消息。
var offset = new kafka.Offset(client)
offset.fetchLatestOffsets([topic], (err, offsets) => {
if (err) {
console.log(`error fetching latest offsets ${err}`)
return
}
var latest = 1
Object.keys(offsets[topic]).forEach( o => {
latest = offsets[topic][o] > latest ? offsets[topic][o] : latest
})
consumer.setOffset(topic, 0, latest-1)
})