[Spark SQL]: Lookup functionality given two DataFrames and creating a new DataFrame

I am using Scala with Spark 1.5.

Given two DataFrames, DataFrame1 and DataFrame2, I want to look up the keys from DataFrame1 in DataFrame2 and create DataFrame3 with the results. What makes this unusual is that DataFrame1 has several keys in each row, and the keys and their looked-up values in the output DataFrame should be populated in the same order, as shown in the output DataFrame below. I am looking for a distributed solution if possible, since this has to run over millions of records (~10 million). Any pointers on how to proceed, and on useful methods, would be a great help. Thanks in advance!

Input: DataFrame1 (a contract_id is associated with up to 4 customers)
contract_id, cust1_id, cust2_id, cust3_id, cust4_id
500001,100000001,100000002,100000003,100000004
500305,100000001,100000002,100000007
500303,100000021
500702,110000045
500304,100000021,100000051,120000051
503001,540000012,510000012,500000002,510000002
503051,880000045
Input: DataFrame2 (customer master lookup data)
cust_id,date_of_birth
100000001,1988-11-04
100000002,1955-11-16
100000003,1980-04-14
100000004,1980-09-26
100000007,1942-03-07
100000021,1964-06-22
100000051,1920-03-12
120000051,1973-11-17
110000045,1955-11-16
880000045,1980-04-14
540000012,1980-09-26
510000012,1973-03-15
500000002,1958-08-18
510000002,1942-03-07

Output: DataFrame3

contract_id, cust1_id, cust2_id, cust3_id, cust4_id, cust1_dob, cust2_dob, cust3_dob, cust4_dob
500001,100000001,100000002,100000003,100000004,1988-11-04,1955-11-16,1980-04-14,1980-09-26
500305,100000001,100000002,100000007,         ,1988-11-04,1955-11-16,1942-03-07,
500303,100000021,         ,         ,         ,1964-06-22,          ,          ,
500702,110000045,         ,         ,         ,1955-11-16,          ,          ,
500304,100000021,100000051,120000051,         ,1964-06-22,1920-03-12,1973-11-17,
503001,540000012,510000012,500000002,510000002,1980-09-26,1973-03-15,1958-08-18,1942-03-07
503051,880000045,         ,         ,         ,1980-04-14,          ,          ,

This may not be the most efficient solution, but it works for your case.

  // `spark` here is a SparkSession (Spark 2.x). On Spark 1.5 use
  // `import sqlContext.implicits._` and `sc.parallelize(...).toDF(...)` instead.
  import spark.implicits._

  val df1 = spark.sparkContext
    .parallelize(
      Seq(
        ("500001", "100000001", "100000002", "100000003", "100000004"),
        ("500305", "100000001", "100000002", "100000007", ""),
        ("500303", "100000021", "", "", ""),
        ("500702", "110000045", "", "", ""),
        ("500304", "100000021", "100000051", "120000051", ""),
        ("503001", "540000012", "510000012", "500000002", "510000002"),
        ("503051", "880000045", "", "", "")
      ))
    .toDF("contract_id", "cust1_id", "cust2_id", "cust3_id", "cust4_id")

  val df2 = spark.sparkContext
    .parallelize(
      Seq(
        ("100000001", "1988-11-04"),
        ("100000002", "1955-11-16"),
        ("100000003", "1980-04-14"),
        ("100000004", "1980-09-26"),
        ("100000007", "1942-03-07"),
        ("100000021", "1964-06-22"),
        ("100000051", "1920-03-12"),
        ("120000051", "1973-11-17"),
        ("110000045", "1955-11-16"),
        ("880000045", "1980-04-14"),
        ("540000012", "1980-09-26"),
        ("510000012", "1973-03-15"),
        ("500000002", "1958-08-18"),
        ("510000002", "1942-03-07")
      ))
    .toDF("cust_id", "date_of_birth")

  // Left-join df2 once per customer column: each join looks up one
  // cust*_id, drops the duplicated join key, and renames the matched
  // date_of_birth to the corresponding cust*_dob column.
  val finalDF = df1
    .join(df2, df1("cust1_id") === df2("cust_id"), "left")
    .drop("cust_id")
    .withColumnRenamed("date_of_birth", "cust1_dob")
    .join(df2, df1("cust2_id") === df2("cust_id"), "left")
    .drop("cust_id")
    .withColumnRenamed("date_of_birth", "cust2_dob")
    .join(df2, df1("cust3_id") === df2("cust_id"), "left")
    .drop("cust_id")
    .withColumnRenamed("date_of_birth", "cust3_dob")
    .join(df2, df1("cust4_id") === df2("cust_id"), "left")
    .drop("cust_id")
    .withColumnRenamed("date_of_birth", "cust4_dob")

  finalDF.na.fill("").show()