Java + Apache Spark 两个数据集之间的内部连接

Java + Apache Spark inner join between two datesets

关于如何使用 Apache Spark 和 Java 实现内部连接的小问题。

我有一段非常简单的代码。

final Dataset<Row> dataSetTableLeft = getDatasetForLeft();
final Dataset<Row> dataSetTableRight = getDatasetForRight();
final Dataset<Row> dataSetTableResult = dataSetTableLeft.join(dataSetTableRight);

dataSetTableLeft.show();
dataSetTableRight.show();
dataSetTableResult.show();

第一个tabledataSetTableLeft.show ,长这样,很直白

+----------+-----+
|      time|label|
+----------+-----+
|1637020800|    0|
|1637107200|    0|
|1637193600|    0|
|1637280000|    0|
|1637366400|    0|
|1637452800|    0|
+----------+-----+"

第二个tabledataSetTableRight,看起来是这样的,也很直白

+----------+-----+
|      time|label|
+----------+-----+
|1637193600|    1|
|1637280000|    2|
|1637366400|    1|
+----------+-----+"

我想实现的是这样的结果table,这是一种内连接。

+----------+-----+
|      time|label|
+----------+-----+
|1637020800|    0|
|1637107200|    0|
|1637193600|    1|
|1637280000|    2|
|1637366400|    1|
|1637452800|    0|
+----------+-----+"

很遗憾,我没有看到任何类似的功能。 innerjoin()

因此,我正在尝试某种组合

dataSetTableLeft.unionAll(dataSetTableRight);
dataSetTableLeft.crossJoin(dataSetTableRight);

到目前为止没有任何运气。

实现内连接的正确方法是什么?

内部联接只会为您提供存在于两个数据集中的行。您在这里想要的是保留 dataSetTableLeft 中的所有行并在 time 匹配时从 dataSetTableRight 中检索 label 值。

为此,只需使用左连接和 coalesce 函数在匹配时从第二个数据集中获取 label 值。

这是包含所提供数据的完整工作示例:

import static org.apache.spark.sql.functions.*;

dataSetTableLeft.show();
//+----------+-----+
//|      time|label|
//+----------+-----+
//|1637020800|    0|
//|1637107200|    0|
//|1637193600|    0|
//|1637280000|    0|
//|1637366400|    0|
//|1637452800|    0|
//+----------+-----+

dataSetTableRight.show();
//+----------+-----+
//|      time|label|
//+----------+-----+
//|1637193600|    1|
//|1637280000|    2|
//|1637366400|    1|
//+----------+-----+


dataSetTableLeft.alias("df1").join(
        dataSetTableRight.alias("df2"),
        dataSetTableLeft.col("time").equalTo(dataSetTableRight.col("time")),
        "left_outer"
).select(
        dataSetTableLeft.col("time"),
        expr("coalesce(df2.label, df1.label)").alias("label")
).show();

//+----------+-----+
//|      time|label|
//+----------+-----+
//|1637020800|    0|
//|1637107200|    0|
//|1637193600|    1|
//|1637280000|    2|
//|1637366400|    1|
//|1637452800|    0|
//+----------+-----+