等待 KafkaJS 中的领导选举
Waiting for leadership elections in KafkaJS
情况
我正在使用 kafkajs 写入一些动态生成的 kafka 主题。
我发现在注册我的制作人后立即写这些主题会经常导致错误:There is no leader for this topic-partition as we are in the middle of a leadership election
。
完整的错误是:
{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}
代码
这是导致问题的代码:
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
await producer.connect()
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()
问题
两个问题:
- 在连接到生产者(或发送)时,我应该做些什么特别的事情来确保逻辑阻塞,直到生产者真正准备好将数据发送到 kafka 主题?
- 在向生产者发送数据以确保消息不被丢弃时,我应该做些什么特别的事情吗?
解决方案
Kafkajs 通过 admin client 提供了一个 createTopics
方法,它有一个可选的 waitForLeaders
标志:
admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
}
使用它可以解决问题。
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
const admin = kafka.admin()
await admin.connect()
await producer.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
})
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()
不幸的是,如果该主题已经存在,这将导致不同的错误,但这是一个单独的问题,我怀疑该错误比中断更多的信息。
{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}
编辑:以上设置确实需要正确配置您的 Kafka 实例。领导选举可能永远不会解决,在这种情况下,KafkaJS 仍然会抱怨领导选举!
根据我的经验,这是由于 kafka 代理在没有来自 zookeeper de-registered 的情况下被停止的情况,这意味着 zookeeper 正在等待不再存在的响应。
情况
我正在使用 kafkajs 写入一些动态生成的 kafka 主题。
我发现在注册我的制作人后立即写这些主题会经常导致错误:There is no leader for this topic-partition as we are in the middle of a leadership election
。
完整的错误是:
{"level":"ERROR","timestamp":"2020-08-24T17:48:40.201Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 5)","broker":"localhost:9092","clientId":"tv-kitchen","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":146}
代码
这是导致问题的代码:
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
await producer.connect()
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()
问题
两个问题:
- 在连接到生产者(或发送)时,我应该做些什么特别的事情来确保逻辑阻塞,直到生产者真正准备好将数据发送到 kafka 主题?
- 在向生产者发送数据以确保消息不被丢弃时,我应该做些什么特别的事情吗?
解决方案
Kafkajs 通过 admin client 提供了一个 createTopics
方法,它有一个可选的 waitForLeaders
标志:
admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
}
使用它可以解决问题。
import kafka from 'myConfiguredKafkaJs'
const run = async () => {
const producer = kafka.producer()
const admin = kafka.admin()
await admin.connect()
await producer.connect()
await admin.createTopics({
waitForLeaders: true,
topics: [
{ topic: 'myRandomTopicString123' },
],
})
producer.send({
topic: 'myRandomTopicString',
messages: [{
value: 'yolo',
}],
})
}
run()
不幸的是,如果该主题已经存在,这将导致不同的错误,但这是一个单独的问题,我怀疑该错误比中断更多的信息。
{"level":"ERROR","timestamp":"2020-08-24T18:19:48.465Z","logger":"kafkajs","message":"[Connection] Response CreateTopics(key: 19, version: 2)","broker":"localhost:9092","clientId":"tv-kitchen","error":"Topic with this name already exists","correlationId":2,"size":86}
编辑:以上设置确实需要正确配置您的 Kafka 实例。领导选举可能永远不会解决,在这种情况下,KafkaJS 仍然会抱怨领导选举!
根据我的经验,这是由于 kafka 代理在没有来自 zookeeper de-registered 的情况下被停止的情况,这意味着 zookeeper 正在等待不再存在的响应。