如何停止接收关于某个主题的 kafka 元数据消息

How to stop receiving kafka-metadata messages on a topic

出于某种原因,有时我们的 Confluent Kafka 消费者会收到关于特定主题的奇怪消息:

{
  "magicByte": 2,
  "attributes": 0,
  "timestamp": "1623227829187",
  "offset": "11575814",
  "key": null,
  "value": {
    "type": "Buffer",
    "data": []
  },
  "headers": {},
  "isControlRecord": false,
  "batchContext": {
    "firstOffset": "11575814",
    "firstTimestamp": "1623227829187",
    "partitionLeaderEpoch": 5,
    "inTransaction": false,
    "isControlBatch": false,
    "lastOffsetDelta": 0,
    "producerId": "-1",
    "producerEpoch": -1,
    "firstSequence": -1,
    "maxTimestamp": "1623227829187",
    "timestampType": 0,
    "magicByte": 2
  }
}

我已经多次消费了这条完全相同的消息,即使我吞下了所有错误并在失败后提交了偏移量。

它只发生在一个特定的主题上。我们从 scarth 删除并重新创建了这个主题,它没有解决这个问题。

在下面的截图中,我们看到一条消息是空的,而其他消息不是空的。这是什么意思?


制作人 - C#:

_producerBuilder = new ProducerBuilder<Null, string>(new ProducerConfig
                {
                    BootstrapServers = _kafka.BootstrapServers,
                    ClientId = Dns.GetHostName(),
                    LingerMs = _kafka.LingerMsKafka,
                    SaslMechanism = SaslMechanism.Plain,
                    SaslUsername = _kafka.SaslUsername,
                    SaslPassword = _kafka.SaslPassword,
                    SecurityProtocol = SecurityProtocol.SaslSsl,
                    EnableSslCertificateVerification = false
                }).Build();

消费者 NodeJs:

this.kafka = new Kafka.Kafka({
  brokers: [`${connectionOptions.host}:${connectionOptions.port}`],
  logLevel: Kafka.logLevel.NOTHING,
  ssl: !!connectionOptions.sasl,
  sasl: connectionOptions.sasl,
})
// ...
this.consumer = this.kafka.consumer({
  groupId: this.groupId,
})
// ...
await this.consumer.connect()
await this.consumer.subscribe({ topic: this.topic, fromBeginning: true })
await this.consumer.run({ batch }: EachBatchPayload) => {/* ...*/})

我们无法使用 ccloud 使用有问题的消息: 错误 - “恐慌:运行时错误:索引超出范围 [0],长度为 0”

我们很迷茫。任何帮助都会很棒!


NodeJS:v14.15.4 KafkaJS:1.16.0-beta.18

这个问题发生在 kafka confluent (dev) 上。

支持团队没有解决这个问题。

通过移至 https://aiven.io/,此问题再也没有重现。