Spark record linkage in Java

I need to link records of two datasets based on the equality or similarity of some fields. For example, suppose the datasets look like this (with some random data):

A:

A_ID FirstName LastName BirthDate Address
0 Vera Williams 12.03.1999 Colorado, Greeley, 3774 Stark Hollow Road
1 Joseph Peters 11.10.1988 Florida, Deltona, 4577 Willis Avenue

B:

B_ID FullName BirthDate Street
37 Joseph Peters 11.10.1988 4577 Willis Avenue
49 Valerie J Porter 17.01.2000 2114 Center Street

I want to establish a link (exchange IDs) when a record from set B 'matches' a record from set A.

Assumptions: a record in B matches a record in A when the birth dates are equal, B's FullName contains A's FirstName and LastName, and A's Address contains B's Street. Joseph Peters in both sets satisfies all of these rules.

How would I do this with Spark? A small example would be greatly appreciated, since there is not much information about this on the internet, and what exists is either in Scala or Python.

UPD: If someone can show it in Scala, that is fine too; I can also read Scala and then probably translate it to Java.

I haven't tested this code with your data, but hopefully it works. In Kotlin:

val datasetA: Dataset<Row> = ...
val datasetB: Dataset<Row> = ...

val condition = datasetA.col("BirthDate").equalTo(datasetB.col("BirthDate"))
        .and(datasetB.col("FullName").contains(datasetA.col("FirstName")))
        .and(datasetB.col("FullName").contains(datasetA.col("LastName")))
        .and(datasetB.col("Address").contains(datasetA.col("Street")))
val result = datasetA.join(featuresDF, condition)
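
Since the question asks for Java, the same join condition can also be written with the Spark Java API. This is a rough, untested sketch; it assumes datasetA and datasetB are loaded just like in the Kotlin snippet above:

import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

...

// Same matching rules as above, expressed with the Java Column API
Column condition = datasetA.col("BirthDate").equalTo(datasetB.col("BirthDate"))
        .and(datasetB.col("FullName").contains(datasetA.col("FirstName")))
        .and(datasetB.col("FullName").contains(datasetA.col("LastName")))
        .and(datasetA.col("Address").contains(datasetB.col("Street")));

// Inner join on the non-equality condition; every matching (A, B) pair
// ends up in one row of the result
Dataset<Row> result = datasetA.join(datasetB, condition);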

This is how you can join your two dataframes.

The most efficient way is to create some columns in dataframe A so that you can use only column-equality conditions in the join; this prevents Spark from falling back to a very inefficient cartesian product when joining the two dataframes. You can proceed as follows:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

import java.util.Arrays;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.element_at;
import static org.apache.spark.sql.functions.split;

import scala.collection.JavaConverters;

...

Dataset<Row> outputDataframe = dataframeA
  .withColumn("FullName", concat_ws(" ", col("FirstName"), col("LastName")))
  .withColumn("Street", element_at(split(col("Address"), ", "), -1))
  .join(dataframeB, JavaConverters.asScalaBuffer(Arrays.asList("Street", "FullName", "BirthDate")), "left_outer")
  .drop("Street", "FullName");

Using your example dataframe A:

+----+---------+--------+----------+-----------------------------------------+
|A_ID|FirstName|LastName|BirthDate |Address                                  |
+----+---------+--------+----------+-----------------------------------------+
|0   |Vera     |Williams|12.03.1999|Colorado, Greeley, 3774 Stark Hollow Road|
|1   |Joseph   |Peters  |11.10.1988|Florida, Deltona, 4577 Willis Avenue     |
+----+---------+--------+----------+-----------------------------------------+

and dataframe B:

+----+----------------+----------+------------------+
|B_ID|FullName        |BirthDate |Street            |
+----+----------------+----------+------------------+
|37  |Joseph Peters   |11.10.1988|4577 Willis Avenue|
|49  |Valerie J Porter|17.01.2000|2114 Center Street|
+----+----------------+----------+------------------+

you will get the following output dataframe:

+----------+----+---------+--------+-----------------------------------------+----+
|BirthDate |A_ID|FirstName|LastName|Address                                  |B_ID|
+----------+----+---------+--------+-----------------------------------------+----+
|12.03.1999|0   |Vera     |Williams|Colorado, Greeley, 3774 Stark Hollow Road|null|
|11.10.1988|1   |Joseph   |Peters  |Florida, Deltona, 4577 Willis Avenue     |37  |
+----------+----+---------+--------+-----------------------------------------+----+
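
If you want to reproduce this end to end, here is a sketch of how the sample dataframes could be created in Java. It is not tested and assumes an existing SparkSession instance named spark; the schemas and rows are taken from the sample data in the question:

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

...

// Dataframe A: A_ID, FirstName, LastName, BirthDate, Address
StructType schemaA = new StructType()
    .add("A_ID", DataTypes.IntegerType)
    .add("FirstName", DataTypes.StringType)
    .add("LastName", DataTypes.StringType)
    .add("BirthDate", DataTypes.StringType)
    .add("Address", DataTypes.StringType);
List<Row> rowsA = Arrays.asList(
    RowFactory.create(0, "Vera", "Williams", "12.03.1999", "Colorado, Greeley, 3774 Stark Hollow Road"),
    RowFactory.create(1, "Joseph", "Peters", "11.10.1988", "Florida, Deltona, 4577 Willis Avenue"));
Dataset<Row> dataframeA = spark.createDataFrame(rowsA, schemaA);

// Dataframe B: B_ID, FullName, BirthDate, Street
StructType schemaB = new StructType()
    .add("B_ID", DataTypes.IntegerType)
    .add("FullName", DataTypes.StringType)
    .add("BirthDate", DataTypes.StringType)
    .add("Street", DataTypes.StringType);
List<Row> rowsB = Arrays.asList(
    RowFactory.create(37, "Joseph Peters", "11.10.1988", "4577 Willis Avenue"),
    RowFactory.create(49, "Valerie J Porter", "17.01.2000", "2114 Center Street"));
Dataset<Row> dataframeB = spark.createDataFrame(rowsB, schemaB);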

Note: If you can't easily extract exact matching data from dataframe A, you can go with a join on a non-equality condition, as in the previous answer. However, you may hit performance issues as Spark will perform a cartesian product.