foreachRDD 在 Twitter 的 J8 Spark Streaming 中为每个 RDD 提取平均单词数和字符数 API

foreachRDD to pull average number of words & characters for each RDD in J8 Spark Streaming of Twitter API

我正在尝试使用 Java 8 中的 spark 从 Twitter API 中获取每个 RDD 中的平均单词和字符数。但是,我在使用流时遇到了问题为了达成这个。我的代码如下:

//Create the stream.
JavaReceiverInputDStream<Status> twitterStream = TwitterUtils.createStream(jssc);
//Outputs the text of tweets to a JavaDStream.
JavaDStream<String> statuses = twitterStream.map(Status::getText);
//Get the average number of words & characters in each RDD pulled during streaming.
statuses.foreachRDD(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            System.out.println(wc / c);
            System.out.println(cc / c);
        return avgWc, avgCc;});

我收到的错误是 foreachRDD 预期的 return 类型无效,而我的 return 是长格式。

我该如何解决这个问题?我需要用另一种方法来解决这个问题吗?

如果 return 类型为 void,则不可能 return 数据。 您可以在“foreachRDD”函数之外创建一个列表并传递值,如下所示:

List<Data> listData=new ArrayList();
statuses.foreachRDD(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            System.out.println(wc / c);
            System.out.println(cc / c);
            Data data=new Data();
            data.setAvgCc(avgCc);
            data.setAvgWc(avgWc);
            listData.add(data);
        });

数据是一个class,有两个变量avgCc和AvgWc,如下所示

public class Data {
    long avgWc;
    long avgCc;
    public long getAvgWc() {
        return avgWc;
    }
    public void setAvgWc(long avgWc) {
        this.avgWc = avgWc;
    }
    public long getAvgCc() {
        return avgCc;
    }
    public void setAvgCc(long avgCc) {
        this.avgCc = avgCc;
    }
    public Data(long avgWc, long avgCc) {
        super();
        this.avgWc = avgWc;
        this.avgCc = avgCc;
    }
    public Data() {
        super();
    }
}

如果有帮助,请告诉我。或者您需要更多说明。

一个可能的解决方案是使用 JavaDStream.transform。此函数允许留在 SparkStreaming-API:

JavaDStream<String> statuses = ...
JavaDStream<Tuple2<Long, Long>> avgs = statuses.transform(rdd -> {
            long c = rdd.count();
            long wc = rdd.map(s -> s.split(" ").length).reduce(Integer::sum);
            long cc = rdd.map(s -> s.split("").length).reduce(Integer::sum);
            long avgWc = wc / c;
            long avgCc = cc / c;
            //System.out.println(wc / c);
            //System.out.println(cc / c);
            return jssc.sparkContext().parallelize(Collections.singletonList(Tuple2.apply(avgWc, avgCc)));
        }
);
avgs.print();