使用 kafka-node 与消费者组手动提交消息

Manual commit messages with consumer group using kafka-node

我想在我的消费者组中完成所有任务(如将消息推送到数据库)后手动提交消息。如何禁用自动提交和手动提交消息。

改用node-rdkafka。它工作得很好。你可以提交像

这样的消息
consumer.commit({ topic: data.topic, partition: data.partition, offset: data.offset + 1 }, function(err, data) {})

设置自动提交为假

const kafka = require('kafka-node');

const config = require('../config');

const client = new kafka.KafkaClient({
  kafkaHost: config.kafkaHost
});

const consumer = new kafka.Consumer(client, [
  {
    topic: config.kafkaTopic
  }
], {
  autoCommit: false
});

然后手动提交-

consumer.on('message', (message) => {
  console.log('message', message);
  // feed data into db 
  consumer.commit((error, data) => {
    if (error) {
      console.error(error);
    } else {
      console.log('Commit success: ', data);
    }
  });
});