如果启用统计信息,消费者将停止 - node-rdkafka
Consumer stops if stats is enabled - node-rdkafka
我正在使用 node-rdkafka
来消费来自 kafka 的消息。
如果设置 statistics.interval.ms
,则消费者进程退出。
代码如下:
const { KafkaConsumer } = require('node-rdkafka')
const consumer = new KafkaConsumer(
{
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'statistics.interval.ms': 5000,
},
{},
)
consumer
.on('ready', () => {
consumer.subscribe(['test1'])
setInterval(() => {
consumer.consume(1)
}, 1000)
})
.on('data', (data) => {
console.log('Message found! Contents below.')
console.log(data.value.toString())
})
.on('error', (e) => console.log(e))
此代码仅使用一条消息,然后退出。我也检查了 process.on
事件,但没有抛出错误。如果我删除 'statistics.interval.ms': 5000
那么一切正常
这是 windows
相关的问题。 Linux
完全可以正常工作。
consumer
.on('ready', () => {
consumer.subscribe(['test1'])
consumer.consume()
})
.on('data', (data) => {
console.log('Message found! Contents below.')
console.log(data.value.toString())
})
.on('error', (e) => console.log(e))
试试这个,让我知道它是否适合你。
我正在使用 node-rdkafka
来消费来自 kafka 的消息。
如果设置 statistics.interval.ms
,则消费者进程退出。
代码如下:
const { KafkaConsumer } = require('node-rdkafka')
const consumer = new KafkaConsumer(
{
'group.id': 'kafka',
'metadata.broker.list': 'localhost:9092',
'statistics.interval.ms': 5000,
},
{},
)
consumer
.on('ready', () => {
consumer.subscribe(['test1'])
setInterval(() => {
consumer.consume(1)
}, 1000)
})
.on('data', (data) => {
console.log('Message found! Contents below.')
console.log(data.value.toString())
})
.on('error', (e) => console.log(e))
此代码仅使用一条消息,然后退出。我也检查了 process.on
事件,但没有抛出错误。如果我删除 'statistics.interval.ms': 5000
那么一切正常
这是 windows
相关的问题。 Linux
完全可以正常工作。
consumer
.on('ready', () => {
consumer.subscribe(['test1'])
consumer.consume()
})
.on('data', (data) => {
console.log('Message found! Contents below.')
console.log(data.value.toString())
})
.on('error', (e) => console.log(e))
试试这个,让我知道它是否适合你。