DataFrame/Dataset 加入在 Spark 2.0/Yarn 中没有产生正确的结果
DataFrame/Dataset join not producing correct results in Spark 2.0/Yarn
我们在 Hadoop 2.7.2、Centos 7.2 上有一个集群 运行 Apache Spark 2.0。我们已经使用 Spark DataFrame/DataSet APIs 编写了一些新代码,但注意到在将数据写入然后将数据读取到 Windows Azure 存储 Blob(默认 HDFS 位置)后,连接的结果不正确。我已经能够在集群上使用以下代码片段 运行 重现该问题。
case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)
val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(CentroidClusterScore(0, 1, 1.0),CentroidClusterScore(1, 0, 1.0),CentroidClusterScore(2, 2, 1.0))).toDS
dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension") ).show
产出
+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345| 0| 1.0|
+-----+---------+-----+
+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
| 0| 1| 1.0|
| 1| 0| 1.0|
| 2| 2| 1.0|
+---------+-------+-----+
+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345| 0| 1.0| 0| 1| 1.0|
+-----+---------+-----+---------+-------+-----+
这是正确的。然而在写入和读取数据之后,我们看到了这个
dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")
val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]
dims2.show
cent2.show
dims2.join(cent2, dims2("dimension") === cent2("dimension") ).show
产出
+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345| 0| 1.0|
+-----+---------+-----+
+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
| 0| 1| 1.0|
| 1| 0| 1.0|
| 2| 2| 1.0|
+---------+-------+-----+
+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345| 0| 1.0| null| null| null|
+-----+---------+-----+---------+-------+-----+
但是,使用 RDD API 会产生正确的结果
dims2.rdd.map( row => (row.dimension, row) ).join( cent2.rdd.map( row => (row.dimension, row) ) ).take(5)
res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))
我们尝试将输出格式更改为 ORC 而不是 parquet,但我们看到了相同的结果。 运行 Spark 2.0 在本地,不在集群上,没有这个问题。此外 运行 在 Hadoop 集群的主节点上本地模式下的 spark 也可以工作。只有当 运行 在 YARN 之上时,我们才会看到这个问题。
这似乎也与这个问题非常相似:https://issues.apache.org/jira/browse/SPARK-10896
中提交的拉取请求修复
我们在 Hadoop 2.7.2、Centos 7.2 上有一个集群 运行 Apache Spark 2.0。我们已经使用 Spark DataFrame/DataSet APIs 编写了一些新代码,但注意到在将数据写入然后将数据读取到 Windows Azure 存储 Blob(默认 HDFS 位置)后,连接的结果不正确。我已经能够在集群上使用以下代码片段 运行 重现该问题。
case class UserDimensions(user: Long, dimension: Long, score: Double)
case class CentroidClusterScore(dimension: Long, cluster: Int, score: Double)
val dims = sc.parallelize(Array(UserDimensions(12345, 0, 1.0))).toDS
val cent = sc.parallelize(Array(CentroidClusterScore(0, 1, 1.0),CentroidClusterScore(1, 0, 1.0),CentroidClusterScore(2, 2, 1.0))).toDS
dims.show
cent.show
dims.join(cent, dims("dimension") === cent("dimension") ).show
产出
+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345| 0| 1.0|
+-----+---------+-----+
+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
| 0| 1| 1.0|
| 1| 0| 1.0|
| 2| 2| 1.0|
+---------+-------+-----+
+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345| 0| 1.0| 0| 1| 1.0|
+-----+---------+-----+---------+-------+-----+
这是正确的。然而在写入和读取数据之后,我们看到了这个
dims.write.mode("overwrite").save("/tmp/dims2.parquet")
cent.write.mode("overwrite").save("/tmp/cent2.parquet")
val dims2 = spark.read.load("/tmp/dims2.parquet").as[UserDimensions]
val cent2 = spark.read.load("/tmp/cent2.parquet").as[CentroidClusterScore]
dims2.show
cent2.show
dims2.join(cent2, dims2("dimension") === cent2("dimension") ).show
产出
+-----+---------+-----+
| user|dimension|score|
+-----+---------+-----+
|12345| 0| 1.0|
+-----+---------+-----+
+---------+-------+-----+
|dimension|cluster|score|
+---------+-------+-----+
| 0| 1| 1.0|
| 1| 0| 1.0|
| 2| 2| 1.0|
+---------+-------+-----+
+-----+---------+-----+---------+-------+-----+
| user|dimension|score|dimension|cluster|score|
+-----+---------+-----+---------+-------+-----+
|12345| 0| 1.0| null| null| null|
+-----+---------+-----+---------+-------+-----+
但是,使用 RDD API 会产生正确的结果
dims2.rdd.map( row => (row.dimension, row) ).join( cent2.rdd.map( row => (row.dimension, row) ) ).take(5)
res5: Array[(Long, (UserDimensions, CentroidClusterScore))] = Array((0,(UserDimensions(12345,0,1.0),CentroidClusterScore(0,1,1.0))))
我们尝试将输出格式更改为 ORC 而不是 parquet,但我们看到了相同的结果。 运行 Spark 2.0 在本地,不在集群上,没有这个问题。此外 运行 在 Hadoop 集群的主节点上本地模式下的 spark 也可以工作。只有当 运行 在 YARN 之上时,我们才会看到这个问题。
这似乎也与这个问题非常相似:https://issues.apache.org/jira/browse/SPARK-10896