如何在node-rdkafka中一条一条读取消息
How to read message one by one in node-rdkafka
我正在使用 node-rdkafka (https://github.com/Blizzard/node-rdkafka) 来使用消息,基本设置工作正常,但每次我将内容推送到队列时它都会触发该函数,而不管之前的方法是否完成。
我希望在上一个函数完成时触发下一个数据单元。
这是我的实现
const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const consumer = new Kafka.KafkaConsumer({
'group.id':'consumer',
'metadata.broker.list': '*******',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '********',
'sasl.password': '********',
'security.protocol': 'SASL_SSL',
'enable.auto.commit':false
}, {});
// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
if (err) {
console.log(`Error connecting to Kafka broker: ${err}`);
process.exit(-1);
}
});
let is_pause = false;
consumer.on('ready', (arg)=>{
console.log('consumer ready.' + JSON.stringify(arg));
console.log('Consumer is ready');
consumer.subscribe([topic]);
setInterval(function() {
console.log('consumer has consume on :'+timeMs());
consumer.consume();
}, 1000);
});
consumer.on('data',async (data)=>{
console.log('consumer is consuming data');
if(!is_pause) {
is_pause = true;
if(data && typeof data !== 'undefined') {
try {
console.log('consumer received the data');
consumer.pause([topic]);
console.log('consumer has pause the consuming');
await processMessage(data);
console.log('consumer is resumed');
consumer.resume([topic]);
is_pause = false;
} catch(error) {
console.log('data consuming error');
console.log(error);
}
} else {
is_pause = false;
}
}
});
您正在调用 consume()
(不带任何参数),return 会尽快发送消息。
如果想控制消费节奏,可以使用另一种方法consume(size)
,即returns size
Kafka记录。例如 consume(1)
将 return 下一条 Kafka 记录。
我正在使用 node-rdkafka (https://github.com/Blizzard/node-rdkafka) 来使用消息,基本设置工作正常,但每次我将内容推送到队列时它都会触发该函数,而不管之前的方法是否完成。
我希望在上一个函数完成时触发下一个数据单元。
这是我的实现
const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const consumer = new Kafka.KafkaConsumer({
'group.id':'consumer',
'metadata.broker.list': '*******',
'sasl.mechanisms': 'PLAIN',
'sasl.username': '********',
'sasl.password': '********',
'security.protocol': 'SASL_SSL',
'enable.auto.commit':false
}, {});
// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
if (err) {
console.log(`Error connecting to Kafka broker: ${err}`);
process.exit(-1);
}
});
let is_pause = false;
consumer.on('ready', (arg)=>{
console.log('consumer ready.' + JSON.stringify(arg));
console.log('Consumer is ready');
consumer.subscribe([topic]);
setInterval(function() {
console.log('consumer has consume on :'+timeMs());
consumer.consume();
}, 1000);
});
consumer.on('data',async (data)=>{
console.log('consumer is consuming data');
if(!is_pause) {
is_pause = true;
if(data && typeof data !== 'undefined') {
try {
console.log('consumer received the data');
consumer.pause([topic]);
console.log('consumer has pause the consuming');
await processMessage(data);
console.log('consumer is resumed');
consumer.resume([topic]);
is_pause = false;
} catch(error) {
console.log('data consuming error');
console.log(error);
}
} else {
is_pause = false;
}
}
});
您正在调用 consume()
(不带任何参数),return 会尽快发送消息。
如果想控制消费节奏,可以使用另一种方法consume(size)
,即returns size
Kafka记录。例如 consume(1)
将 return 下一条 Kafka 记录。