Spark Streaming:如何向我的 DStream 添加更多分区?
Spark Streaming: How can I add more partitions to my DStream?
我有一个 spark-streaming 应用程序,它看起来像这样:
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
kafkaDF.foreachPartition(
i =>{
createConnection()
i.foreach(
row =>{
connection.sendToTable()
}
)
closeConnection()
}
)
而且,我 运行 使用
在纱线簇上
spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....
当我尝试记录 kafkaDF.rdd.partitions.size
时,结果大多是“1”或“5”。我很困惑,是否可以控制我的 DataFrame 的分区数? KafkaUtils.createStream
似乎不接受任何与我想要的 rdd 分区数相关的参数。我试了kafkaDF.rdd.repartition( int )
,但好像也没用。
如何在我的代码中实现更多的并行性?如果我的做法是错误的,那么正确的实现方式是什么?
在 Spark Streaming 中,可以在两个方面实现并行性:(a) consumers/receivers(在您的情况下是 Kafka 消费者),以及 (b) 处理(由 Spark 完成)。
默认情况下,spark streaming 会为每个消费者分配一个核心(又名线程)。因此,如果您需要摄取更多数据,则需要创建更多消费者。每个消费者都会创建一个 DStream。然后,您可以合并 DStreams 以获得一个大流。
// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B
val combineStream = messageStream1.union(messageStream2)
Alternatively,可以通过重新分区输入流来增加receivers/consumers的数量:
inputStream.repartition(<number of partitions>))
流式应用程序可用的所有剩余核心都将分配给 Spark。
因此,如果您有 N
个核心(通过 spark.cores.max
定义)并且您有 C
个消费者,您将剩下 N-C
个可用于 Spark 的核心。
#Partitions =~ #Consumers x (batch duration / block interval)
block interval = 消费者在将其创建的数据推送为 spark 块之前等待的时间(定义为配置 spark.streaming.blockInterval
)。
永远记住,Spark Streaming 有两个经常发生的功能。一组读取当前微批(消费者)的线程,以及一组处理前一个微批(Spark)的线程。
我有一个 spark-streaming 应用程序,它看起来像这样:
val message = KafkaUtils.createStream(...).map(_._2)
message.foreachRDD( rdd => {
if (!rdd.isEmpty){
val kafkaDF = sqlContext.read.json(rdd)
kafkaDF.foreachPartition(
i =>{
createConnection()
i.foreach(
row =>{
connection.sendToTable()
}
)
closeConnection()
}
)
而且,我 运行 使用
在纱线簇上spark-submit --master yarn-cluster --num-executors 3 --driver-memory 2g --executor-memory 2g --executor-cores 5....
当我尝试记录 kafkaDF.rdd.partitions.size
时,结果大多是“1”或“5”。我很困惑,是否可以控制我的 DataFrame 的分区数? KafkaUtils.createStream
似乎不接受任何与我想要的 rdd 分区数相关的参数。我试了kafkaDF.rdd.repartition( int )
,但好像也没用。
如何在我的代码中实现更多的并行性?如果我的做法是错误的,那么正确的实现方式是什么?
在 Spark Streaming 中,可以在两个方面实现并行性:(a) consumers/receivers(在您的情况下是 Kafka 消费者),以及 (b) 处理(由 Spark 完成)。
默认情况下,spark streaming 会为每个消费者分配一个核心(又名线程)。因此,如果您需要摄取更多数据,则需要创建更多消费者。每个消费者都会创建一个 DStream。然后,您可以合并 DStreams 以获得一个大流。
// A basic example with two threads for consumers
val messageStream1 = KafkaUtils.createStream(...) // say, reading topic A
val messageStream2 = KafkaUtils.createStream(...) // and this one reading topic B
val combineStream = messageStream1.union(messageStream2)
Alternatively,可以通过重新分区输入流来增加receivers/consumers的数量:
inputStream.repartition(<number of partitions>))
流式应用程序可用的所有剩余核心都将分配给 Spark。
因此,如果您有 N
个核心(通过 spark.cores.max
定义)并且您有 C
个消费者,您将剩下 N-C
个可用于 Spark 的核心。
#Partitions =~ #Consumers x (batch duration / block interval)
block interval = 消费者在将其创建的数据推送为 spark 块之前等待的时间(定义为配置 spark.streaming.blockInterval
)。
永远记住,Spark Streaming 有两个经常发生的功能。一组读取当前微批(消费者)的线程,以及一组处理前一个微批(Spark)的线程。