How to update Kafka consumer max.request.size config while using Spark structured stream

The Spark readStream from Kafka fails with the following error:

org.apache.kafka.common.errors.RecordTooLargeException (The message is 1166569 bytes when serialized which is larger than the maximum request size you have configured with the max.request.size configuration.)

How can we increase max.request.size?

Code:

val ctxdb = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "ip:port")
  .option("subscribe", "topic")
  .option("startingOffsets", "earliest")
  .option("failOnDataLoss", "false")
  .option("max.request.size", "15728640")
  .load()

We have already tried option("max.partition.fetch.bytes", "15728640"), with no luck.

You need to add the kafka prefix in your write stream settings:

.option("kafka.max.request.size", "15728640")