如何使用kafka-node从主题中读取数据?
How to read data from topic using kafka-node?
我有一个主题,我必须从 kafka 服务器读取,因此我只需要创建可以从 kafka 主题读取数据的消费者,我总是得到错误主题不存在。
1- 我怎样才能确保建立了 kafka 连接?
2-如何从kafka中的特定主题获取数据?
main.js
var kafka = require('kafka-node');
var config = require('./config.js');
var kafkaConn = config.kafkaCon.dit;
var HighLevelConsumer = kafka.HighLevelConsumer;
//var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
var Offset = kafka.Offset;
var topics = [{topic: 'UEQ'}];
var client = new Client(kafkaConn);
var payloads = [ { topic: topics, partition : 0}];
var options = {
groupId: 'kafka-node-group',
// Auto commit config
autoCommit: true,
autoCommitMsgCount: 100,
autoCommitIntervalMs: 5000,
// Fetch message config
fetchMaxWaitMs: 100,
fetchMinBytes: 1,
fetchMaxBytes: 1024 * 10,
};
var consumer = new HighLevelConsumer(client, payloads, options);
consumer.on('message', function (message) {
console.log('TEST',this.id, message);
});
错误
events.js:141
throw er; // Unhandled 'error' event
^
TopicsNotExistError: The topic(s) [object Object] do not exist
at new TopicsNotExistError (C:\uilogging\node_modules\kafka-node\lib\errors\
TopicsNotExistError.js:11:11)
我正在做一个类似的项目,我在自己的服务器上有一个 Kafka 生产者,并使用 Kafka-Node 作为我的应用程序的消费者。我是 Kafka-Node 的新手,没有太多经验,但我可以尝试分享一些我发现的见解。
我认为您的问题实际上是您的主题不存在。
1。如何确定 Kafka 连接已建立?
如果您的连接未建立,我认为它不会继续说该主题不存在。当我输入一个不存在的主题并为我的 Kafka 生产者输入一个随机 ip 时,没有任何错误。但是当我指向正确的 ip,并且仍然有不正确的主题时,我得到了与您看到的相同的错误。
2。此代码适用于我的应用程序
var kafka = require('kafka-node');
var Consumer = kafka.Consumer,
// The client specifies the ip of the Kafka producer and uses
// the zookeeper port 2181
client = new kafka.Client("<ip to producer>:2181"),
// The consumer object specifies the client and topic(s) it subscribes to
consumer = new Consumer(
client, [ { topic: 'myTopic', partition: 0 } ], { autoCommit: false });
consumer.on('message', function (message) {
// grab the main content from the Kafka message
var data = JSON.parse(message.value);
console.log(data);
});
希望这不会太晚找到你。
如果出于 debugging/development 目的需要它,则只需添加以下导入(以下代码为 ES6 格式),它应该 console.log 在建立连接时或如果有是否有任何失败消息:
this.kafkaLogging = require('kafka-node/logging');
this.kafkaLogging.setLoggerProvider(this.getLoggerProvider);
...
getLoggerProvider() {
return {
debug: console.log.bind(console),
info : console.log.bind(console),
warn : console.log.bind(console),
error: console.log.bind(console)
};
}
我有一个主题,我必须从 kafka 服务器读取,因此我只需要创建可以从 kafka 主题读取数据的消费者,我总是得到错误主题不存在。
1- 我怎样才能确保建立了 kafka 连接?
2-如何从kafka中的特定主题获取数据?
main.js
var kafka = require('kafka-node');
var config = require('./config.js');
var kafkaConn = config.kafkaCon.dit;
var HighLevelConsumer = kafka.HighLevelConsumer;
//var HighLevelProducer = kafka.HighLevelProducer;
var Client = kafka.Client;
var Offset = kafka.Offset;
var topics = [{topic: 'UEQ'}];
var client = new Client(kafkaConn);
var payloads = [ { topic: topics, partition : 0}];
var options = {
groupId: 'kafka-node-group',
// Auto commit config
autoCommit: true,
autoCommitMsgCount: 100,
autoCommitIntervalMs: 5000,
// Fetch message config
fetchMaxWaitMs: 100,
fetchMinBytes: 1,
fetchMaxBytes: 1024 * 10,
};
var consumer = new HighLevelConsumer(client, payloads, options);
consumer.on('message', function (message) {
console.log('TEST',this.id, message);
});
错误
events.js:141
throw er; // Unhandled 'error' event
^
TopicsNotExistError: The topic(s) [object Object] do not exist
at new TopicsNotExistError (C:\uilogging\node_modules\kafka-node\lib\errors\
TopicsNotExistError.js:11:11)
我正在做一个类似的项目,我在自己的服务器上有一个 Kafka 生产者,并使用 Kafka-Node 作为我的应用程序的消费者。我是 Kafka-Node 的新手,没有太多经验,但我可以尝试分享一些我发现的见解。
我认为您的问题实际上是您的主题不存在。
1。如何确定 Kafka 连接已建立?
如果您的连接未建立,我认为它不会继续说该主题不存在。当我输入一个不存在的主题并为我的 Kafka 生产者输入一个随机 ip 时,没有任何错误。但是当我指向正确的 ip,并且仍然有不正确的主题时,我得到了与您看到的相同的错误。
2。此代码适用于我的应用程序
var kafka = require('kafka-node');
var Consumer = kafka.Consumer,
// The client specifies the ip of the Kafka producer and uses
// the zookeeper port 2181
client = new kafka.Client("<ip to producer>:2181"),
// The consumer object specifies the client and topic(s) it subscribes to
consumer = new Consumer(
client, [ { topic: 'myTopic', partition: 0 } ], { autoCommit: false });
consumer.on('message', function (message) {
// grab the main content from the Kafka message
var data = JSON.parse(message.value);
console.log(data);
});
希望这不会太晚找到你。
如果出于 debugging/development 目的需要它,则只需添加以下导入(以下代码为 ES6 格式),它应该 console.log 在建立连接时或如果有是否有任何失败消息:
this.kafkaLogging = require('kafka-node/logging');
this.kafkaLogging.setLoggerProvider(this.getLoggerProvider);
...
getLoggerProvider() {
return {
debug: console.log.bind(console),
info : console.log.bind(console),
warn : console.log.bind(console),
error: console.log.bind(console)
};
}