如何设置状态存储更改日志主题的 max.message.bytes?
How can you set the max.message.bytes of a state store changelog topic?
我有一个消息高达 10MiB 的 Kafka Streams 应用程序。我想将这些消息保存在状态存储中,但 Kafka Streams 无法生成内部变更日志主题:
2017-11-17 08:36:19,792 ERROR RecordCollectorImpl - task [4_5] Error sending record to topic appid-statestorename-state-store-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
2017-11-17 08:36:20,583 ERROR StreamThread - stream-thread [StreamThread-1] Failed while executing StreamTask 4_5 due to flush state:
通过添加一些日志记录,看起来内部主题的默认 max.message.bytes
设置是 1MiB。
群集的默认 max.message.bytes
设置为 50MiB。
是否可以调整 Kafka Streams 应用程序的内部主题配置?
解决方法是启动流应用程序,让它创建主题,然后更改主题配置。但这感觉像是一个肮脏的黑客。
./kafka-topics.sh --zookeeper ... \
--alter --topic appid-statestorename-state-store-changelog \
--config max.message.bytes=10485760
Kafka 1.0
允许通过 StreamsConfig
.
为内部主题指定自定义主题属性
您可以使用 "topic."
作为这些配置的前缀,并且可以使用 TopicConfig
中定义的任何配置。
有关更多详细信息,请参阅原始 KIP:
我有一个消息高达 10MiB 的 Kafka Streams 应用程序。我想将这些消息保存在状态存储中,但 Kafka Streams 无法生成内部变更日志主题:
2017-11-17 08:36:19,792 ERROR RecordCollectorImpl - task [4_5] Error sending record to topic appid-statestorename-state-store-changelog. No more offsets will be recorded for this task and the exception will eventually be thrown
org.apache.kafka.common.errors.RecordTooLargeException: The request included a message larger than the max message size the server will accept.
2017-11-17 08:36:20,583 ERROR StreamThread - stream-thread [StreamThread-1] Failed while executing StreamTask 4_5 due to flush state:
通过添加一些日志记录,看起来内部主题的默认 max.message.bytes
设置是 1MiB。
群集的默认 max.message.bytes
设置为 50MiB。
是否可以调整 Kafka Streams 应用程序的内部主题配置?
解决方法是启动流应用程序,让它创建主题,然后更改主题配置。但这感觉像是一个肮脏的黑客。
./kafka-topics.sh --zookeeper ... \
--alter --topic appid-statestorename-state-store-changelog \
--config max.message.bytes=10485760
Kafka 1.0
允许通过 StreamsConfig
.
您可以使用 "topic."
作为这些配置的前缀,并且可以使用 TopicConfig
中定义的任何配置。
有关更多详细信息,请参阅原始 KIP: