Apache Spark 使用 Java 加入示例

Apache Spark Joins example with Java

我是 Apache Spark 的新手。我实际上想专注于基本的 Spark API 规范,并想了解和使用 Spark API 编写一些程序。 我已经使用 Apache Spark 编写了一个 java 程序来实现连接概念。

当我使用 Left Outer Join -- leftOuterJoin() 或 Right Outer Join -- rightOuterJoin() 时,这两种方法都是 returning JavaPairRDD,其中包含特殊类型 Google 选项。但是我不知道如何从 Optional 类型中提取原始值。

无论如何,我想知道我是否可以使用与 return 我自己格式的数据相同的连接方法。我没有找到任何方法来做到这一点。意思是当我使用 Apache Spark 时,我无法以我自己的风格自定义代码,因为它们已经提供了所有预定义的东西。

请在下面找到代码

my 2 sample input datasets

customers_data.txt:
4000001,Kristina,Chung,55,Pilot
4000002,Paige,Chen,74,Teacher
4000003,Sherri,Melton,34,Firefighter

and

trasaction_data.txt
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit

这是我的 Java 代码

**SparkJoins.java:**

public class SparkJoins {

    @SuppressWarnings("serial")
    public static void main(String[] args) throws FileNotFoundException {
        JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local"));
        JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt");
        JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] customerSplit = s.split(",");
                return new Tuple2<String, String>(customerSplit[0], customerSplit[1]);
            }
        }).distinct();

        JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt");
        JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() {
            public Tuple2<String, String> call(String s) {
                String[] transactionSplit = s.split(",");
                return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]);
            }
        });

        //Default Join operation (Inner join)
        JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs);
        System.out.println("Joins function Output: "+joinsOutput.collect());

        //Left Outer join operation
        JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect());

        //Right Outer join operation
        JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey();
        System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect());

        sc.close();
    }
}

这是我得到的输出

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))]

LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])]

RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])]

我是运行这个程序在Windows平台

请观察上面的输出并帮助我从 Optional 类型中提取值

提前致谢

进行左外连接和右外连接时,可能会出现空值。对了!

所以 spark returns 可选对象。获得该结果后,您可以将该结果映射到您自己的格式。

您可以使用 Optional 的 isPresent() 方法来映射您的数据。

示例如下:

 JavaPairRDD<String,String> firstRDD = ....
 JavaPairRDD<String,String> secondRDD =....
 // join both rdd using left outerjoin
 JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD);


// mapping of join result
JavaPairRDD<String, String> mappedRDD = rddWithJoin
            .mapToPair(tuple -> {
                if (tuple._2()._2().isPresent()) {
                    //do your operation and return
                    return new Tuple2<String, String>(tuple._1(), tuple._2()._1());
                } else {
                    return new Tuple2<String, String>(tuple._1(), "not present");
                }
            });

在Java中,我们还可以使用DataFrames实现JOIN,如下所示:

1) 创建 spark 会话为:

SparkSession spark = SparkSession.builder().appName("JoinsInSpark").master("local").getOrCreate();

2) 我将员工输入作为:

101,Alan,Franklyn Street,Melbourne,QLD

104,Stuart,Lonsdale Street,Sydney,NSW

创建 DataFrame 为:

Dataset<Employee> e_data = spark
                        .read()
                        .textFile("C:/XX/XX/test.txt")
                        .map(line -> {
                            Employee e = new Employee();
                            String[] parts = line.split(",");
                            e.setE_id(Integer.valueOf(parts[0].trim()));
                            e.setE_name(parts[1].trim());
                            e.setAddress(parts[2].trim());
                            e.setCity(parts[3].trim());
                            e.setState(parts[4].trim());
                            return e;
                        }, Encoders.bean(Employee.class));

其中 Employee 是包含 setter、getter 以及构造函数的 POJO class。

3) 类似地为第二个 table 创建另一个 DF(比如薪水)

4) 对两个视图的不同元素应用内部联接:

Dataset<Row> d1 = e_data.distinct().join(s_data.distinct(), "e_id").orderBy("salary");

d1.show();

5) 相似,左外连接为:

spark.sql("select * from global_temp.employee e LEFT OUTER JOIN global_temp.salary s on e.e_id = s.e_id").show();