Spark streaming reduce by multiple key Java

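I'm new to Spark Streaming and I've been trying to figure out how to handle this problem. I found plenty of examples for single (K,V) pairs but nothing beyond that. I would appreciate help finding the best approach using Spark transformations in Java.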

Let me briefly describe the scenario.

The goal is to get the error rate of a set of elements within a time window.

Given the following input,

(A, Error)
(B, Success)
(B, Error)
(B, Success)
(C, Success)
(C, Error)

It would aggregate by element and then by status, giving (Element, (Number of Success, Number of Error)). In this case the result of the transformation would be,

(A, (0,1))
(B, (2,1))
(C, (1,1))

And finally the ratio is calculated with a function like (i1,i2) -> i2/(i1+i2), i.e. errors over total:

(A, 100%)
(B, 33.3%)
(C, 50%)
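To make the expected numbers concrete, here is a minimal plain-Java sketch of the same aggregation and ratio on one static batch, with no Spark and no window involved. It assumes the only statuses are "Success" and "Error"; the class and method names are illustrative, not part of any API.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

class ErrorRateSketch {
    // Aggregates (element, status) records into element -> {successCount, errorCount}.
    // Statuses other than "Success"/"Error" are ignored (assumption).
    static Map<String, long[]> countByElement(List<String[]> records) {
        Map<String, long[]> counts = new LinkedHashMap<>();
        for (String[] r : records) {
            long[] c = counts.computeIfAbsent(r[0], k -> new long[2]);
            if ("Success".equals(r[1])) {
                c[0]++;
            } else if ("Error".equals(r[1])) {
                c[1]++;
            }
        }
        return counts;
    }

    // Error rate = errors / (successes + errors); NaN if both counts are 0.
    static double errorRate(long success, long error) {
        return (double) error / (success + error);
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
            new String[]{"A", "Error"}, new String[]{"B", "Success"},
            new String[]{"B", "Error"}, new String[]{"B", "Success"},
            new String[]{"C", "Success"}, new String[]{"C", "Error"});
        countByElement(input).forEach((k, c) ->
            System.out.printf(Locale.ROOT, "(%s, (%d,%d)) -> %.1f%%%n",
                k, c[0], c[1], 100 * errorRate(c[0], c[1])));
    }
}
```

Running `main` prints `(A, (0,1)) -> 100.0%`, `(B, (2,1)) -> 33.3%` and `(C, (1,1)) -> 50.0%`, one per line.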

As far as I understand, the result would be given by the reduceByKeyAndWindow() function, for example,

JavaPairDStream<String, Double> res = 
pairs.reduceByKeyAndWindow(reduceFunc, Durations.seconds(30), Durations.seconds(1));
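`reduceByKeyAndWindow(reduceFunc, windowDuration, slideDuration)` combines, per key, the values of every batch inside the last `windowDuration` and recomputes the result every `slideDuration`; both durations must be multiples of the streaming batch interval. Because the reduce function has the shape (V, V) -> V, the output keeps the input's value type, so in the snippet above `pairs` would itself have to be a `JavaPairDStream<String, Double>`. The plain-Java model below only illustrates those windowing semantics, assuming the slide equals the batch interval; `WindowSketch` is a hypothetical name, and Spark's real implementation is distributed.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

class WindowSketch {
    // Models reduceByKeyAndWindow when the slide equals the batch interval:
    // after every batch, emit the per-key reduction of the last `windowBatches` batches.
    // (30-second window, 1-second slide, 1-second batches -> windowBatches = 30.)
    static <V> List<Map<String, V>> reduceByKeyAndWindow(
            List<Map<String, V>> batches, BinaryOperator<V> reduceFunc, int windowBatches) {
        Deque<Map<String, V>> window = new ArrayDeque<>();
        List<Map<String, V>> results = new ArrayList<>();
        for (Map<String, V> batch : batches) {
            window.addLast(batch);
            if (window.size() > windowBatches) {
                window.removeFirst(); // the oldest batch slides out of the window
            }
            // Reduce all values per key across the batches currently in the window.
            Map<String, V> reduced = new HashMap<>();
            for (Map<String, V> b : window) {
                b.forEach((k, v) -> reduced.merge(k, v, reduceFunc));
            }
            results.add(reduced);
        }
        return results;
    }
}
```

With `Integer::sum` as the reduce function, each emitted map holds per-key counts over the current window.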

Working backwards through the application flow, my questions are:

How do I define a pair with multiple values or keys on a JavaPairDStream (maybe something like JavaPairDStream<String, Tuple2<Integer,Integer>>)?

What is the best approach for the reduceFunc, given a pair with multiple keys?
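In the Java API a multi-valued pair is usually expressed by nesting `scala.Tuple2`, e.g. `JavaPairDStream<String, Tuple2<Integer, Integer>>` with the value holding (successCount, errorCount); the reduce function then adds the two components independently. The sketch below models this in plain Java so it runs without Spark: `Counts` is a hypothetical stand-in for `Tuple2<Integer, Integer>`, and `of`, `add` and `ReduceSketch` are illustrative names. In Spark, the `add` logic would be the `Function2` passed to `reduceByKeyAndWindow`, something like `(a, b) -> new Tuple2<>(a._1() + b._1(), a._2() + b._2())`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for scala.Tuple2<Integer, Integer>: (successCount, errorCount).
final class Counts {
    final int success;
    final int error;

    Counts(int success, int error) {
        this.success = success;
        this.error = error;
    }

    // One record becomes (1,0) for "Success" or (0,1) for "Error".
    static Counts of(String status) {
        switch (status) {
            case "Success": return new Counts(1, 0);
            case "Error":   return new Counts(0, 1);
            default: throw new IllegalArgumentException("unknown status: " + status);
        }
    }

    // The reduce function: component-wise sum, which is associative and commutative.
    static Counts add(Counts a, Counts b) {
        return new Counts(a.success + b.success, a.error + b.error);
    }

    @Override
    public String toString() {
        return "(" + success + "," + error + ")";
    }
}

class ReduceSketch {
    // Emulates mapToPair(...) followed by a reduce by key with Counts::add on one batch.
    static Map<String, Counts> reduceByKey(List<String[]> records) {
        Map<String, Counts> out = new LinkedHashMap<>();
        for (String[] r : records) {
            out.merge(r[0], Counts.of(r[1]), Counts::add);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String[]> input = Arrays.asList(
            new String[]{"A", "Error"}, new String[]{"B", "Success"},
            new String[]{"B", "Error"}, new String[]{"B", "Success"},
            new String[]{"C", "Success"}, new String[]{"C", "Error"});
        System.out.println(reduceByKey(input)); // {A=(0,1), B=(2,1), C=(1,1)}
    }
}
```

Spark requires the reduce function for `reduceByKeyAndWindow` to be associative and commutative, which a component-wise sum is. This single-pass shape could also replace two filtered pipelines plus a join with one windowed reduce, though that is a suggestion rather than a tested design.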

What is the best way to map the initial DStream (maybe something like JavaDStream<Tuple2<String, String>> line = input.map(func))?
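`JavaDStream.mapToPair` takes a `PairFunction` and returns a `JavaPairDStream` directly, so an intermediate `JavaDStream<Tuple2<String, String>>` is not needed. The parsing itself depends on the input format; the plain-Java sketch below assumes lines literally shaped like `(A, Error)`, as in the sample input. In Spark, its body would sit inside the `PairFunction` and return a `Tuple2` instead of a `Map.Entry`; `ParseSketch.parse` is a hypothetical name.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.Map;

class ParseSketch {
    // Parses a line such as "(A, Error)" into the pair (A, Error).
    // The "(element, status)" text format is an assumption based on the sample input.
    static Map.Entry<String, String> parse(String line) {
        String body = line.trim();
        if (!body.startsWith("(") || !body.endsWith(")")) {
            throw new IllegalArgumentException("unexpected line: " + line);
        }
        // Strip the parentheses and split on the first comma only.
        String[] parts = body.substring(1, body.length() - 1).split(",", 2);
        if (parts.length != 2) {
            throw new IllegalArgumentException("unexpected line: " + line);
        }
        return new SimpleImmutableEntry<>(parts[0].trim(), parts[1].trim());
    }
}
```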

Thanks in advance for your help.

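I've already found the solution. Using the Function classes and tuples, you can build any combination you would build with Scala. The problem is that I didn't find any documentation or examples about this for Java. You'll find my solution below, in case it helps anyone in the future.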

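import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import scala.Tuple2;

// Turn each input line into an (element, status) pair.
JavaPairDStream<String, String> samples = lines.flatMapToPair(new PairFlatMapFunction<String, String, String>() {
    public Iterator<Tuple2<String, String>> call(String s) throws Exception {
        return Arrays.asList(new Tuple2<String, String>(/* some logic on my data */)).iterator();
    }
});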


// Key each record by the whole (element, status) pair, with a count of 1.
JavaPairDStream<Tuple2<String, String>, Integer> samplePairs = samples.mapToPair(
        new PairFunction<Tuple2<String, String>, Tuple2<String, String>, Integer>() {
            public Tuple2<Tuple2<String, String>, Integer> call(Tuple2<String, String> t) {
                return new Tuple2<Tuple2<String, String>, Integer>(t, 1);
            }
        });

// Per-element error count over a 30-second window, sliding every second.
JavaPairDStream<String, Integer> countErrors = samplePairs.filter(
        new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {
            public Boolean call(Tuple2<Tuple2<String, String>, Integer> t) {
                return t._1()._2().equals("Error");
            }
        }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> t) {
                return new Tuple2<String, Integer>(t._1()._1(), t._2());
            }
        }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        }, Durations.seconds(30), Durations.seconds(1));

// Per-element success count over the same window.
JavaPairDStream<String, Integer> countSuccess = samplePairs.filter(
        new Function<Tuple2<Tuple2<String, String>, Integer>, Boolean>() {
            public Boolean call(Tuple2<Tuple2<String, String>, Integer> t) {
                return t._1()._2().equals("Success");
            }
        }).mapToPair(new PairFunction<Tuple2<Tuple2<String, String>, Integer>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Tuple2<String, String>, Integer> t) {
                return new Tuple2<String, Integer>(t._1()._1(), t._2());
            }
        }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer i1, Integer i2) {
                return i1 + i2;
            }
        }, Durations.seconds(30), Durations.seconds(1));

// Join both counts per element: values are (successes, errors), and either side may be absent.
JavaPairDStream<String, Tuple2<Optional<Integer>, Optional<Integer>>> countPairs = countSuccess.fullOuterJoin(countErrors);

// Error rate = errors / (successes + errors).
JavaPairDStream<String, Double> mappedRDD = countPairs.mapToPair(
        new PairFunction<Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>>, String, Double>() {
            public Tuple2<String, Double> call(Tuple2<String, Tuple2<Optional<Integer>, Optional<Integer>>> t) throws Exception {
                Optional<Integer> successes = t._2()._1();
                Optional<Integer> errors = t._2()._2();
                if (successes.isPresent() && errors.isPresent()) {
                    double e = errors.get();
                    double s = successes.get();
                    return new Tuple2<String, Double>(t._1(), e / (e + s));
                } else if (errors.isPresent()) {
                    // Only errors in the window: 100% error rate.
                    return new Tuple2<String, Double>(t._1(), 1.0);
                } else {
                    // Only successes in the window: 0% error rate.
                    return new Tuple2<String, Double>(t._1(), 0.0);
                }
            }
        });
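The three branches in the `mappedRDD` function exist because, after `fullOuterJoin`, an element may appear in only one of the two windowed streams. A minimal plain-Java sketch of that step, using `java.util.Optional` in place of Spark's `Optional` so it runs without Spark, shows that treating a missing side as a count of zero collapses the branches into the single formula errors / (successes + errors). `JoinRatioSketch` and `errorRate` are hypothetical names.

```java
import java.util.Optional;

class JoinRatioSketch {
    // Error rate for one key after countSuccess.fullOuterJoin(countErrors):
    // a side missing from the window is treated as a count of zero.
    static double errorRate(Optional<Integer> successes, Optional<Integer> errors) {
        int s = successes.orElse(0);
        int e = errors.orElse(0);
        if (s + e == 0) {
            // A full outer join row always has at least one present side.
            throw new IllegalArgumentException("both sides empty");
        }
        return (double) e / (s + e);
    }
}
```

For the sample input this gives 1.0 for A (errors only), 1/3 for B and 0.5 for C.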