卡夫卡消费者在暂停和恢复后不消费消息
Kafka consumer not consuming message after pause and resume
我正在使用这个 node-rdkafka
库来实现具有消费者暂停和恢复方法的节点 kafka 来处理背压。我已经创建了我可以 pause the consumer
和 resume the consumer
的小演示,但问题是在 resume the consumer
之后它停止了消费消息。
这是我的代码。
const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const log_divider = '-----------------------------------';
const consumer = new Kafka.KafkaConsumer({
'group.id':'gsuite_consumer',
'metadata.broker.list': '*******',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '********',
'sasl.password': '********',
'security.protocol': 'SASL_SSL',
'enable.auto.commit':false
}, {});
// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
if (err) {
console.log(`Error connecting to Kafka broker: ${err}`);
process.exit(-1);
}
console.log("Connected to Kafka broker");
});
consumer.on('disconnected', (args) => {
console.error(`Consumer got disconnected: ${JSON.stringify(args)}`);
});
let max_queue_size = 3;
let current_queue = [];
let is_pause = false;
// register ready handler.
consumer.on('ready', (arg)=>{
console.log('consumer ready.' + JSON.stringify(arg));
console.log('Consumer is ready');
consumer.subscribe([topic]);
setInterval(function() {
console.log('consumer has consume on :'+timeMs());
consumer.consume();
}, 1000);
});
consumer.on('data',async (data)=>{
console.log('************consumer is consuming data***********:'+timeMs());
if(!is_pause) {
is_pause = true;
if(data && typeof data !== 'undefined') {
try {
console.log('consumer has received the data:'+timeMs());
consumer.pause([topic]);
console.log('consumer has pause the consuming:'+timeMs());
await processMessage(data);
console.log('consumer is resumed:'+timeMs());
consumer.resume([topic]);
console.log(log_divider);
is_pause = false;
} catch(error) {
console.log('data consuming error');
console.log(error);
}
} else {
is_pause = false;
}
}
});
async function processMessage(data) {
// await print_bulk(data);
await processData(0,data);
}
async function print_bulk(data) {
for(var i=0;i<data.length;i++) {
await processData(i,data[i]);
}
}
/**
* Wait specified number of milliseconds.
* @param ms
*/
async function wait(ms) {
console.log('wait for the 3 sec');
return new Promise((resolve) => setTimeout(resolve, ms));
}
var timeMs = ()=> {
var d = new Date();
var h = addZero(d.getHours(), 2);
var m = addZero(d.getMinutes(), 2);
var s = addZero(d.getSeconds(), 2);
var ms = addZero(d.getMilliseconds(), 3);
return h + ":" + m + ":" + s + ":" + ms;
}
var addZero = (x, n)=> {
while (x.toString().length < n) {
x = "0" + x;
}
return x;
}
async function processData(i,m) {
if (m) {
console.log('processing a data start:'+timeMs());
console.log('Received a message:');
console.log(' message: ' + m.value.toString());
console.log(' key: ' + m.key);
console.log(' size: ' + m.size);
console.log(' topic: ' + m.topic);
console.log(' offset: ' + m.offset);
console.log(' partition: ' + m.partition);
consumer.commitMessage(m);
}
await wait(3000);
console.log('process a data completed:'+timeMs());
// delete current_queue[i];
// console.log('after delting lenght of current queue:'+current_queue.length);
// console.log(log_divider);
return true;
}
任何人都可以帮助我,恢复消费者时我做错了什么?当我启动消费者时,它只收到一条消息,在恢复后它仍然没有再消费任何消息。
我已经弄清楚这个问题了。除了 consumer.pause()
& consumer.resume()
方法,我还需要使用 consumer.assignments()
方法。
所以会这样
consumer.pause(consumer.assignments());
consumer.resume(consumer.assignments());
我正在使用这个 node-rdkafka
库来实现具有消费者暂停和恢复方法的节点 kafka 来处理背压。我已经创建了我可以 pause the consumer
和 resume the consumer
的小演示,但问题是在 resume the consumer
之后它停止了消费消息。
这是我的代码。
const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const log_divider = '-----------------------------------';
const consumer = new Kafka.KafkaConsumer({
'group.id':'gsuite_consumer',
'metadata.broker.list': '*******',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '********',
'sasl.password': '********',
'security.protocol': 'SASL_SSL',
'enable.auto.commit':false
}, {});
// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
if (err) {
console.log(`Error connecting to Kafka broker: ${err}`);
process.exit(-1);
}
console.log("Connected to Kafka broker");
});
consumer.on('disconnected', (args) => {
console.error(`Consumer got disconnected: ${JSON.stringify(args)}`);
});
let max_queue_size = 3;
let current_queue = [];
let is_pause = false;
// register ready handler.
consumer.on('ready', (arg)=>{
console.log('consumer ready.' + JSON.stringify(arg));
console.log('Consumer is ready');
consumer.subscribe([topic]);
setInterval(function() {
console.log('consumer has consume on :'+timeMs());
consumer.consume();
}, 1000);
});
consumer.on('data',async (data)=>{
console.log('************consumer is consuming data***********:'+timeMs());
if(!is_pause) {
is_pause = true;
if(data && typeof data !== 'undefined') {
try {
console.log('consumer has received the data:'+timeMs());
consumer.pause([topic]);
console.log('consumer has pause the consuming:'+timeMs());
await processMessage(data);
console.log('consumer is resumed:'+timeMs());
consumer.resume([topic]);
console.log(log_divider);
is_pause = false;
} catch(error) {
console.log('data consuming error');
console.log(error);
}
} else {
is_pause = false;
}
}
});
async function processMessage(data) {
// await print_bulk(data);
await processData(0,data);
}
async function print_bulk(data) {
for(var i=0;i<data.length;i++) {
await processData(i,data[i]);
}
}
/**
* Wait specified number of milliseconds.
* @param ms
*/
async function wait(ms) {
console.log('wait for the 3 sec');
return new Promise((resolve) => setTimeout(resolve, ms));
}
var timeMs = ()=> {
var d = new Date();
var h = addZero(d.getHours(), 2);
var m = addZero(d.getMinutes(), 2);
var s = addZero(d.getSeconds(), 2);
var ms = addZero(d.getMilliseconds(), 3);
return h + ":" + m + ":" + s + ":" + ms;
}
var addZero = (x, n)=> {
while (x.toString().length < n) {
x = "0" + x;
}
return x;
}
async function processData(i,m) {
if (m) {
console.log('processing a data start:'+timeMs());
console.log('Received a message:');
console.log(' message: ' + m.value.toString());
console.log(' key: ' + m.key);
console.log(' size: ' + m.size);
console.log(' topic: ' + m.topic);
console.log(' offset: ' + m.offset);
console.log(' partition: ' + m.partition);
consumer.commitMessage(m);
}
await wait(3000);
console.log('process a data completed:'+timeMs());
// delete current_queue[i];
// console.log('after delting lenght of current queue:'+current_queue.length);
// console.log(log_divider);
return true;
}
任何人都可以帮助我,恢复消费者时我做错了什么?当我启动消费者时,它只收到一条消息,在恢复后它仍然没有再消费任何消息。
我已经弄清楚这个问题了。除了 consumer.pause()
& consumer.resume()
方法,我还需要使用 consumer.assignments()
方法。
所以会这样
consumer.pause(consumer.assignments());
consumer.resume(consumer.assignments());