Spark Streaming tuning number of records per batch size not working?
My Spark Streaming application reads data from Kafka using the DStream approach, and I'm trying to get each batch to process 60,000 messages within a 10-second batch interval.
What I've done (roughly as sketched in the code after this list):
- Created a topic with 3 partitions
- Set spark.streaming.kafka.maxRatePerPartition = 60000
- Set spark.streaming.backpressure.enabled = true
- Set the batch duration to 10 seconds when creating the StreamingContext
- Run in yarn mode with 2 executors (4 cores total for the 3 partitions)
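For context, here is a minimal, hypothetical sketch of this kind of setup; the broker address, group id and topic name are placeholders I made up, not values from the actual application:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val conf = new SparkConf()
  .setAppName("KafkaCallDEV")
  .set("spark.streaming.kafka.maxRatePerPartition", "60000") // per-partition cap, msgs/sec
  .set("spark.streaming.backpressure.enabled", "true")       // rate adjusted by a feedback loop

val ssc = new StreamingContext(conf, Seconds(10))             // 10-second batch interval

// Placeholder Kafka parameters -- broker, group id and topic are illustrative only
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "broker1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "call-dev-consumer",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))

// Count records per batch so the effect of the rate settings is visible in the logs
stream.foreachRDD { rdd => println(s"records in this batch: ${rdd.count()}") }

ssc.start()
ssc.awaitTermination()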
Now, how do I test whether this is working?
I have a producer that sends 60,000 messages to the topic at once. When I check the Spark UI, I get the following:
batch time | Input size | processing time
10:54:30 | 17610 | 5s
10:54:20 | 32790 | 8s
10:54:10 | 9600 | 3s
So the batches are 10 seconds apart, but what I expected was 1 batch with all 60,000 records. Is there some other parameter I'm not setting? From what I've read about my current settings, I should be able to get up to 10 * 60,000 * 3 = 1,800,000 records in a single batch.
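As a quick aside, here is the upper-bound arithmetic implied by those settings, written out as an illustrative snippet (it is a ceiling, not something Spark promises to fill):

// Upper bound on records per batch implied by the settings above
val maxRatePerPartition = 60000L   // spark.streaming.kafka.maxRatePerPartition (msgs/sec per partition)
val numPartitions       = 3L       // Kafka partitions in the topic
val batchDurationSec    = 10L      // StreamingContext batch interval
val maxRecordsPerBatch  = maxRatePerPartition * numPartitions * batchDurationSec
// = 1,800,000 -- a batch can never exceed this, and it only contains what is
// actually available in Kafka (here at most the 60,000 messages the producer sent)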
spark.app.id = application_1551747423133_0677
spark.app.name = KafkaCallDEV
spark.driver.cores = 2
spark.driver.extraJavaOptions = -XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc
spark.driver.memory = 3g
spark.driver.port = 33917
spark.executor.cores = 2
spark.executor.extraJavaOptions = -XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc
spark.executor.id = driver
spark.executor.instances = 2
spark.executor.memory = 2g
spark.master = yarn
spark.scheduler.mode = FIFO
spark.streaming.backpressure.enabled = true
spark.streaming.kafka.maxRatePerPartition = 60000
spark.submit.deployMode = cluster
spark.ui.filters = org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
spark.ui.port = 0
spark.yarn.app.container.log.dir = /data0/yarn/container-logs/application_1551747423133_0677/container_1551747423133_0677_01_000002
Below is what I printed out using logger.info(sparkSession.sparkContext.getConf.getAll.mkString("\n")).
I removed some unnecessary entries such as the server address, app name, etc.
(spark.executor.extraJavaOptions,-XX:+UseG1GC -XX:ConcGCThreads=2 -XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc)
(spark.yarn.app.id,application_1551747423133_0681)
(spark.submit.deployMode,cluster)
(spark.streaming.backpressure.enabled,true)
(spark.yarn.credentials.renewalTime,1562764821939ms)
(spark.ui.filters,org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)
(spark.executor.memory,2g)
(spark.yarn.credentials.updateTime,1562769141873ms)
(spark.driver.cores,2)
(spark.executor.id,driver)
(spark.executor.cores,2)
(spark.master,yarn)
(spark.driver.memory,3g)
(spark.sql.warehouse.dir,/user/hive/warehouse)
(spark.ui.port,0)
(spark.driver.extraJavaOptions,-XX:+UseG1GC -XX:ConcGCThreads=2
-XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc)
(spark.executor.instances,2)
(spark.driver.port,37375)
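For completeness, a minimal sketch of how the line that produced the dump above might sit in the application; the slf4j logger setup is my assumption, only the getConf.getAll call comes from the question:

import org.slf4j.LoggerFactory
import org.apache.spark.sql.SparkSession

val logger = LoggerFactory.getLogger(getClass)
val sparkSession = SparkSession.builder().getOrCreate()

// Dump every effective Spark property, one per line, into the application log
logger.info(sparkSession.sparkContext.getConf.getAll.mkString("\n"))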
I also have some Kafka configs being printed, so I'll post those below as well.
org.apache.kafka.clients.consumer.ConsumerConfig:178 - ConsumerConfig values:
metric.reporters = []
metadata.max.age.ms = 300000
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
ssl.keystore.type = JKS
enable.auto.commit = false
sasl.mechanism = GSSAPI
interceptor.classes = null
exclude.internal.topics = true
ssl.truststore.password = null
client.id =
ssl.endpoint.identification.algorithm = null
max.poll.records = 60000
check.crcs = true
request.timeout.ms = 40000
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 65536
ssl.truststore.type = JKS
ssl.truststore.location = null
ssl.keystore.password = null
fetch.min.bytes = 1
send.buffer.bytes = 131072
value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
retry.backoff.ms = 100
ssl.secure.random.implementation = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.trustmanager.algorithm = PKIX
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
session.timeout.ms = 30000
metrics.num.samples = 2
key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
ssl.protocol = TLS
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
ssl.cipher.suites = null
security.protocol = PLAINTEXT
ssl.keymanager.algorithm = SunX509
metrics.sample.window.ms = 30000
auto.offset.reset = latest
spark.streaming.kafka.maxRatePerPartition = 60000 means:
the maximum rate (in messages per second) at which each Kafka partition will be read by the direct API, working together with the back pressure you enabled via spark.streaming.backpressure.enabled = true.
17610 + 32790 + 9600 = 60000, which is exactly the total number of messages you sent, so your batch size has been reached. See this.
Your 3 Kafka partitions (holding the 60k messages) are read by Spark in chunks, i.e. Spark partitions - in your case 3 partitions on the Spark side - but the total number of messages across the 3 Kafka partitions is still 60,000 (17610 + 32790 + 9600). Even when the input stream arrives at a high message rate, back pressure maintains a uniform message rate using the RateLimiter and PIDRateEstimator.
So you are done here....
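To make that mechanism concrete, here is a heavily simplified, illustrative sketch of how the direct stream decides how many records a batch may take from each partition; this is paraphrased logic under my own assumptions, not the actual Spark source:

// Simplified model of the per-batch record cap used by the Kafka direct stream.
// `estimatedRatePerPartition` stands in for the feedback the PIDRateEstimator
// produces from previous batches; all values are illustrative.
def recordsAllowedPerPartition(
    maxRatePerPartition: Long,        // spark.streaming.kafka.maxRatePerPartition
    estimatedRatePerPartition: Long,  // back pressure estimate (msgs/sec) for this partition
    batchDurationSec: Long): Long = {
  val effectiveRate = math.min(maxRatePerPartition, estimatedRatePerPartition)
  effectiveRate * batchDurationSec    // cap on records pulled from this partition for the batch
}

// e.g. with a conservative estimate of 1,000 msgs/sec left over from earlier small batches:
recordsAllowedPerPartition(60000L, 1000L, 10L)  // = 10,000 records for that partition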
For further reference, see my post - Short note on Spark Streaming Back Pressure for better understanding.
Conclusion: if you enable back pressure, then regardless of the rate at which you send messages, it will let messages through at a roughly constant rate.
Think of it like this illustrative analogy: the back pressure property is an inflow control - a pressure-adjusting screw that keeps the message flow at a uniform rate.
So I found the reason why Spark was breaking the batch of records I sent into multiple batches: I have spark.streaming.backpressure.enabled = true. This uses a feedback loop from previous batches to control the receiving rate, which is capped at the per-partition maximum I set in spark.streaming.kafka.maxRatePerPartition. So Spark is adjusting the receiving rate for me.
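For anyone tuning the same behaviour, here is a hedged sketch of the configuration knobs involved in that feedback loop; the values are placeholders, not a recommendation from this answer:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Feedback loop from previous batches controls the ingestion rate
  .set("spark.streaming.backpressure.enabled", "true")
  // Hard ceiling per Kafka partition (msgs/sec); back pressure only lowers the rate below this
  .set("spark.streaming.kafka.maxRatePerPartition", "60000")
  // Starting rate for the first batches, before the estimator has any feedback
  // (whether the direct stream honours this depends on the Spark version)
  .set("spark.streaming.backpressure.initialRate", "60000")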