如何使用另一个 Dataset<Row> 中的记录更新 Dataset<Row>,这些 Dataset<Row> 在 Spark 中具有与 JAVA API 相同的架构?

How Do you Update a Dataset<Row> with records from another Dataset<Row> which have Identical Schema in Spark with JAVA API?

假设您有一个包含以下记录的数据集 A:

Dataset A:
    {key1, val1}
    {key2, val2}
    {key3, val3}

Dataset B:
    {key4, val4}
    {key1, valBB}
    {key5, valN}
    {key2, NNNNN}

在 "Update" 发生后,最终数据集应该是这样的:

Dataset Final:
    {key1, valBB}
    {key2, NNNNN}
    {key3, val3}
    {key4, val4}
    {key5, valN}

到目前为止我采用的方法是将两个Dataset转换为JavaRDD,然后转换JavaRDD -> JavaPairRDD,然后firstPairRDD.subtractByKey(secondPairRDD)。这为我提供了数据集 A 中存在但数据集 B 中不存在的记录。然后我将其重新转换回数据集。下一步是我用 DatasetB 做一个 Union 来给我更新的数据集。对我来说,这并没有给我预期的结果。我采取了错误的方法吗?任何帮助将不胜感激。

我最终找到了更有效的解决方案:

    Dataset<Row> existsInAButNotB = A.join(B, A.col("key").equalTo(B.col("key") "left_anti");
    Dataset<Row> Final = existsInAButNotB.union(B); 

如果您有多个列用作键,那么您的解决方案应如下所示:

Dataset<Row> existsInAButNotB = A.join(B, A.col("key1").equalTo(B.col("key1").and(A.col("key2").equalTo(B.col("key2")) "left_anti");

这一行避免了用户进入低效的 RDD 世界并避免添加额外的代码。

看看这个:

更多关于 Left Anti Join 的信息:

数据集加入API: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Dataset.html#join(org.apache.spark.sql.Dataset,%20org.apache.spark.sql.Column,%20java.lang.String)