Spark Streaming tuning number of records per batch size not working?

My Spark Streaming application reads from Kafka using the DStream approach, and I am trying to get the batch size to process 60,000 messages in 10 seconds.

What I did:
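The snippet itself is not reproduced here; a minimal sketch of the kind of setup implied by the configuration dump further down (a 10 second batch interval, backpressure enabled, and a 60,000 records/sec/partition cap) might look like this:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // Hedged reconstruction, not the original code: the values below are
    // taken from the configuration dump shown later in the question.
    val conf = new SparkConf()
      .setAppName("KafkaCallDEV")
      .set("spark.streaming.backpressure.enabled", "true")
      .set("spark.streaming.kafka.maxRatePerPartition", "60000")

    // 10 second batch interval
    val ssc = new StreamingContext(conf, Seconds(10))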

Now, here is how I am testing whether it works.

I have a producer that sends 60,000 messages to the topic at once. When I check the Spark UI, I get the following:

 batch time | Input size | processing time
 10:54:30   | 17610      | 5s
 10:54:20   | 32790      | 8s
 10:54:10   | 9600       | 3s

So each batch interval is 10 seconds. What I expected was a single batch with 60,000 records. Is there some other parameter I am missing? From what I have read about my current settings, I should be getting up to 10 * 60,000 * 3 = 1,800,000 records in one batch.
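For reference, that upper bound is just the per-partition rate cap multiplied by the batch interval and the partition count; the factor of 3 assumes the topic has 3 partitions, which the answer below also assumes.

    // Rough upper bound on records per batch with the direct Kafka API:
    //   maxRatePerPartition (msgs/sec/partition) * batchInterval (sec) * numPartitions
    val maxRecordsPerBatch = 60000L * 10 * 3 // = 1,800,000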

    spark.app.id = application_1551747423133_0677
    spark.app.name = KafkaCallDEV
    spark.driver.cores = 2
    spark.driver.extraJavaOptions = -XX:+UseG1GC -XX:ConcGCThreads=2 -XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc
    spark.driver.memory = 3g
    spark.driver.port = 33917
    spark.executor.cores = 2
    spark.executor.extraJavaOptions = -XX:+UseG1GC -XX:ConcGCThreads=2 -XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc
    spark.executor.id = driver
    spark.executor.instances = 2
    spark.executor.memory = 2g
    spark.master = yarn
    spark.scheduler.mode = FIFO
    spark.streaming.backpressure.enabled = true
    spark.streaming.kafka.maxRatePerPartition = 60000
    spark.submit.deployMode = cluster
    spark.ui.filters = org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    spark.ui.port = 0
    spark.yarn.app.container.log.dir = /data0/yarn/container-logs/application_1551747423133_0677/container_1551747423133_0677_01_000002

Below is what I printed out using

    logger.info(sparkSession.sparkContext.getConf.getAll.mkString("\n"))

I removed some unnecessary entries such as server addresses, the app name, etc.

    (spark.executor.extraJavaOptions,-XX:+UseG1GC -XX:ConcGCThreads=2 -XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc)
    (spark.yarn.app.id,application_1551747423133_0681)
    (spark.submit.deployMode,cluster)
    (spark.streaming.backpressure.enabled,true)
    (spark.yarn.credentials.renewalTime,1562764821939ms)
    (spark.ui.filters,org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter)
    (spark.executor.memory,2g)
    (spark.yarn.credentials.updateTime,1562769141873ms)
    (spark.driver.cores,2)
    (spark.executor.id,driver)
    (spark.executor.cores,2)
    (spark.master,yarn)
    (spark.driver.memory,3g)
    (spark.sql.warehouse.dir,/user/hive/warehouse)
    (spark.ui.port,0)
    (spark.driver.extraJavaOptions,-XX:+UseG1GC -XX:ConcGCThreads=2 -XX:InitiatingHeapOccupancyPercent=35 -Dlog4j.configuration=log4j.properties -verbose:gc)
    (spark.executor.instances,2)
    (spark.driver.port,37375)

I also have some Kafka configuration being printed, so I will post that below as well.

    org.apache.kafka.clients.consumer.ConsumerConfig:178 - ConsumerConfig values: 
        metric.reporters = []
        metadata.max.age.ms = 300000
        partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
        reconnect.backoff.ms = 50
        sasl.kerberos.ticket.renew.window.factor = 0.8
        max.partition.fetch.bytes = 1048576
        ssl.keystore.type = JKS
        enable.auto.commit = false
        sasl.mechanism = GSSAPI
        interceptor.classes = null
        exclude.internal.topics = true
        ssl.truststore.password = null
        client.id = 
        ssl.endpoint.identification.algorithm = null
        max.poll.records = 60000
        check.crcs = true
        request.timeout.ms = 40000
        heartbeat.interval.ms = 3000
        auto.commit.interval.ms = 5000
        receive.buffer.bytes = 65536
        ssl.truststore.type = JKS
        ssl.truststore.location = null
        ssl.keystore.password = null
        fetch.min.bytes = 1
        send.buffer.bytes = 131072
        value.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        retry.backoff.ms = 100
        ssl.secure.random.implementation = null
        sasl.kerberos.kinit.cmd = /usr/bin/kinit
        sasl.kerberos.service.name = null
        sasl.kerberos.ticket.renew.jitter = 0.05
        ssl.trustmanager.algorithm = PKIX
        ssl.key.password = null
        fetch.max.wait.ms = 500
        sasl.kerberos.min.time.before.relogin = 60000
        connections.max.idle.ms = 540000
        session.timeout.ms = 30000
        metrics.num.samples = 2
        key.deserializer = class org.apache.kafka.common.serialization.StringDeserializer
        ssl.protocol = TLS
        ssl.provider = null
        ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
        ssl.keystore.location = null
        ssl.cipher.suites = null
        security.protocol = PLAINTEXT
        ssl.keymanager.algorithm = SunX509
        metrics.sample.window.ms = 30000
        auto.offset.reset = latest
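For context, consumer settings like those above are typically supplied to the direct stream as a kafkaParams map. The sketch below is illustrative only, assuming the kafka010 direct API; the bootstrap servers, group id, and topic name are placeholders, not taken from the post.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // 10 second batch interval, as in the question
    val ssc = new StreamingContext(
      new SparkConf().setAppName("KafkaCallDEV"), Seconds(10))

    // Mirrors the ConsumerConfig dump above; broker, group id and topic are placeholders
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "broker1:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "my-consumer-group",
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean),
      "max.poll.records"   -> (60000: java.lang.Integer)
    )

    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Seq("my-topic"), kafkaParams))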

spark.streaming.kafka.maxRatePerPartition = 60000 means

the maximum rate (in messages per second) at which each Kafka partition will be read by this direct API; this rate control is enabled by the property spark.streaming.backpressure.enabled = true

17610 + 32790 + 9600 = 60000, so your batch size was reached.


Your 3 Kafka partitions (with 60k messages) are read by Spark in chunks/Spark partitions, in your case 3 partitions on the Spark side. But the original number of messages across the 3 Kafka partitions is 60,000 (17610 + 32790 + 9600). Even with a high-rate input message flow, backpressure will maintain a uniform rate of messages using the RateLimiter and PIDRateEstimator.
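As a rough illustration of how those settings interact: the rate estimated by the PID controller is bounded by the per-partition cap, and the batch interval turns that rate into a record count. spark.streaming.backpressure.initialRate is not mentioned in the post; it appears below only as the documented companion setting that seeds the first batches.

    import org.apache.spark.SparkConf

    // Hedged sketch of the knobs involved (conceptual, not Spark internals):
    //   effectiveRate   = min(pidEstimatedRate, maxRatePerPartition)   // per partition, per second
    //   recordsPerBatch <= effectiveRate * batchIntervalSec * numPartitions
    val sparkConf = new SparkConf()
      .set("spark.streaming.backpressure.enabled", "true")        // PID-based feedback loop
      .set("spark.streaming.kafka.maxRatePerPartition", "60000")  // hard per-partition/sec ceiling
      .set("spark.streaming.backpressure.initialRate", "60000")   // seed rate for early batches (assumption, not set in the post)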

So you are done here....

For further reading, refer to my post - Short note on Spark Streaming Back Pressure for better understanding

Conclusion: if you enable backpressure, then irrespective of the rate at which you send messages, it will allow a constant rate of messages.

It is like this illustrative general example... where the backpressure properties act like inflow control - a pressure-adjusting screw that maintains a uniform rate of message flow.

So I found out why Spark was splitting the batch of records I sent into multiple batches. I have spark.streaming.backpressure.enabled = true. This uses a feedback loop from previous batches to control the receive rate, which is capped at the per-partition maximum I set in spark.streaming.kafka.maxRatePerPartition. So Spark was tuning the receive rate for me.
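If the goal were a single batch containing all 60,000 records, one option consistent with this finding - though not something the post itself tried - would be to turn the feedback loop off and rely only on the static per-partition cap:

    import org.apache.spark.SparkConf

    // Illustrative alternative, not from the post: disable the PID feedback loop
    // so that only maxRatePerPartition limits the batch size.
    val altConf = new SparkConf()
      .set("spark.streaming.backpressure.enabled", "false")
      .set("spark.streaming.kafka.maxRatePerPartition", "60000") // up to 60000 * 10 s * 3 partitions per batch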