在使用 Kafka 进行 Spark Streaming 的情况下,spark.streaming.kafka.maxRatePerPartition 与 spark.streaming.backpressure.enabled 有何关系?
How is spark.streaming.kafka.maxRatePerPartition related to spark.streaming.backpressure.enabled incase of spark streaming with Kafka?
我正在尝试在读取配置单元 table 后将数据写入 Kafka 主题,如下所示。
write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
final_df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", 'topic_name')\
.save()
写入成功后(记录数为29000条),我正在另一个文件中读取与以下相同主题的数据:
read_kafka_data.py:
# SCHEMA
schema = StructType([StructField("col1", StringType()),
StructField("col2", IntegerType())
])
# READ FROM TOPIC
jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
+ " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
+ " oauth.client.id=" + '"' + "client_id" + '"' \
+ " oauth.client.secret=" + '"' + "secret_key" + '" ;'
stream_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', kafka_broker) \
.option('subscribe', 'topic_name') \
.option('kafka.security.protocol', 'SASL_SSL') \
.option('kafka.sasl.mechanism', 'OAUTHBEARER') \
.option('kafka.sasl.jaas.config', jass_config) \
.option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
.option('startingOffsets', 'latest') \
.option('group.id', 'group_id') \
.option('maxOffsetsPerTrigger', 200) \
.option('fetchOffset.retryIntervalMs', 200) \
.option('fetchOffset.numRetries', 3) \
.load()\
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
stream_df.writeStream.outputMode('append')
.format(HiveWarehouseSession.STREAM_TO_STREAM)
.option("database", "database_name")
.option("table", "table_name")
.option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
.option("checkpointLocation", "/path/to/checkpoint/dir")
.start().awaitTermination()
我是 Kafka 的初学者,一直在阅读 Kafka 性能优化技术并遇到了这两个。
spark.streaming.backpressure.enabled
and spark.streaming.kafka.maxRatePerPartition
启用第一个参数:
sparkConf.set("spark.streaming.backpressure.enabled",”true”)
官方文档中对上述参数的解释为:
Enables or disables Spark Streaming's internal backpressure mechanism
(since 1.5). This enables the Spark Streaming to control the receiving
rate based on the current batch scheduling delays and processing times
so that the system receives only as fast as the system can process.
Internally, this dynamically sets the maximum receiving rate of
receivers. This rate is upper bounded by the values
spark.streaming.receiver.maxRate
and
spark.streaming.kafka.maxRatePerPartition
现在我是运行第一次申请,没有之前的微批,我是否应该指定一些值:spark.streaming.backpressure.initialRate
如果是,我应该如何确定spark.streaming.backpressure.initialRate
的值。
文档还说如果 spark.streaming.backpressure.enabled
设置为 true
最大接收速率是动态设置的。
如果是这样,我们是否还需要配置:
spark.streaming.receiver.maxRate
和 spark.streaming.kafka.maxRatePerPartition
如果 spark.streaming.backpressure.enabled
设置为 true
?
这 link 表示在施加背压时使用 spark.streaming.backpressure.initialRate
没有影响。
如能帮助消除混淆,我们将不胜感激。
您所指的配置 spark.streaming.[...]
属于 Direct Streaming(又名 Spark Streaming),不 结构化流.
如果您不知道其中的区别,我建议您查看单独的编程指南:
- Structured Streaming: processing structured data streams with relation queries (using Datasets and DataFrames, newer API than DStreams)
- Spark Streaming: processing data streams using DStreams (old API)
Structured Streaming 不提供背压机制。当您从 Kafka 消费时,您可以使用(正如您已经在做的那样)选项 maxOffsetsPerTrigger
来设置每个触发器上读取消息的限制。此选项在 Structured Streaming and Kafka Integration Guide 中记录为:
"Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume."
如果你对题目问题还感兴趣
How is spark.streaming.kafka.maxRatePerPartition
related to spark.streaming.backpressure.enabled
in case of spark streaming with Kafka?
此关系在 Spark's Configuration 上的文档中进行了解释:
"Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values spark.streaming.receiver.maxRate
and spark.streaming.kafka.maxRatePerPartition
if they are set (see below)."
有关 Spark Streaming(DStream,而非 Structured Streaming)中可用的背压机制的所有详细信息都在您已链接的博客中进行了解释 Enable Back Pressure To Make Your Spark Streaming Application Production Ready。
通常,如果您启用背压,您会将 spark.streaming.kafka.maxRatePerPartition
设置为最佳估计速率的 150% ~ 200%。
PID controller can be found in the code within the class PIDRateEstimator的精确计算。
Spark Streaming 的背压示例
正如您要求的示例,这是我在我的一个高效应用程序中完成的示例:
设置
- Kafka 主题有 16 个分区
- Spark 以 16 个工作内核运行,因此每个分区都可以并行使用
- 使用 Spark 流(不是结构化流)
- 批次间隔为 10 秒
spark.streaming.backpressure.enabled
设为真
spark.streaming.kafka.maxRatePerPartition
设为 10000
spark.streaming.backpressure.pid.minRate
保持默认值 100
- 该作业可以处理大约 5000 条消息每个分区每秒
- 在开始流作业之前,Kafka 主题在每个分区中包含数百万条消息
观察
- 在第一批中,流式处理作业获取 16000 条(= 10 秒 * 16 个分区 * 100 pid.minRate)条消息。
- 作业处理这 16000 条消息的速度非常快,因此 PID 控制器估计的最佳速率大于 10000 的 masRatePerPartition。
- 因此,在第二批中,流式处理作业提取 16000(= 10 秒 * 16 个分区 * 10000 maxRatePerPartition)条消息。
- 现在,第二批完成大约需要 22 秒
- 因为我们的批次间隔设置为 10 秒,在 10 秒后,流作业已经安排了第三个微批次,再次为 1600000。原因是 PID 控制器只能使用来自 的性能信息已完成 个微批次。
- 仅在第六或第七个微批次中,PID 控制器找到了每个分区每秒约 5000 条消息的最佳处理速率。
我正在尝试在读取配置单元 table 后将数据写入 Kafka 主题,如下所示。
write_kafka_data.py:
read_df = spark.sql("select * from db.table where some_column in ('ASIA', 'Europe')")
final_df = read_df.select(F.to_json(F.struct(F.col("*"))).alias("value"))
final_df.write.format("kafka")\
.option("kafka.bootstrap.servers", kafka_broker)\
.option("kafka.batch.size", 51200)\
.option("retries", 3)\
.option("kafka.max.request.size", 500000)\
.option("kafka.max.block.ms", 120000)\
.option("kafka.metadata.max.age.ms", 120000)\
.option("kafka.request.timeout.ms", 120000)\
.option("kafka.linger.ms", 0)\
.option("kafka.delivery.timeout.ms", 130000)\
.option("acks", "1")\
.option("kafka.compression.type", "snappy")\
.option("kafka.security.protocol", "SASL_SSL")\
.option("kafka.sasl.jaas.config", oauth_config)\
.option("kafka.sasl.login.callback.handler.class", "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler")\
.option("kafka.sasl.mechanism", "OAUTHBEARER")\
.option("topic", 'topic_name')\
.save()
写入成功后(记录数为29000条),我正在另一个文件中读取与以下相同主题的数据: read_kafka_data.py:
# SCHEMA
schema = StructType([StructField("col1", StringType()),
StructField("col2", IntegerType())
])
# READ FROM TOPIC
jass_config = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required" \
+ " oauth.token.endpoint.uri=" + '"' + "uri" + '"' \
+ " oauth.client.id=" + '"' + "client_id" + '"' \
+ " oauth.client.secret=" + '"' + "secret_key" + '" ;'
stream_df = spark.readStream \
.format('kafka') \
.option('kafka.bootstrap.servers', kafka_broker) \
.option('subscribe', 'topic_name') \
.option('kafka.security.protocol', 'SASL_SSL') \
.option('kafka.sasl.mechanism', 'OAUTHBEARER') \
.option('kafka.sasl.jaas.config', jass_config) \
.option('kafka.sasl.login.callback.handler.class', "io.strimzi.kafka.oauth.client.JaasClientOauthLoginCallbackHandler") \
.option('startingOffsets', 'latest') \
.option('group.id', 'group_id') \
.option('maxOffsetsPerTrigger', 200) \
.option('fetchOffset.retryIntervalMs', 200) \
.option('fetchOffset.numRetries', 3) \
.load()\
.select(from_json(col('value').cast('string'), schema).alias("json_dta")).selectExpr('json_dta.*')
stream_df.writeStream.outputMode('append')
.format(HiveWarehouseSession.STREAM_TO_STREAM)
.option("database", "database_name")
.option("table", "table_name")
.option("metastoreUri", spark.conf.get("spark.datasource.hive.warehouse.metastoreUri"))
.option("checkpointLocation", "/path/to/checkpoint/dir")
.start().awaitTermination()
我是 Kafka 的初学者,一直在阅读 Kafka 性能优化技术并遇到了这两个。
spark.streaming.backpressure.enabled
andspark.streaming.kafka.maxRatePerPartition
启用第一个参数:
sparkConf.set("spark.streaming.backpressure.enabled",”true”)
官方文档中对上述参数的解释为:
Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values
spark.streaming.receiver.maxRate
andspark.streaming.kafka.maxRatePerPartition
现在我是运行第一次申请,没有之前的微批,我是否应该指定一些值:spark.streaming.backpressure.initialRate
如果是,我应该如何确定spark.streaming.backpressure.initialRate
的值。
文档还说如果 spark.streaming.backpressure.enabled
设置为 true
最大接收速率是动态设置的。
如果是这样,我们是否还需要配置:
spark.streaming.receiver.maxRate
和 spark.streaming.kafka.maxRatePerPartition
如果 spark.streaming.backpressure.enabled
设置为 true
?
这 link 表示在施加背压时使用 spark.streaming.backpressure.initialRate
没有影响。
如能帮助消除混淆,我们将不胜感激。
您所指的配置 spark.streaming.[...]
属于 Direct Streaming(又名 Spark Streaming),不 结构化流.
如果您不知道其中的区别,我建议您查看单独的编程指南:
- Structured Streaming: processing structured data streams with relation queries (using Datasets and DataFrames, newer API than DStreams)
- Spark Streaming: processing data streams using DStreams (old API)
Structured Streaming 不提供背压机制。当您从 Kafka 消费时,您可以使用(正如您已经在做的那样)选项 maxOffsetsPerTrigger
来设置每个触发器上读取消息的限制。此选项在 Structured Streaming and Kafka Integration Guide 中记录为:
"Rate limit on maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topicPartitions of different volume."
如果你对题目问题还感兴趣
How is
spark.streaming.kafka.maxRatePerPartition
related tospark.streaming.backpressure.enabled
in case of spark streaming with Kafka?
此关系在 Spark's Configuration 上的文档中进行了解释:
"Enables or disables Spark Streaming's internal backpressure mechanism (since 1.5). This enables the Spark Streaming to control the receiving rate based on the current batch scheduling delays and processing times so that the system receives only as fast as the system can process. Internally, this dynamically sets the maximum receiving rate of receivers. This rate is upper bounded by the values
spark.streaming.receiver.maxRate
andspark.streaming.kafka.maxRatePerPartition
if they are set (see below)."
有关 Spark Streaming(DStream,而非 Structured Streaming)中可用的背压机制的所有详细信息都在您已链接的博客中进行了解释 Enable Back Pressure To Make Your Spark Streaming Application Production Ready。
通常,如果您启用背压,您会将 spark.streaming.kafka.maxRatePerPartition
设置为最佳估计速率的 150% ~ 200%。
PID controller can be found in the code within the class PIDRateEstimator的精确计算。
Spark Streaming 的背压示例
正如您要求的示例,这是我在我的一个高效应用程序中完成的示例:
设置
- Kafka 主题有 16 个分区
- Spark 以 16 个工作内核运行,因此每个分区都可以并行使用
- 使用 Spark 流(不是结构化流)
- 批次间隔为 10 秒
spark.streaming.backpressure.enabled
设为真spark.streaming.kafka.maxRatePerPartition
设为 10000spark.streaming.backpressure.pid.minRate
保持默认值 100- 该作业可以处理大约 5000 条消息每个分区每秒
- 在开始流作业之前,Kafka 主题在每个分区中包含数百万条消息
观察
- 在第一批中,流式处理作业获取 16000 条(= 10 秒 * 16 个分区 * 100 pid.minRate)条消息。
- 作业处理这 16000 条消息的速度非常快,因此 PID 控制器估计的最佳速率大于 10000 的 masRatePerPartition。
- 因此,在第二批中,流式处理作业提取 16000(= 10 秒 * 16 个分区 * 10000 maxRatePerPartition)条消息。
- 现在,第二批完成大约需要 22 秒
- 因为我们的批次间隔设置为 10 秒,在 10 秒后,流作业已经安排了第三个微批次,再次为 1600000。原因是 PID 控制器只能使用来自 的性能信息已完成 个微批次。
- 仅在第六或第七个微批次中,PID 控制器找到了每个分区每秒约 5000 条消息的最佳处理速率。