如何找到基于多列的数据框的交集?

How to find intersection of dataframes based on multiple columns?

我有两个数据框如下。我试图根据两列中的任何一列找到两个数据框的交集,而不仅仅是这两列。

所以在这种情况下,我想 return 数据帧 C,它有 df A 行 1(如 A row1 col1= row one col1 in B),df A row 2(A row 2 Col 2 = B 中的第 1 行 Col2)和 df A 第 4 行(如 B 中的第 2 行 Col1 = A 中的第 1 行第 4 行)和 A 中的第 5 行。但是如果我做 A 和 B 的交叉,它只会 return A 中的第 5 行,因为这是两列的匹配项。我该怎么做呢?许多 thanks.Let 我知道我是否没有很好地解释这个问题。

甲:

     Col1    Col2 
     1         2    
     2         3
     3         7 
     5         4
     1         3   

乙:

    Col1    Col2 
     1         3    
     5         1

C:

          1         2    
          2         3
          5         4
          1         3    

您应该对每个连接列分别执行两个 join 操作,然后对两个结果数据帧执行 union

val dfA = List((1,2),(2,3),(3,7),(5,4),(1,3)).toDF("Col1", "Col2")
val dfB = List((1,3),(5,1)).toDF("Col1", "Col2")
val res1 = dfA.join(dfB, dfA.col("Col1")===dfB.col("Col1"))
val res2 = dfA.join(dfB, dfA.col("Col2")===dfB.col("Col2"))
val res = res1.union(res2)

具有以下数据:

val df1 = sc.parallelize(Seq(1->2, 2->3, 3->7, 5->4, 1->3)).toDF("col1", "col2")
val df2 = sc.parallelize(Seq(1->3, 5->1)).toDF("col1", "col2")

然后您可以使用 or 条件连接您的数据集:

val cols = df1.columns
df1.join(df2, cols.map(c => df1(c) === df2(c)).reduce(_ || _) )
   .select(cols.map(df1(_)) :_*)
   .distinct
   .show

+----+----+
|col1|col2|
+----+----+
|   2|   3|
|   1|   2|
|   1|   3|
|   5|   4|
+----+----+

连接条件是通用的,适用于任意数量的列。该代码将每一列映射到 df1 中的该列与 df2 cols.map(c => df1(c) === df2(c)) 中的同一列之间的相等性。 reduce 采用所有这些等式的逻辑或,这就是您想要的。 select 在那里,否则两个数据帧的列将被保留。在这里,我只是保留了 df1 中的那些。我还添加了一个独特的,以防几行 df2 匹配一行 df1,反之亦然。事实上,你可能会得到一个笛卡尔积。

请注意,此方法不需要驱动程序的任何集合,因此无论数据集的大小如何,它都可以工作。然而,如果 df2 足够小,可以收集到驱动程序并广播,您可以使用如下方法更快地获得结果:

// to each column name, we map the set of values in df2.
val valueMap = df2.rdd
    .flatMap(row => cols.map(name => name -> row.getAs[Any](name)))
    .distinct
    .groupByKey
    .mapValues(_.toSet)
    .collectAsMap

//we create a udf that looks up in valueMap
val filter = udf((name : String, value : Any) => 
                     valueMap(name).contains(value))

//Finally we apply the filter.
df1.where( cols.map(c => filter(lit(c), df1(c))).reduce(_||_))
   .show

使用这种方法,df1 没有改组,也没有笛卡尔积。如果 df2 很小,这绝对是要走的路。