如何在 Spark Streaming 中并行映射 key/value 个分区
How to map key/value partitions in paralell in Spark Streaming
我在本地模式下有一个 Spark Streaming 程序 运行,在该程序中,我从 TCP 套接字连接接收 JSON 消息,每个批次间隔接收多个消息。
这些消息中的每一个都有一个 ID,我用它来创建一个 key/value JavaPairDStream,这样在我的 JavaDStream 中的 RDD 的每个分区中,都有一个 key/value 对,一个每个分区一条消息。
我现在的目标是将具有相同 ID 的消息分组到同一个分区中,以便我可以并行映射它们,每个分区由不同的核心处理。
以下是我的代码:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String>streamData1=streamData2.repartition(1);
JavaPairDStream<String,String> streamGiveKey= streamData1.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();
while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;
}
JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);
}
return a;
}
});
因此,在这段代码的末尾,我有一个带有 RDD 的 DStream,由于 repartition{1}
,它只有一个分区,其中包含所有 key/value 对。
我现在应该如何将具有相同密钥的消息分组,并将它们放入不同的分区以便我可以单独映射它们?
根据 OP,这个问题是通过同时回答另一个问题来解决的:
我在本地模式下有一个 Spark Streaming 程序 运行,在该程序中,我从 TCP 套接字连接接收 JSON 消息,每个批次间隔接收多个消息。
这些消息中的每一个都有一个 ID,我用它来创建一个 key/value JavaPairDStream,这样在我的 JavaDStream 中的 RDD 的每个分区中,都有一个 key/value 对,一个每个分区一条消息。
我现在的目标是将具有相同 ID 的消息分组到同一个分区中,以便我可以并行映射它们,每个分区由不同的核心处理。
以下是我的代码:
JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String>streamData1=streamData2.repartition(1);
JavaPairDStream<String,String> streamGiveKey= streamData1.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
@Override
public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {
ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();
while (stringIterator.hasNext()){
String c=stringIterator.next();
if(c==null){
return null;
}
JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
String key= retMap.getSid();
Tuple2<String,String> b= new Tuple2<String,String>(key,c);
a.add(b);
}
return a;
}
});
因此,在这段代码的末尾,我有一个带有 RDD 的 DStream,由于 repartition{1}
,它只有一个分区,其中包含所有 key/value 对。
我现在应该如何将具有相同密钥的消息分组,并将它们放入不同的分区以便我可以单独映射它们?
根据 OP,这个问题是通过同时回答另一个问题来解决的: