如何在 Spark 中按分区分组 key/values?
How to group key/values by partition in Spark?
我有一个 Spark Streaming 应用程序,它每秒接收几条 JSON 消息,每条消息都有一个标识其来源的 ID。
使用此 ID 作为键,我能够执行 MapPartitionsToPair
,从而创建一个 JavaPairDStream,具有 key/value 对的 RDD,每个分区一个键值对(所以如果我收到 5 JSON 条消息 例如,我得到一个包含 5 个分区的 RDD,每个分区都以消息的 ID 作为键,JSON 消息本身作为值。
我现在想做的是,我想将具有相同键的所有值分组到同一个分区中。因此,例如,如果我有 3 个带有键 'a' 的分区和 2 个带有键 'b' 的分区,我想创建一个带有 2 个分区而不是 5 个分区的新 RDD,每个分区包含一个键的所有值有,一个用于'a',一个用于'b'。
我怎样才能做到这一点?
到目前为止,这是我的代码:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();
while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;
}
JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);
System.out.print(b._1+"_"+b._2);
// }
//break;
}
return a;
}
});
//我创建了一个 JavaPairDStream,其中每个分区包含一个 key/value 对。
我尝试使用 grouByKey()
,但无论消息的数量是多少,我得到的分区号总是 2。
我应该怎么做?
非常感谢。
您可以使用
groupByKey(Integer numPartitions)
并将 numPartitions
设置为等于您拥有的不同键的数量。
但是..您需要预先知道您有多少个不同的键。你有那个信息吗?可能不会。那么 .. 你需要做一些额外的(/冗余的)工作。例如。使用
countByKey
作为第一步。这比 groupByKey 快 - 所以至少你没有 加倍 总处理时间。
更新 OP 询问为什么他们默认获得 2 个分区。
默认groupByKey
使用defaultPartitioner()
方法
groupByKey(defaultPartitioner(self))
- 从具有最大基数的父分区中选择
Partitioner
。
-- 否则它将使用 spark.default.parallelism
我有一个 Spark Streaming 应用程序,它每秒接收几条 JSON 消息,每条消息都有一个标识其来源的 ID。
使用此 ID 作为键,我能够执行 MapPartitionsToPair
,从而创建一个 JavaPairDStream,具有 key/value 对的 RDD,每个分区一个键值对(所以如果我收到 5 JSON 条消息 例如,我得到一个包含 5 个分区的 RDD,每个分区都以消息的 ID 作为键,JSON 消息本身作为值。
我现在想做的是,我想将具有相同键的所有值分组到同一个分区中。因此,例如,如果我有 3 个带有键 'a' 的分区和 2 个带有键 'b' 的分区,我想创建一个带有 2 个分区而不是 5 个分区的新 RDD,每个分区包含一个键的所有值有,一个用于'a',一个用于'b'。
我怎样才能做到这一点? 到目前为止,这是我的代码:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();
while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;
}
JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);
System.out.print(b._1+"_"+b._2);
// }
//break;
}
return a;
}
});
//我创建了一个 JavaPairDStream,其中每个分区包含一个 key/value 对。
我尝试使用 grouByKey()
,但无论消息的数量是多少,我得到的分区号总是 2。
我应该怎么做? 非常感谢。
您可以使用
groupByKey(Integer numPartitions)
并将 numPartitions
设置为等于您拥有的不同键的数量。
但是..您需要预先知道您有多少个不同的键。你有那个信息吗?可能不会。那么 .. 你需要做一些额外的(/冗余的)工作。例如。使用
countByKey
作为第一步。这比 groupByKey 快 - 所以至少你没有 加倍 总处理时间。
更新 OP 询问为什么他们默认获得 2 个分区。
默认groupByKey
使用defaultPartitioner()
方法
groupByKey(defaultPartitioner(self))
- 从具有最大基数的父分区中选择
Partitioner
。
-- 否则它将使用 spark.default.parallelism