如何停止接收关于某个主题的 kafka 元数据消息
How to stop receiving kafka-metadata messages on a topic
出于某种原因,有时我们的 Confluent Kafka 消费者会收到关于特定主题的奇怪消息:
{
"magicByte": 2,
"attributes": 0,
"timestamp": "1623227829187",
"offset": "11575814",
"key": null,
"value": {
"type": "Buffer",
"data": []
},
"headers": {},
"isControlRecord": false,
"batchContext": {
"firstOffset": "11575814",
"firstTimestamp": "1623227829187",
"partitionLeaderEpoch": 5,
"inTransaction": false,
"isControlBatch": false,
"lastOffsetDelta": 0,
"producerId": "-1",
"producerEpoch": -1,
"firstSequence": -1,
"maxTimestamp": "1623227829187",
"timestampType": 0,
"magicByte": 2
}
}
我已经多次消费了这条完全相同的消息,即使我吞下了所有错误并在失败后提交了偏移量。
它只发生在一个特定的主题上。我们从 scarth 删除并重新创建了这个主题,它没有解决这个问题。
在下面的截图中,我们看到一条消息是空的,而其他消息不是空的。这是什么意思?
制作人 - C#:
_producerBuilder = new ProducerBuilder<Null, string>(new ProducerConfig
{
BootstrapServers = _kafka.BootstrapServers,
ClientId = Dns.GetHostName(),
LingerMs = _kafka.LingerMsKafka,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = _kafka.SaslUsername,
SaslPassword = _kafka.SaslPassword,
SecurityProtocol = SecurityProtocol.SaslSsl,
EnableSslCertificateVerification = false
}).Build();
消费者 NodeJs:
this.kafka = new Kafka.Kafka({
brokers: [`${connectionOptions.host}:${connectionOptions.port}`],
logLevel: Kafka.logLevel.NOTHING,
ssl: !!connectionOptions.sasl,
sasl: connectionOptions.sasl,
})
// ...
this.consumer = this.kafka.consumer({
groupId: this.groupId,
})
// ...
await this.consumer.connect()
await this.consumer.subscribe({ topic: this.topic, fromBeginning: true })
await this.consumer.run({ batch }: EachBatchPayload) => {/* ...*/})
我们无法使用 ccloud 使用有问题的消息:
错误 - “恐慌:运行时错误:索引超出范围 [0],长度为 0”
我们很迷茫。任何帮助都会很棒!
NodeJS:v14.15.4
KafkaJS:1.16.0-beta.18
这个问题发生在 kafka confluent (dev) 上。
支持团队没有解决这个问题。
通过移至 https://aiven.io/,此问题再也没有重现。
出于某种原因,有时我们的 Confluent Kafka 消费者会收到关于特定主题的奇怪消息:
{
"magicByte": 2,
"attributes": 0,
"timestamp": "1623227829187",
"offset": "11575814",
"key": null,
"value": {
"type": "Buffer",
"data": []
},
"headers": {},
"isControlRecord": false,
"batchContext": {
"firstOffset": "11575814",
"firstTimestamp": "1623227829187",
"partitionLeaderEpoch": 5,
"inTransaction": false,
"isControlBatch": false,
"lastOffsetDelta": 0,
"producerId": "-1",
"producerEpoch": -1,
"firstSequence": -1,
"maxTimestamp": "1623227829187",
"timestampType": 0,
"magicByte": 2
}
}
我已经多次消费了这条完全相同的消息,即使我吞下了所有错误并在失败后提交了偏移量。
它只发生在一个特定的主题上。我们从 scarth 删除并重新创建了这个主题,它没有解决这个问题。
在下面的截图中,我们看到一条消息是空的,而其他消息不是空的。这是什么意思?
制作人 - C#:
_producerBuilder = new ProducerBuilder<Null, string>(new ProducerConfig
{
BootstrapServers = _kafka.BootstrapServers,
ClientId = Dns.GetHostName(),
LingerMs = _kafka.LingerMsKafka,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = _kafka.SaslUsername,
SaslPassword = _kafka.SaslPassword,
SecurityProtocol = SecurityProtocol.SaslSsl,
EnableSslCertificateVerification = false
}).Build();
消费者 NodeJs:
this.kafka = new Kafka.Kafka({
brokers: [`${connectionOptions.host}:${connectionOptions.port}`],
logLevel: Kafka.logLevel.NOTHING,
ssl: !!connectionOptions.sasl,
sasl: connectionOptions.sasl,
})
// ...
this.consumer = this.kafka.consumer({
groupId: this.groupId,
})
// ...
await this.consumer.connect()
await this.consumer.subscribe({ topic: this.topic, fromBeginning: true })
await this.consumer.run({ batch }: EachBatchPayload) => {/* ...*/})
我们无法使用 ccloud 使用有问题的消息: 错误 - “恐慌:运行时错误:索引超出范围 [0],长度为 0”
我们很迷茫。任何帮助都会很棒!
NodeJS:v14.15.4 KafkaJS:1.16.0-beta.18
这个问题发生在 kafka confluent (dev) 上。
支持团队没有解决这个问题。
通过移至 https://aiven.io/,此问题再也没有重现。