Spark分组再排序(Java代码)
Spark grouping and then sorting (Java code)
我有一个 JavaPairRDD,需要按键分组,然后使用对象 MyObject 中的值对其进行排序。
假设 MyObject 是:
class MyObject {
Integer order;
String name;
}
示例数据:
1, {order:1, name:'Joseph'}
1, {order:2, name:'Tom'}
1, {order:3, name:'Luke'}
2, {order:1, name:'Alfred'}
2, {order:3, name:'Ana'}
2, {order:2, name:'Jessica'}
3, {order:3, name:'Will'}
3, {order:2, name:'Mariah'}
3, {order:1, name:'Monika'}
预期结果:
分区 1:
1, {order:1, name:'Joseph'}
1, {order:2, name:'Tom'}
1, {order:3, name:'Luke'}
分区 2
2, {order:1, name:'Alfred'}
2, {order:2, name:'Jessica'}
2, {order:3, name:'Ana'}
分区 3:
3, {order:1, name:'Monika'}
3, {order:2, name:'Mariah'}
3, {order:3, name:'Will'}
我正在使用密钥对 RDD 进行分区,然后使用 MyObject.order 对分区内的数据进行排序。
我的目标是只获取每个排序分区中的第 k 个元素,然后将它们减少到由其他 MyObject 属性(又名 "the first N best of the group")计算的值。
我该怎么做?
您可以使用 mapPartitions
:
JavaPairRDD<Long, MyObject> sortedRDD = rdd.groupBy(/* the first number */)
.mapPartitionsToPair(x -> {
List<Tuple2<Long, MyObject>> values = toArrayList(x);
Collections.sort(values, (x, y) -> x._2.order - y._2.order);
return values.iterator();
}, true);
两大亮点:
- toArrayList 采用迭代器和 returns ArrayList。必须自己实现
- 重要的是将 true 作为 mapPartitionsToPair 的第二个参数,因为它将保留分区
我有一个 JavaPairRDD,需要按键分组,然后使用对象 MyObject 中的值对其进行排序。
假设 MyObject 是:
class MyObject {
Integer order;
String name;
}
示例数据:
1, {order:1, name:'Joseph'}
1, {order:2, name:'Tom'}
1, {order:3, name:'Luke'}
2, {order:1, name:'Alfred'}
2, {order:3, name:'Ana'}
2, {order:2, name:'Jessica'}
3, {order:3, name:'Will'}
3, {order:2, name:'Mariah'}
3, {order:1, name:'Monika'}
预期结果:
分区 1:
1, {order:1, name:'Joseph'}
1, {order:2, name:'Tom'}
1, {order:3, name:'Luke'}
分区 2
2, {order:1, name:'Alfred'}
2, {order:2, name:'Jessica'}
2, {order:3, name:'Ana'}
分区 3:
3, {order:1, name:'Monika'}
3, {order:2, name:'Mariah'}
3, {order:3, name:'Will'}
我正在使用密钥对 RDD 进行分区,然后使用 MyObject.order 对分区内的数据进行排序。
我的目标是只获取每个排序分区中的第 k 个元素,然后将它们减少到由其他 MyObject 属性(又名 "the first N best of the group")计算的值。
我该怎么做?
您可以使用 mapPartitions
:
JavaPairRDD<Long, MyObject> sortedRDD = rdd.groupBy(/* the first number */)
.mapPartitionsToPair(x -> {
List<Tuple2<Long, MyObject>> values = toArrayList(x);
Collections.sort(values, (x, y) -> x._2.order - y._2.order);
return values.iterator();
}, true);
两大亮点:
- toArrayList 采用迭代器和 returns ArrayList。必须自己实现
- 重要的是将 true 作为 mapPartitionsToPair 的第二个参数,因为它将保留分区