foreachRDD 在 Twitter 的 J8 Spark Streaming 中为每个 RDD 提取平均单词数和字符数 API
foreachRDD to pull average number of words & characters for each RDD in J8 Spark Streaming of Twitter API
我正在尝试使用 Java 8 中的 spark 从 Twitter API 中获取每个 RDD 中的平均单词和字符数。但是,我在使用流时遇到了问题为了达成这个。我的代码如下:
//Create the stream.
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Outputs the text of tweets to a JavaDStream.
JavaDStream<String> statuses = twitterStream.map(Status::getText);
//Get the average number of words & characters in each RDD pulled during streaming.
statuses.foreachRDD(rdd -> {
long c = rdd.count();
long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
long avgWc = wc / c;
long avgCc = cc / c;
System.out.println(wc / c);
System.out.println(cc / c);
return avgWc, avgCc;});
我收到的错误是 foreachRDD
预期的 return 类型无效,而我的 return 是长格式。
我该如何解决这个问题?我需要用另一种方法来解决这个问题吗?
如果 return 类型为 void,则不可能 return 数据。
您可以在“foreachRDD”函数之外创建一个列表并传递值,如下所示:
List<Data> listData=new ArrayList();
statuses.foreachRDD(rdd -> {
long c = rdd.count();
long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
long avgWc = wc / c;
long avgCc = cc / c;
System.out.println(wc / c);
System.out.println(cc / c);
Data data=new Data();
data.setAvgCc(avgCc);
data.setAvgWc(avgWc);
listData.add(data);
});
数据是一个class,有两个变量avgCc和AvgWc,如下所示
public class Data {
long avgWc;
long avgCc;
public long getAvgWc() {
return avgWc;
}
public void setAvgWc(long avgWc) {
this.avgWc = avgWc;
}
public long getAvgCc() {
return avgCc;
}
public void setAvgCc(long avgCc) {
this.avgCc = avgCc;
}
public Data(long avgWc, long avgCc) {
super();
this.avgWc = avgWc;
this.avgCc = avgCc;
}
public Data() {
super();
}
}
如果有帮助,请告诉我。或者您需要更多说明。
一个可能的解决方案是使用 JavaDStream.transform。此函数允许留在 SparkStreaming-API:
内
JavaDStream<String> statuses = ...
JavaDStream<Tuple2<Long, Long>> avgs = statuses.transform(rdd -> {
long c = rdd.count();
long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
long avgWc = wc / c;
long avgCc = cc / c;
//System.out.println(wc / c);
//System.out.println(cc / c);
return jssc.sparkContext().parallelize(Collections.singletonList(Tuple2.apply(avgWc, avgCc)));
}
);
avgs.print();
我正在尝试使用 Java 8 中的 spark 从 Twitter API 中获取每个 RDD 中的平均单词和字符数。但是,我在使用流时遇到了问题为了达成这个。我的代码如下:
//Create the stream.
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Outputs the text of tweets to a JavaDStream.
JavaDStream<String> statuses = twitterStream.map(Status::getText);
//Get the average number of words & characters in each RDD pulled during streaming.
statuses.foreachRDD(rdd -> {
long c = rdd.count();
long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
long avgWc = wc / c;
long avgCc = cc / c;
System.out.println(wc / c);
System.out.println(cc / c);
return avgWc, avgCc;});
我收到的错误是 foreachRDD
预期的 return 类型无效,而我的 return 是长格式。
我该如何解决这个问题?我需要用另一种方法来解决这个问题吗?
如果 return 类型为 void,则不可能 return 数据。 您可以在“foreachRDD”函数之外创建一个列表并传递值,如下所示:
List<Data> listData=new ArrayList();
statuses.foreachRDD(rdd -> {
long c = rdd.count();
long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
long avgWc = wc / c;
long avgCc = cc / c;
System.out.println(wc / c);
System.out.println(cc / c);
Data data=new Data();
data.setAvgCc(avgCc);
data.setAvgWc(avgWc);
listData.add(data);
});
数据是一个class,有两个变量avgCc和AvgWc,如下所示
public class Data {
long avgWc;
long avgCc;
public long getAvgWc() {
return avgWc;
}
public void setAvgWc(long avgWc) {
this.avgWc = avgWc;
}
public long getAvgCc() {
return avgCc;
}
public void setAvgCc(long avgCc) {
this.avgCc = avgCc;
}
public Data(long avgWc, long avgCc) {
super();
this.avgWc = avgWc;
this.avgCc = avgCc;
}
public Data() {
super();
}
}
如果有帮助,请告诉我。或者您需要更多说明。
一个可能的解决方案是使用 JavaDStream.transform。此函数允许留在 SparkStreaming-API:
内JavaDStream<String> statuses = ...
JavaDStream<Tuple2<Long, Long>> avgs = statuses.transform(rdd -> {
long c = rdd.count();
long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
long avgWc = wc / c;
long avgCc = cc / c;
//System.out.println(wc / c);
//System.out.println(cc / c);
return jssc.sparkContext().parallelize(Collections.singletonList(Tuple2.apply(avgWc, avgCc)));
}
);
avgs.print();