Consumer 是否应该处理消息,然后将消息发送回 Kafka
Should Consumer process messages, then send messages back to Kafka
我想使用主题 1 处理来自消费者的数据,然后将消息发送回 Kafka 到主题 2
Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka
我的尝试:
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
producer.on('error', function (err) {})
});
但是,Producer 无法将处理后的消息发送到 Kafka。我得到的错误
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit
我使用节点模块Kafka-node
您需要调换生产者就绪监听器和消费者消息监听器的顺序。
否则,您要为每条消费的消息设置现成的侦听器
例如
producer.on('ready', function () {
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.send(payloads, function (err, data) {
console.log(data);
});
});
不过,如果主要是处理和转发到新主题,我建议看看这个库 https://github.com/nodefluent/kafka-streams/
我想使用主题 1 处理来自消费者的数据,然后将消息发送回 Kafka 到主题 2
Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka
我的尝试:
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.on('ready', function () {
producer.send(payloads, function (err, data) {
console.log(data);
});
});
producer.on('error', function (err) {})
});
但是,Producer 无法将处理后的消息发送到 Kafka。我得到的错误
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit
我使用节点模块Kafka-node
您需要调换生产者就绪监听器和消费者消息监听器的顺序。
否则,您要为每条消费的消息设置现成的侦听器
例如
producer.on('ready', function () {
consumer.on('message', (message) => {
let processedMsg = processMessage(message);
payloads = [
{ topic: 'topic2', messages: processedMsg }
];
producer.send(payloads, function (err, data) {
console.log(data);
});
});
不过,如果主要是处理和转发到新主题,我建议看看这个库 https://github.com/nodefluent/kafka-streams/