Spark:具有不同内核数的不同输出

Spark: Different output with different number of cores

当我更改我的 Spark 应用程序中的内核数量时,我遇到了一个奇怪的行为,代码如下:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkContext, SparkConf}
object Test extends App {
Logger.getLogger("org").setLevel(Level.WARN)
var listLink: List[String] = List()
def addListLink(s: String) = {
val list = s.split(",")
for (i <- 0 to list.length - 2) {
  listLink = list(i)+ "-" + list(i + 1) :: listLink
 }
}
val conf = new SparkConf().setMaster("local[1]").setAppName("Simple Application")
val sc = new SparkContext(conf)
val paths = sc.textFile("file:///tmp/percorsi.txt")
paths.foreach(x => addListLink(x))
println("Number of items:"+listLink.size)
println(listLink)
}

我的输入文件是这样的:

A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C,D
A,B,C
A,B,C
A,B,C
A,B,C
A,B,C
B,C,D
B,C,D
B,C,D
B,C,D
C,D
C,D

基本上,对于每条路径,我都调用我的方法,该方法将一个元素添加到代表每对连续元素的列表中:

示例:"A,B,C,D" => ("A-B","B-C","C-D")

如您所见,代码中只有一个核心

.setMaster("local[1]")

如果我 运行 我的应用程序(在本地或集群上),我会得到我期望的结果

println("Number of items:"+listLink.size)
//Result --> Number of Items : 38

如果我将核心数更改为 3(例如),我会得到不同的值。 例如 33 个项目而不是 38 个。

我是否遗漏了有关内核数量或其他内容(分区、ecc...)的信息?

我认为这是一个非常简单的应用程序,但我还是遇到了这种奇怪的行为。

谁能帮帮我?

提前致谢

FF

每个分区有一个单独的 listLink。因此,您将项目添加到多个列表,最后只打印一个。

Normally, when a function passed to a Spark operation (such as map or reduce) is executed on a remote cluster node, it works on separate copies of all the variables used in the function. These variables are copied to each machine, and no updates to the variables on the remote machine are propagated back to the driver program.

(从这里 https://spark.apache.org/docs/latest/programming-guide.html#shared-variables

今天是你的幸运日:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer


val data = List(
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C,D",
"A,B,C",
"A,B,C",
"A,B,C",
"A,B,C",
"A,B,C",
"B,C,D",
"B,C,D",
"B,C,D",
"B,C,D",
"C,D",
"C,D")

val conf = new SparkConf().setAppName("spark-scratch").setMaster("local")
val sc= new SparkContext(conf)


val dataRDD = sc.makeRDD(data, 1)
val linkRDD = dataRDD.flatMap(_.split(",").sliding(2).map{_.mkString("", "-", "")})

linkRDD.foreach(println)

输出:

A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
C-D
A-B
B-C
A-B
B-C
A-B
B-C
A-B
B-C
A-B
B-C
B-C
C-D
B-C
C-D
B-C
C-D
B-C
C-D
C-D
C-D