Spark: Difference between collect(), take() and show() outputs after conversion toDF

I am using Spark 1.5.

I have a column of 30 IDs which I am loading from a database as integers:

val numsRDD = sqlContext
     .table(constants.SOURCE_DB + "." + IDS)
     .select("id")
     .distinct
     .map(row => row.getInt(0))

Here is the output of numsRDD:

numsRDD.collect.foreach(println(_))

643761
30673603
30736590
30773400
30832624
31104189
31598495
31723487
32776244
32801792
32879386
32981901
33469224
34213505
34709608
37136455
37260344
37471301
37573190
37578690
37582274
37600896
37608984
37616677
37618105
37644500
37647770
37648497
37720353
37741608

Next, I want to generate all combinations of 3 of those ids, save each combination as a tuple of the form <tripletID: String, triplet: Array(Int)>, and convert it to a dataframe, which I do as follows:

// |combinationsDF| = 4060 combinations
val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map(row => (
        List(row(0), row(1), row(2)).mkString(","), 
        List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID","triplet")
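
As a quick sanity check, the row count matches the expected C(30, 3) = 4060 combinations noted in the comment above:

combinationsDF.count // 4060 = (30 choose 3)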

Once that is done, I try to print some of the contents of combinationsDF, just to make sure everything is the way it should be. So I try this:

combinationsDF.show

which returns:

+--------------------+--------------------+
|           tripletID|             triplet|
+--------------------+--------------------+
|,37136455,3758227...|[32776244, 371364...|
|,37136455,3761667...|[32776244, 371364...|
|,32776244,3713645...|[31723487, 327762...|
|,37136455,3757869...|[32776244, 371364...|
|,32776244,3713645...|[31598495, 327762...|
|,37136455,3760089...|[32776244, 371364...|
|,37136455,3764849...|[32776244, 371364...|
|,37136455,3764450...|[32776244, 371364...|
|,37136455,3747130...|[32776244, 371364...|
|,32981901,3713645...|[32776244, 329819...|
|,37136455,3761810...|[32776244, 371364...|
|,34213505,3713645...|[32776244, 342135...|
|,37136455,3726034...|[32776244, 371364...|
|,37136455,3772035...|[32776244, 371364...|
|2776244,37136455...|[643761, 32776244...|
|,37136455,3764777...|[32776244, 371364...|
|,37136455,3760898...|[32776244, 371364...|
|,32879386,3713645...|[32776244, 328793...|
|,32776244,3713645...|[31104189, 327762...|
|,32776244,3713645...|[30736590, 327762...|
+--------------------+--------------------+
only showing top 20 rows

It is clear that the first element of each tripletID is missing. So, to be 100% sure, I use take(20) as follows:

combinationsDF.take(20).foreach(println(_))

which returns a more detailed representation, as follows:

[,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[2776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]

So now I am certain that the first id of each tripletID is somehow being discarded. However, if I use collect instead of take(20):

combinationsDF.collect.foreach(println(_))

everything is back to normal again (!!!):

[32776244,37136455,37582274,WrappedArray(32776244, 37136455, 37582274)]
[32776244,37136455,37616677,WrappedArray(32776244, 37136455, 37616677)]
[31723487,32776244,37136455,WrappedArray(31723487, 32776244, 37136455)]
[32776244,37136455,37578690,WrappedArray(32776244, 37136455, 37578690)]
[31598495,32776244,37136455,WrappedArray(31598495, 32776244, 37136455)]
[32776244,37136455,37600896,WrappedArray(32776244, 37136455, 37600896)]
[32776244,37136455,37648497,WrappedArray(32776244, 37136455, 37648497)]
[32776244,37136455,37644500,WrappedArray(32776244, 37136455, 37644500)]
[32776244,37136455,37471301,WrappedArray(32776244, 37136455, 37471301)]
[32776244,32981901,37136455,WrappedArray(32776244, 32981901, 37136455)]
[32776244,37136455,37618105,WrappedArray(32776244, 37136455, 37618105)]
[32776244,34213505,37136455,WrappedArray(32776244, 34213505, 37136455)]
[32776244,37136455,37260344,WrappedArray(32776244, 37136455, 37260344)]
[32776244,37136455,37720353,WrappedArray(32776244, 37136455, 37720353)]
[643761,32776244,37136455,WrappedArray(643761, 32776244, 37136455)]
[32776244,37136455,37647770,WrappedArray(32776244, 37136455, 37647770)]
[32776244,37136455,37608984,WrappedArray(32776244, 37136455, 37608984)]
[32776244,32879386,37136455,WrappedArray(32776244, 32879386, 37136455)]
[31104189,32776244,37136455,WrappedArray(31104189, 32776244, 37136455)]
[30736590,32776244,37136455,WrappedArray(30736590, 32776244, 37136455)]
...

1. I have exhaustively inspected the steps up to where parallelize turns the array of combinations into an RDD, and everything is fine.
2. I have also printed the output immediately after applying parallelize, and once again everything is fine (see the sketch after this list).
3. The problem seems to be related to the conversion of numsRDD to a DF, but despite my best efforts I cannot deal with it.
4. I have also been unable to reproduce the problem with mock data using the same code snippet.
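
Roughly, the check in step 2 amounts to printing the tuples right after parallelize and before toDF, something like this (a sketch; the tripletsRDD name is illustrative):

val tripletsRDD = sc.parallelize(
  numsRDD.collect
    .combinations(3)
    .toArray
    .map(row => row.sorted)
    .map(row => (row.mkString(","), row)))

// If step 2 holds, every tripletID prints with all three ids intact here
tripletsRDD.take(20).foreach(println)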

So, first: what is causing this problem? And second: how can I fix it?

I would check your original numsRDD; it looks like you might have an empty string or null value in there. This works for me:

scala> val numsRDD = sc.parallelize(0 to 30)
numsRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:27

scala> :pa
// Entering paste mode (ctrl-D to finish)

val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map(row => (
        List(row(0), row(1), row(2)).mkString(","),
        List(row(0), row(1), row(2)).toArray)))
  .toDF("tripletID","triplet")

// Exiting paste mode, now interpreting.

combinationsDF: org.apache.spark.sql.DataFrame = [tripletID: string, triplet: array<int>]

scala> combinationsDF.show
+---------+----------+
|tripletID|   triplet|
+---------+----------+
|    0,1,2| [0, 1, 2]|
|    0,1,3| [0, 1, 3]|
|    0,1,4| [0, 1, 4]|
|    0,1,5| [0, 1, 5]|
|    0,1,6| [0, 1, 6]|
|    0,1,7| [0, 1, 7]|
|    0,1,8| [0, 1, 8]|
|    0,1,9| [0, 1, 9]|
|   0,1,10|[0, 1, 10]|
|   0,1,11|[0, 1, 11]|
|   0,1,12|[0, 1, 12]|
|   0,1,13|[0, 1, 13]|
|   0,1,14|[0, 1, 14]|
|   0,1,15|[0, 1, 15]|
|   0,1,16|[0, 1, 16]|
|   0,1,17|[0, 1, 17]|
|   0,1,18|[0, 1, 18]|
|   0,1,19|[0, 1, 19]|
|   0,1,20|[0, 1, 20]|
|   0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows
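
If the source data really does contain an empty string or a null, a quick check on the source column should surface it before the getInt(0) mapping is applied (a sketch, reusing the table and column names from the question):

val idsDF = sqlContext
  .table(constants.SOURCE_DB + "." + IDS)
  .select("id")
  .distinct

// Any nulls here would explain a blank first element in tripletID
println(idsDF.filter("id IS NULL").count)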

The only other thing I can think of is that mkString is not working the way you expect. Try this string interpolation instead (there is also no need to re-create the List):

val combinationsDF = sc
  .parallelize(numsRDD
     .collect
     .combinations(3)
     .toArray
     .map(row => row.sorted)
     .map{ case Array(a, b, c) => (
        s"$a,$b,$c",
        Array(a, b, c)) })
  .toDF("tripletID","triplet")

scala> combinationsDF.show
+---------+----------+
|tripletID|   triplet|
+---------+----------+
|    0,1,2| [0, 1, 2]|
|    0,1,3| [0, 1, 3]|
|    0,1,4| [0, 1, 4]|
|    0,1,5| [0, 1, 5]|
|    0,1,6| [0, 1, 6]|
|    0,1,7| [0, 1, 7]|
|    0,1,8| [0, 1, 8]|
|    0,1,9| [0, 1, 9]|
|   0,1,10|[0, 1, 10]|
|   0,1,11|[0, 1, 11]|
|   0,1,12|[0, 1, 12]|
|   0,1,13|[0, 1, 13]|
|   0,1,14|[0, 1, 14]|
|   0,1,15|[0, 1, 15]|
|   0,1,16|[0, 1, 16]|
|   0,1,17|[0, 1, 17]|
|   0,1,18|[0, 1, 18]|
|   0,1,19|[0, 1, 19]|
|   0,1,20|[0, 1, 20]|
|   0,1,21|[0, 1, 21]|
+---------+----------+
only showing top 20 rows

1. df.show() only displays the contents (see the note on truncation after this list).

For example:

df.show()
Out[11]: 
+----+-------+
| age|   name|
+----+-------+
|null|Michael|
|  30|   Andy|
|  19| Justin|
+----+-------+
2. df.collect() displays both the contents and the structure/metadata. For example:

    df.collect()
    Out[11]:
    [Row(age=None, name=u'Michael'),
    Row(age=30, name=u'Andy'),
    Row(age=19, name=u'Justin')]
    
3. df.take(some number) can be used to display the contents and structure/metadata for a limited number of rows of a very large dataset. Note that it flattens the data and displays the rows on a single line.

For example, to view just the first two rows of the dataframe:

df.take(2)
Out[13]: 
[Row(age=None, name=u'Michael'), Row(age=30, name=u'Andy')]
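
One note on the show output in the question: show truncates cell values longer than 20 characters by default, which is why every cell in the question's show output ends in "...". Truncation can be disabled with the two-argument overload (available in Spark 1.5 and later, if I recall correctly):

// Print the first 20 rows without truncating long cell values
combinationsDF.show(20, false)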