在 spark-scala 中进行 reduceByKey 转换后取回所有列

Get all the columns back after reduceByKey transformation in spark-scala

我是 Stack overflow 和 Spark.Basically 做 RDD 转换的新手。

我的输入数据:

278222631,2763985,10.02.12,01.01.53,Whatsup,NA,Email,Halter,wagen,28.06.12,313657794,VW,er,i,B,0,23.11.11,234
298106482,2780663,22.02.12,22.02.12,Whatsup,NA,WWW,Halter,wagen,26.06.12,284788860,VW,er,i,B,0,02.06.04,123

我的RDD格式

val dateCov: RDD[(Long, Long, String, String, String, String, String, String, String, String, Long, String, String, String, String, String, String, Long)]

在 col (1,17) 上做一些 reduceBykey 转换 map([(k,k),(v)] 作为键,col(18) 作为值。并在 reduceByKey[ 上应用一些函数=19=]

示例:

val reducedSortedRDD = dateCov.map(r => { ((r._1, r._11) -> (r._18)) })
      .reduceByKey((x, y) => ((math.min(x, y)))) // find minimum diff
      .map(r => (r._1._1, r._1._2, r._2))
      .sortBy(_._1, true)
  1. 我的问题 - 在 reduceByKey 函数之后是否可以获取所有其他列,即我的 reducedSortedRDD return 类型应该是 reducedSortedRDD : RDD[(Long, Long, String, String, String, String, String, String, String, String, Long, String, String, String, String, String, String, Long)]

而不是本例中的 reducedSortedRDD: RDD[(Long, Long, Long)]

  1. 我做的对吗?我只想在 reduceByKey 转换
  2. 之后拥有一个完整的初始 RDD,而不是 RDD 的一个子集

我正在使用 spark 1.4

据我所知,您需要将所有列都放入 reduceByKey 函数中(请记住混洗额外数据的开销)或者您可以加入 reducedSortedRDD 与您的原始数据。

要同时显示所有列,您可以这样做:

val reducedSortedRDD = dateCov
  .map(r => ((r._1, r._11),(r._18, r._2, r._3, ..., r._17)))
  .reduceByKey((value1,value2) => if (value1._1 < value2._1) value1 else value2)
  .map{case(key, value) => (key._1, key._2, value._2, value._3, ..., value._17, value._1)}
  .sortBy(_._1, true)

要加入,它看起来像这样:

val keyValuedDateCov = dateCov
  .map(r => ((r._1, r._11, r._18), (r._1, r._2,r._3, ...., r._17)))

val reducedRDD = dateCov
  .map(r => ((r._1, r._11), r._18))
  .reduceByKey((x, y) => math.min(x, y)) // find minimum diff
  .map{case(key, value) => ((key._1, key._2, value), AnyRef)}

val reducedSortedRDD = reducedRDD
  .join(keyValuedDateCov)
  .map{case(key, (_, original)) => (key._1, key._2, original._1, original._2, original._3, ..., original._17, key._3)}
  .sortBy(_._1, true)

连接版本有一个弱点,如果原始数据中的多行在第 1、17 和 18 列中具有完全相同的值,那么最终结果也将包含具有这些值的多行,因此无法正确减少.如果保证数据不会出现多行这些列中的值相同,应该没有问题。