从 spark DStream 获取的 RDD 没有在分区之间分配
RDD taken from spark DStream is not getting distributed between partitions
我有五个执行程序 运行 用于此 spark 作业,但来自 dstream 的 RDD 仅分布在 2 个分区之间。如果我执行 repartion(5)
然后它仍然分布在 2 个分区中,但是当我通过执行 val newrdd= sparkcontext.parallelize(rdd.take(rdd.count()))
从中创建一个新的 RDD 时,它会正确地分布在 5 个分区中。但是在并行化现有的 RDD 之后创建一个新的 RDD 并不是一个好主意,所以我不想这样做。
我是不是漏掉了什么?
代码:
val ssc = new StreamingContext(sparksession.sparkContext, Seconds(batchDuration.toLong))
val inputDirectStream = EventHubsUtils.createDirectStreams(
ssc,
eventHubNamespace,
progressDir,
Map(eventHubName -> eventhubParameters))
inputDirectStream.foreachRDD { rdd =>
println(rdd.partitions.size)//it prints 2
rdd.repartition(5)
println(rdd.partitions.size)//it also prints 2
var newrdd = sparksession.sparkContext.parallelize(rdd.take(rdd.count().toInt))
println(newrdd.partitions.size)//it prints 5
}
我 运行 我的 spark stream 工作如下:
spark-submit --class "com.mycomp.Main" --executor-memory 1g
--executor-cores 1 --num-executors 5 --conf "spark.streaming.stopGracefullyOnShutdown=true" --master yarn --jars
/tmp/jobs/supporting.jar /tmp/jobs/cdc.jar false > /tmp/jobs/output
2>&1
关于如何使 RDD 分布在 5 个分区之间的任何建议(取决于执行程序和内核的数量)。
正在调用 repartition
returns 一个您需要使用的新 RDD(更改了分区)。换句话说,您需要将重新分区调用中的 return 值分配给一个新变量,否则您只是在使用具有旧分区的旧 RDD。更改为 val rdd2 = rdd.repartition(5)
并在之后使用 rdd2
。
注意:由于 Scala 在设计上是一种惰性语言,因此在对数据执行操作之前不会发生实际的重新分区。例如,您可以对数据 运行 first
或 count
进行重新分区。但是,使用 rdd.partitions.size
检查分区仍会正确反映。
我有五个执行程序 运行 用于此 spark 作业,但来自 dstream 的 RDD 仅分布在 2 个分区之间。如果我执行 repartion(5)
然后它仍然分布在 2 个分区中,但是当我通过执行 val newrdd= sparkcontext.parallelize(rdd.take(rdd.count()))
从中创建一个新的 RDD 时,它会正确地分布在 5 个分区中。但是在并行化现有的 RDD 之后创建一个新的 RDD 并不是一个好主意,所以我不想这样做。
我是不是漏掉了什么?
代码:
val ssc = new StreamingContext(sparksession.sparkContext, Seconds(batchDuration.toLong))
val inputDirectStream = EventHubsUtils.createDirectStreams(
ssc,
eventHubNamespace,
progressDir,
Map(eventHubName -> eventhubParameters))
inputDirectStream.foreachRDD { rdd =>
println(rdd.partitions.size)//it prints 2
rdd.repartition(5)
println(rdd.partitions.size)//it also prints 2
var newrdd = sparksession.sparkContext.parallelize(rdd.take(rdd.count().toInt))
println(newrdd.partitions.size)//it prints 5
}
我 运行 我的 spark stream 工作如下:
spark-submit --class "com.mycomp.Main" --executor-memory 1g --executor-cores 1 --num-executors 5 --conf "spark.streaming.stopGracefullyOnShutdown=true" --master yarn --jars /tmp/jobs/supporting.jar /tmp/jobs/cdc.jar false > /tmp/jobs/output 2>&1
关于如何使 RDD 分布在 5 个分区之间的任何建议(取决于执行程序和内核的数量)。
正在调用 repartition
returns 一个您需要使用的新 RDD(更改了分区)。换句话说,您需要将重新分区调用中的 return 值分配给一个新变量,否则您只是在使用具有旧分区的旧 RDD。更改为 val rdd2 = rdd.repartition(5)
并在之后使用 rdd2
。
注意:由于 Scala 在设计上是一种惰性语言,因此在对数据执行操作之前不会发生实际的重新分区。例如,您可以对数据 运行 first
或 count
进行重新分区。但是,使用 rdd.partitions.size
检查分区仍会正确反映。