Spark 配置 spark.streaming.receiver.maxRate 对 Kafka Beam 管道有影响吗

Does Spark configs spark.streaming.receiver.maxRate has any effect in a Kafka Beam pipeline

我想知道当跑步者是 SparkRunner 时,是否有人对 Beam KafkaIO 组件的速率限制有任何经验。我使用的版本 are:Beam 2.29、Spark 3.2.0 和 Kafka 客户端 2.5.0?

我将 Beam 参数 maxRecordsPerBatch 设置为一个很大的数字,100000000。但即使管道停止 45 分钟,也永远不会达到该值。但是,当出现高于正常水平的大量数据时,Kafka 滞后会增加,直到最终赶上来。在 SparkUI 中,我看到未达到参数 batchIntervalMillis=300000(5 分钟),批处理最多需要 3 分钟。看起来 KafkaIO 在某个时候停止读取,即使延迟非常大。我的 Kafka 参数 --fetchMaxWaitMs=1000 --maxPollRecords=5000 应该可以带很多数据。特别是因为 KafkaIO 每个分区创建一个消费者。在我的系统中有多个主题,总共有 992 个分区,而我的 spark.default.parallelism=600。一些分区的数据很少,而另一些分区的数据很多。主题是按区域划分的,当一个区域出现故障时,数据将通过另一个 region/topic 发送。那就是发生滞后的时候。

spark.streaming.receiver.maxRate 和 spark.streaming.receiver.maxRatePerPartition 加上 spark.streaming.backpressure.enabled 的配置值是否起作用? 就我所见,Beam 似乎通过运算符 KafkaIO 控制了 Kafka 的整个读取。该组件创建自己的消费者,因此消费者的速率只能通过使用包含 fetchMaxWaitMs 和 maxPollRecords 的消费者配置来设置。 如果在 IO 源之后的管道的其余部分中,这些 Spark 参数可能会产生任何影响的唯一方法。但我不确定。

所以我终于弄清楚了这一切是如何运作的。首先,我想声明 Spark 配置值:spark.streaming.receiver.maxRate、spark.streaming.receiver.maxRatePerPartition、spark.streaming.backpressure.enabled 在 Beam 中不起作用,因为它们仅在您使用 Spark 的源运算符时才有效从卡夫卡读。由于 Beam 有自己的运算符 KafkaIO,因此它们不会发挥作用。

因此 Beam 在 class SparkPipelineOptions 中定义了一组参数,这些参数在 SparkRunner 中用于设置从 Kafka 读取。这些参数是:

  @Description("Minimum time to spend on read, for each micro-batch.")
  @Default.Long(200)
  Long getMinReadTimeMillis();

  @Description(
  "A value between 0-1 to describe the percentage of a micro-batch dedicated to reading from UnboundedSource.")
  @Default.Double(0.1)
  Double getReadTimePercentage();

Beam 创建一个 SourceDStream 对象,它将传递给 spark 以用作从 Kafka 读取的源。在这个 class 方法中 boundReadDuration returns 计算两个读数中较大值的结果: proportionalDuration 和 lowerBoundDuration。第一个是通过将 readTimePercentage 乘以 BatchIntervalMillis 计算得出的。第二个只是来自 minReadTimeMillis 的以米尔为单位的值。下面是来自 SourceDStream 的代码。此函数返回的持续时间将用于单独从 Kafka 读取,其余时间将分配给管道中的其他任务。

最后但并非最不重要的是,以下参数还控制在批处理期间处理的记录数 maxRecordsPerBatch。管道不会在单个批次中处理超过这些记录。

  private Duration boundReadDuration(double readTimePercentage, long minReadTimeMillis) {
long batchDurationMillis = ssc().graph().batchDuration().milliseconds();
Duration proportionalDuration =
    new Duration(Math.round(batchDurationMillis * readTimePercentage));
Duration lowerBoundDuration = new Duration(minReadTimeMillis);
Duration readDuration =
    proportionalDuration.isLongerThan(lowerBoundDuration)
        ? proportionalDuration
        : lowerBoundDuration;
LOG.info("Read duration set to: " + readDuration);
return readDuration;

}