激活压缩的 Kafka 消息大小

Kafka message size with activated compression

我对 Kafka 2.6.0 中的消息大小配置有点困惑。但是让我们讲故事:

我们正在使用由 3 个节点组成的 Kafka 集群。到目前为止,消息的标准配置。 “zstd 压缩”已激活。

相关代理配置很简单:

compression.type=zstd

此时生产者配置也很简单:

compression.type=zstd

现在我们要将 8 MB 的消息放入特定主题。此数据的压缩大小仅为 200 kbytes。

如果我将此数据放入主题中,则会发生以下错误:

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 13:43:34,500] ERROR Error when sending message to topic XXX with key: null, value: 8722456 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.RecordTooLargeException: The message is 8722544 bytes when serialized which is larger than 1048576, which is the value of the max.request.size configuration.

所以我像这样更改了生产者配置:

compression.type=zstd
max.request.size=10485760

现在生产者接受更大的消息。但是还是不行:

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --broker-list broker < kafka/new\ 2.txt

[2020-11-05 15:10:01,513] ERROR Error when sending message to topic Komsa.Kafka.Test with key: null, value: 8722544 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.

这是另一条错误信息。我不明白为什么会这样。

我认为此消息与“message.max.bytes”属性 有关。但我不明白为什么。这是 属性:

的文档

The largest record batch size allowed by Kafka (after compression if compression is enabled). If this is increased and there are consumers older than 0.10.2, the consumers' fetch size must also be increased so that they can fetch record batches this large. In the latest message format version, records are always grouped into batches for efficiency. In previous message format versions, uncompressed records are not grouped into batches and this limit only applies to a single record in that case.This can be set per topic with the topic level max.message.bytes config.

我认为这意味着该参数与压缩后的消息大小有关,有几千字节。

有人可以帮助我吗?

我们的经验是,如果您像在

中那样在代理级别设置压缩类型
compression.type=zstd

代理将解压缩来自生产者的任何内容,并使用该压缩类型再次压缩数据。即使生产者已经使用了 zstd,也会出现解压和“re-compression”。

因此,您需要将经纪人级别的 compression.type 设置为 producer

我找到了解决方案:

问题是 kafka-console-producer.sh 忽略了生产者配置中的 compression.type。如果我显式调用

sudo /opt/kafka/bin/kafka-console-producer.sh --topic XXX --producer.config /opt/kafka/config/admin-ssl.properties --compression-codec=zstd --broker-list broker < kafka/new\ 2.txt

with compression.codec=zstd 它有效,因为生产者压缩了消息。