Caused by: org.apache.kafka.common.record.InvalidRecordException: Incorrect declared batch size, records still remaining in file

We ran into a problem when consuming messages from kafka-2.2.1, where we had configured compression.type = zstd on the Kafka broker. The full exception stack is:

org.apache.kafka.common.KafkaException: Received exception when fetching the next record from test-10. If needed, please seek past the record to continue consumption.
  at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.fetchRecords(Fetcher.java:1519)
  at org.apache.kafka.clients.consumer.internals.Fetcher$CompletedFetch.access00(Fetcher.java:1374)
  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:676)
  at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:631)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1282)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
  at org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:258)
Caused by: org.apache.kafka.common.InvalidRecordException: Incorrect declared batch size, records still remaining in file

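We finally solved the problem. We checked the jar versions used by the Kafka consumer and found that it was using zstd-jni-1.3.2-2.jar, which is older than the zstd-jni-1.3.8-1.jar used by the Kafka broker. The conflict came from our pom.xml: we depend on spark-core, and that jar transitively depends on zstd-jni-1.3.2-2.jar. Excluding zstd-jni from spark-core fixed it. So check that the broker and the client use matching versions of the compression and decompression jars (e.g. zstd, lz4, snappy, gzip).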

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.binary.version}</artifactId>
            <version>${spark.version}</version>
            <scope>${spark.scope}</scope>
            <exclusions>
                <exclusion>
                    <artifactId>zstd-jni</artifactId>
                    <groupId>com.github.luben</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.2.1</version>
        </dependency>
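
To confirm which zstd-jni jar the consumer actually loads at runtime, something like the following may help. It is only a sketch: the class name `ZstdJarLocator` and the `locate` helper are made up for illustration, and it assumes `com.github.luben.zstd.Zstd` is the class shipped in zstd-jni. It prints the jar path a class was loaded from, which normally includes the version in the file name.

```java
import java.security.CodeSource;

// Hypothetical helper (not part of Kafka or zstd-jni): reports where a class was loaded from.
final class ZstdJarLocator {

    /** Returns the jar/directory a class comes from, or a short note if that is not available. */
    public static String locate(String className) {
        try {
            // initialize=false: look the class up without running static initializers
            // (so no native library gets loaded just for this check).
            Class<?> cls = Class.forName(className, false, ZstdJarLocator.class.getClassLoader());
            CodeSource src = cls.getProtectionDomain().getCodeSource();
            // Classes loaded by the bootstrap loader (JDK classes) have no code source.
            return src == null ? "(no code source, likely a JDK class)"
                               : String.valueOf(src.getLocation());
        } catch (ClassNotFoundException | LinkageError e) {
            return "(not on classpath)";
        }
    }

    public static void main(String[] args) {
        String[] names = {
            "com.github.luben.zstd.Zstd",                      // assumed zstd-jni entry class
            "org.apache.kafka.clients.consumer.KafkaConsumer"  // from kafka-clients
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name));
        }
    }
}
```

Run it with the same classpath as the consumer job. If the printed path still points at zstd-jni-1.3.2-2.jar after adding the exclusion, some other dependency is probably pulling the old version in.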