Effective record linkage

I asked a similar question earlier today. In short: I need to perform record linkage on two large datasets (1.6M and 6M records). I planned to use Spark, assuming that the Cartesian product I had been warned about would not be a big problem. But it is: it hurts performance so badly that the linkage did not finish after 7 hours.

Is there another library/framework/tool that can do this more efficiently? Or is there a way to improve the performance of the solution below?

The code I ended up with:

    import org.apache.spark.sql.functions.{col, concat_ws, substring, to_date}
    import org.apache.spark.sql.{Column, SparkSession}

    import java.time.Duration

    object App {
    
      def left(col: Column, n: Int) = {
        assert(n > 0)
        substring(col, 1, n)
      }
    
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[4]")
          .appName("MatchingApp")
          .getOrCreate()
    
        import spark.implicits._
    
        val a = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/a.csv")
          .withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))
    
        val b = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/b.txt")
          .withColumn("FULL_NAME", concat_ws(" ", col("FIRST_NAME"), col("LAST_NAME")))
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))
    
        // @formatter:off
        val condition = a
          .col("FULL_NAME").contains(b.col("FIRST_NAME"))
          .and(a.col("FULL_NAME").contains(b.col("LAST_NAME")))
          .and(a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
            .or(a.col("STREET").startsWith(left(b.col("STR"), 3))))
        // @formatter:on
        val startMillis = System.currentTimeMillis();
        val res = a.join(b, condition, "left_outer")
        val count = res
          .filter(col("B_ID").isNotNull)
          .count()
        println(s"Count: $count")
        val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
        println(s"Execution time: ${executionTime.toMinutes}m")
      }
    }

Maybe the condition is too complicated, but that's how it has to be.

You can improve the performance of your current solution by slightly changing the logic of how you perform the linkage:

  • First, perform an inner join of the a and b dataframes on the columns that you know match. In your case these appear to be the LAST_NAME and FIRST_NAME columns.
  • Then, filter the resulting dataframe with your more specific, complex condition; in your case, the birth dates being equal or the street matching.
  • Finally, if you also need to keep the unlinked records, perform a right join with the a dataframe.

Your code could be rewritten as follows:

    import org.apache.spark.sql.functions.{col, substring, to_date}
    import org.apache.spark.sql.SparkSession

    import java.time.Duration

    object App {

      def main(args: Array[String]): Unit = {

        val spark = SparkSession.builder()
          .master("local[4]")
          .appName("MatchingApp")
          .getOrCreate()

        val a = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/a.csv")
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "yyyy-MM-dd"))

        val b = spark.read
          .format("csv")
          .option("header", true)
          .option("delimiter", ";")
          .load("/home/helveticau/workstuff/b.txt")
          .withColumn("BIRTH_DATE", to_date(col("BIRTH_DATE"), "dd.MM.yyyy"))

        val condition = a.col("BIRTH_DATE").equalTo(b.col("BIRTH_DATE"))
          .or(a.col("STREET").startsWith(substring(b.col("STR"), 1, 3)))

        val startMillis = System.currentTimeMillis()
        val res = a.join(b, Seq("LAST_NAME", "FIRST_NAME"))
          .filter(condition)
          // the two following lines are only needed if you also want to keep
          // the records of a that were not linked (their B_ID will be null)
          .select("B_ID", "A_ID")
          .join(a, Seq("A_ID"), "right_outer")

        val count = res
          .filter(col("B_ID").isNotNull)
          .count()
        println(s"Count: $count")
        val executionTime = Duration.ofMillis(System.currentTimeMillis() - startMillis)
        println(s"Execution time: ${executionTime.toMinutes}m")
      }
    }

So you avoid the Cartesian product, at the cost of two joins instead of one.
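
To verify that the rewritten query actually avoids the Cartesian product, you can inspect the physical plan; a quick check, using the res value defined above:

    // With the equi-join keys (LAST_NAME, FIRST_NAME) available, the plan
    // should contain a SortMergeJoin or BroadcastHashJoin node, whereas the
    // original non-equi join typically falls back to a
    // BroadcastNestedLoopJoin that compares every pair of rows.
    res.explain()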

Example

With file a.csv containing the following data:

"A_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STREET"
10;John;Doe;1965-10-21;Johnson Road
11;Rebecca;Davis;1977-02-27;Lincoln Road
12;Samantha;Johns;1954-03-31;Main Street
13;Roger;Penrose;1987-12-25;Oxford Street
14;Robert;Smith;1981-08-26;Canergie Road
15;Britney;Stark;1983-09-27;Alshire Road

and b.txt having the following data:

"B_ID";"FIRST_NAME";"LAST_NAME";"BIRTH_DATE";"STR"
29;John;Doe;21.10.1965;Johnson Road
28;Rebecca;Davis;28.03.1986;Lincoln Road
27;Shirley;Iron;30.01.1956;Oak Street
26;Roger;Penrose;25.12.1987;York Street
25;Robert;Dayton;26.08.1956;Canergie Road
24;Britney;Stark;22.06.1962;Algon Road

the res dataframe will be:

    +----+----+----------+---------+----------+-------------+
    |A_ID|B_ID|FIRST_NAME|LAST_NAME|BIRTH_DATE|STREET       |
    +----+----+----------+---------+----------+-------------+
    |10  |29  |John      |Doe      |1965-10-21|Johnson Road |
    |11  |28  |Rebecca   |Davis    |1977-02-27|Lincoln Road |
    |12  |null|Samantha  |Johns    |1954-03-31|Main Street  |
    |13  |26  |Roger     |Penrose  |1987-12-25|Oxford Street|
    |14  |null|Robert    |Smith    |1981-08-26|Canergie Road|
    |15  |null|Britney   |Stark    |1983-09-27|Alshire Road |
    +----+----+----------+---------+----------+-------------+

Note: if your FIRST_NAME and LAST_NAME columns are not exactly the same, you can try to make them match with Spark's built-in functions, for instance:

  • trim to remove spaces at start and end of string
  • lower to transform the column to lower case (and thus ignore case in comparison)

What is really important is to have as many columns as possible that match exactly, since those are the equi-join keys that let Spark avoid the Cartesian product.
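
For example, a minimal normalization sketch (normalizeNames is a hypothetical helper, applied to the a and b dataframes from the code above before the first join):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.{col, lower, trim}

    // Hypothetical helper: strip surrounding whitespace and lower-case the
    // name columns so that e.g. "  John " and "JOHN" become equal join keys.
    def normalizeNames(df: DataFrame): DataFrame = df
      .withColumn("FIRST_NAME", lower(trim(col("FIRST_NAME"))))
      .withColumn("LAST_NAME", lower(trim(col("LAST_NAME"))))

    // Normalize both sides, then run the same two-step linkage as above
    // (inner join on the names, filter, optional right join back to a).
    val aNorm = normalizeNames(a)
    val bNorm = normalizeNames(b)
    val joined = aNorm.join(bNorm, Seq("LAST_NAME", "FIRST_NAME"))

The rest of the pipeline (the birth date / street filter and the right join back to a) stays the same.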