Kafkajs - `该组正在重新平衡,因此需要重新加入`错误导致消息被多次使用
Kafkajs - `The group is rebalancing, so a rejoin is needed` error causes message to be consumed more than once
我在 Kafkajs 消费者方面有优势,有时我会遇到重新平衡错误:
The group is rebalancing, so a rejoin is needed
[Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Runner] The group is rebalancing, re-joining
然后一旦消费者组重新平衡,最后处理的消息将再次处理,因为由于错误没有发生提交。
Kafka消费者初始化代码:
import { Consumer, Kafka } from 'kafkajs';
const kafkaInstance = new Kafka({
clientId: 'some_client_id',
brokers: ['brokers list'],
ssl: true
});
const kafkaConsumer = kafkaInstance.consumer({ groupId: 'some_consumer_group_id });
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'some_topic', fromBeginning: true });
await kafkaConsumer.run({
autoCommit: false, // cancel auto commit in order to control committing
eachMessage: ... some processing function
});
我将 sessionTimeout
& heartbeatInteval
增加到更高的值和不同的组合,但仍然在大量消息负载下,我收到错误。
我在 eachMessage
函数中添加了对 heartbeat
函数的调用,这似乎解决了问题。
但我想知道它是否被视为“良好做法”,或者我可以在消费者方面做些什么来防止此类错误?
我在 eachMessage
函数中添加了对 heartbeat
函数的调用,这似乎解决了问题。
我在 Kafkajs 消费者方面有优势,有时我会遇到重新平衡错误:
The group is rebalancing, so a rejoin is needed
[Connection] Response Heartbeat(key: 12, version: 3)
The group is rebalancing, so a rejoin is needed
[Runner] The group is rebalancing, re-joining
然后一旦消费者组重新平衡,最后处理的消息将再次处理,因为由于错误没有发生提交。
Kafka消费者初始化代码:
import { Consumer, Kafka } from 'kafkajs';
const kafkaInstance = new Kafka({
clientId: 'some_client_id',
brokers: ['brokers list'],
ssl: true
});
const kafkaConsumer = kafkaInstance.consumer({ groupId: 'some_consumer_group_id });
await kafkaConsumer.connect();
await kafkaConsumer.subscribe({ topic: 'some_topic', fromBeginning: true });
await kafkaConsumer.run({
autoCommit: false, // cancel auto commit in order to control committing
eachMessage: ... some processing function
});
我将 sessionTimeout
& heartbeatInteval
增加到更高的值和不同的组合,但仍然在大量消息负载下,我收到错误。
我在 eachMessage
函数中添加了对 heartbeat
函数的调用,这似乎解决了问题。
但我想知道它是否被视为“良好做法”,或者我可以在消费者方面做些什么来防止此类错误?
我在 eachMessage
函数中添加了对 heartbeat
函数的调用,这似乎解决了问题。