FlinkKafkaConsumer 无法读取 LZ4 压缩主题

FlinkKafkaConsumer fails to read from a LZ4 compressed topic

我们有几个从 Kafka 主题读取的 flink 应用程序,它们运行良好。但最近我们向现有的 flink 作业添加了一个新主题,它在启动时立即开始失败,并出现以下根错误:

Caused by: org.apache.kafka.common.KafkaException: java.lang.NoClassDefFoundError: net/jpountz/lz4/LZ4Exception
    at org.apache.kafka.common.record.CompressionType.wrapForInput(CompressionType.java:113)
    at org.apache.kafka.common.record.DefaultRecordBatch.compressedIterator(DefaultRecordBatch.java:256)
    at org.apache.kafka.common.record.DefaultRecordBatch.streamingIterator(DefaultRecordBatch.java:334)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.nextFetchedRecord(Fetcher.java:1208)
    at org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1245)
    ... 7 more

我发现这个主题有 lz4 压缩,我猜 flink 由于某种原因无法使用它。将 lz4 依赖项直接添加到应用程序没有工作,而且很奇怪 - 它在本地运行良好,但在远程集群上失败。

flink 运行时版本是 1.9.1,我们的应用程序中所有其他依赖项的版本相同: flink-streaming-java_2.11, flink-connector-kafka_2.11, flink-javaflink-clients_2.11

这会不会是因为 flink 不依赖内部的 lz4 库?

找到解决方案。不需要版本升级,也不需要对应用程序本身的额外依赖。对我们有用的是将 lz4 库 jar 直接添加到 Docker 映像中的 flink libs 文件夹中。之后lz4压缩的错误就消失了