在 Spark 中执行两次 groupbykey 的最佳实践?
Best practice to perform two times groupbykey in Spark?
我有很多这种格式的元组:
(1,200,a)
(2,300,a)
(1,300,b)
(2,400,a)
(2,500,b)
(3,200,a)
(3,400,b)
(1,500,a)
(2,400,b)
(3,500,a)
(1,200,b)
我的工作是第一次对第一个整数的元组进行排序,然后对元组的第三个元素的每个元素对元组的第二个元素的值进行平均。
所以,结果应该是这样的:
(1,350,a),
(1,250,b),
(2,350,a),
(2,450,b),
(3,350,a),
(3,400,b).
在这种情况下,您推荐哪种最佳做法?
我尝试对元组的第一个元素执行 MaptoPair 和 groupbykey。然后是第三个元素的另一个 MapTopPair 和 groupbykey,然后是 reducebykey,但它不起作用,我不知道为什么。我认为我没有使用最佳实践来解决此类工作。
This is a sketch of my solution
只需使用 Dataset
API。在 Scala 中,但 Java 几乎相同:
val rdd = sc.parallelize(Seq(
(1,200,"a"), (2,300,"a"), (1,300,"b"), (2,400,"a"), (2,500,"b"),
(3,200,"a"), (3,400,"b"), (1,500,"a"), (2,400,"b"), (3,500,"a"),
(1,200,"b")
))
val df = rdd.toDF("k1", "v", "k2")
df.groupBy("k1", "k2").mean("v").orderBy("k1", "k2").show
+---+---+------+
| k1| k2|avg(v)|
+---+---+------+
| 1| a| 350.0|
| 1| b| 250.0|
| 2| a| 350.0|
| 2| b| 450.0|
| 3| a| 350.0|
| 3| b| 400.0|
+---+---+------+
用RDD map先有复合键:
rdd
.map(x => ((x._1, x._3), (x._2, 1.0)))
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.mapValues(x => x._1 / x._2)
.take(6).foreach(println)
((2,a),350.0)
((3,b),400.0)
((1,b),250.0)
((1,a),350.0)
((3,a),350.0)
((2,b),450.0)
我有很多这种格式的元组:
(1,200,a)
(2,300,a)
(1,300,b)
(2,400,a)
(2,500,b)
(3,200,a)
(3,400,b)
(1,500,a)
(2,400,b)
(3,500,a)
(1,200,b)
我的工作是第一次对第一个整数的元组进行排序,然后对元组的第三个元素的每个元素对元组的第二个元素的值进行平均。 所以,结果应该是这样的:
(1,350,a),
(1,250,b),
(2,350,a),
(2,450,b),
(3,350,a),
(3,400,b).
在这种情况下,您推荐哪种最佳做法? 我尝试对元组的第一个元素执行 MaptoPair 和 groupbykey。然后是第三个元素的另一个 MapTopPair 和 groupbykey,然后是 reducebykey,但它不起作用,我不知道为什么。我认为我没有使用最佳实践来解决此类工作。
This is a sketch of my solution
只需使用 Dataset
API。在 Scala 中,但 Java 几乎相同:
val rdd = sc.parallelize(Seq(
(1,200,"a"), (2,300,"a"), (1,300,"b"), (2,400,"a"), (2,500,"b"),
(3,200,"a"), (3,400,"b"), (1,500,"a"), (2,400,"b"), (3,500,"a"),
(1,200,"b")
))
val df = rdd.toDF("k1", "v", "k2")
df.groupBy("k1", "k2").mean("v").orderBy("k1", "k2").show
+---+---+------+
| k1| k2|avg(v)|
+---+---+------+
| 1| a| 350.0|
| 1| b| 250.0|
| 2| a| 350.0|
| 2| b| 450.0|
| 3| a| 350.0|
| 3| b| 400.0|
+---+---+------+
用RDD map先有复合键:
rdd
.map(x => ((x._1, x._3), (x._2, 1.0)))
.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
.mapValues(x => x._1 / x._2)
.take(6).foreach(println)
((2,a),350.0)
((3,b),400.0)
((1,b),250.0)
((1,a),350.0)
((3,a),350.0)
((2,b),450.0)