Spark Streaming:如何向我的 DStream 添加更多分区?

Spark Streaming: How can I add more partitions to my DStream?

我有一个 spark-streaming 应用程序,它看起来像这样:

val message = KafkaUtils.createStream(...).map(_._2)

message.foreachRDD( rdd => {

  if (!rdd.isEmpty){
    val kafkaDF = sqlContext.read.json(rdd)

    kafkaDF.foreachPartition(
      i =>{
        createConnection()
        i.foreach(
          row =>{
            connection.sendToTable()
          }
        )
        closeConnection()
      }
    )

而且,我 运行 使用

在纱线簇上
spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....

当我尝试记录 kafkaDF.rdd.partitions.size 时,结果大多是“1”或“5”。我很困惑,是否可以控制我的 DataFrame 的分区数? KafkaUtils.createStream 似乎不接受任何与我想要的 rdd 分区数相关的参数。我试了kafkaDF.rdd.repartition( int ),但好像也没用。

如何在我的代码中实现更多的并行性?如果我的做法是错误的,那么正确的实现方式是什么?

在 Spark Streaming 中,可以在两个方面实现并行性:(a) consumers/receivers(在您的情况下是 Kafka 消费者),以及 (b) 处理(由 Spark 完成)。

默认情况下,spark streaming 会为每个消费者分配一个核心(又名线程)。因此,如果您需要摄取更多数据,则需要创建更多消费者。每个消费者都会创建一个 DStream。然后,您可以合并 DStreams 以获得一个大流。

// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B

val combineStream = messageStream1.union(messageStream2)

Alternatively,可以通过重新分区输入流来增加receivers/consumers的数量:

inputStream.repartition(<number of partitions>))

流式应用程序可用的所有剩余核心都将分配给 Spark。

因此,如果您有 N 个核心(通过 spark.cores.max 定义)并且您有 C 个消费者,您将剩下 N-C 个可用于 Spark 的核心。

#Partitions =~  #Consumers x (batch duration / block interval)

block interval = 消费者在将其创建的数据推送为 spark 块之前等待的时间(定义为配置 spark.streaming.blockInterval)。

永远记住,Spark Streaming 有两个经常发生的功能。一组读取当前微批(消费者)的线程,以及一组处理前一个微批(Spark)的线程。

更多性能调优技巧请参考here, here and here