在 Spark 中访问广播变量 java
access Broadcast Variables in Spark java
我需要使用 Java RDD API 处理 spark 广播变量。这是我到目前为止尝试过的代码:
这只是检查其是否有效的示例代码?就我而言,我需要处理两个 csv
文件。
SparkConf conf = new SparkConf().setAppName("BroadcastVariable").setMaster("local");
JavaSparkContext ctx = new JavaSparkContext(conf);
Map<Integer,String> map = new HashMap<Integer,String>();
map.put(1, "aa");
map.put(2, "bb");
map.put(9, "ccc");
Broadcast<Map<Integer, String>> broadcastVar = ctx.broadcast(map);
List<Integer> list = new ArrayList<Integer>();
list.add(1);
list.add(2);
list.add(9);
JavaRDD<Integer> listrdd = ctx.parallelize(list);
JavaRDD<Object> mapr = listrdd.map(x -> broadcastVar.value());
System.out.println(mapr.collect());
它打印输出如下:
[{1=aa, 2=bb, 9=ccc}, {1=aa, 2=bb, 9=ccc}, {1=aa, 2=bb, 9=ccc}]
我的要求是:
[{aa, bb, ccc}]
是否可以按我要求的方式做?
我用JavaRDD<Object> mapr = listrdd.map(x -> broadcastVar.value().get(x));
而不是 JavaRDD<Object> mapr = listrdd.map(x -> broadcastVar.value());
.
现在可以使用了。
我需要使用 Java RDD API 处理 spark 广播变量。这是我到目前为止尝试过的代码:
这只是检查其是否有效的示例代码?就我而言,我需要处理两个 csv
文件。
SparkConf conf = new SparkConf().setAppName("BroadcastVariable").setMaster("local");
JavaSparkContext ctx = new JavaSparkContext(conf);
Map<Integer,String> map = new HashMap<Integer,String>();
map.put(1, "aa");
map.put(2, "bb");
map.put(9, "ccc");
Broadcast<Map<Integer, String>> broadcastVar = ctx.broadcast(map);
List<Integer> list = new ArrayList<Integer>();
list.add(1);
list.add(2);
list.add(9);
JavaRDD<Integer> listrdd = ctx.parallelize(list);
JavaRDD<Object> mapr = listrdd.map(x -> broadcastVar.value());
System.out.println(mapr.collect());
它打印输出如下:
[{1=aa, 2=bb, 9=ccc}, {1=aa, 2=bb, 9=ccc}, {1=aa, 2=bb, 9=ccc}]
我的要求是:
[{aa, bb, ccc}]
是否可以按我要求的方式做?
我用JavaRDD<Object> mapr = listrdd.map(x -> broadcastVar.value().get(x));
而不是 JavaRDD<Object> mapr = listrdd.map(x -> broadcastVar.value());
.
现在可以使用了。