如何加入列与第二个数据框中的两列匹配的两个数据框?

How to join two dataframes where column matches with two columns in the second dataframe?

有 2 个数据帧。我只想从满足这些条件的第一个 DF 中 select 那些 devices:

  1. 如果 PatternDS 在 Pat1 和 Pat2 上都有模式并且任何设备都匹配这些模式
  2. 如果在 PatternDS 上 Pat1 或 Pat2 为 NA,则任何设备都匹配对方的模式

我可以用一些 UDF 和一些循环来做到这一点,但我想用一些连接来做到这一点。任何提示表示赞赏。

设备DS:

| DeviceId | Pattern    |
| -------- | ---------- |
| D1       | Dr_123_5.0 |
| D2       | Dr_456_6.0 |
| D3       | Ap_111_3.5 |
| D1       | Ap_333_4.5 |
| D2       | OE_222_7.7 |
| D4       | Dr_123_5.0 |

模式DS:

|     Pat1      |     Pat2       |
| --------------| -------------- |
|Dr_123_5.0     | Ap_333_4.5     |
|NA             | OE_222_7.7     |
|Ap_111_3.5     | NA             |
val result = DevicesDS.groupBy("deviceId","Pattern").count().groupBy("deviceId").agg(collect_set(struct("Pattern")).as("Pat"))

我从 DeviceDS 中得到两列,其中第一列是 DeviceId,第二列是 Patterns.

列表的 collect_set

现在我需要申请加入。

预期输出:

  1. 由于 D1 具有 Pat1 (Dr_123_5.0) 和 Pat2 (Ap_333_4.5) 匹配项,因此应将其包括在内
  2. D2Pat2 (OE_222_7.7) 并且该行的 Pat1NA,这应该包括在内
  3. D3Pat1 (Ap_111_3.5) 并且该行的 Pat2NA,这应该包括在内
  4. D4 在第 1 行中有 Pat1 (Dr_123_5.0),但其中没有 Pat2,因此这不符合条件。

所以最终结果是:

| DeviceId | Patterns          |
| -------- | ----------        |
| D1       | array of Patterns |
| D2       | array of Patterns |
| D3       | array of Patterns |

注意 D4 不在此列表中,因为它不符合条件。 Patterns 包含匹配模式数组。

假设这些是您的输入数据帧:

val DevicesDS = Seq(
  ("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
  ("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0")
).toDF("DeviceId", "Pattern")

val PatternDS = Seq(
  ("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"),("Ap_111_3.5", "NA")
).toDF("Pat1", "Pat2")

首先,按数据框分组 DeviceDS 以获取与每个 DeviceId:

关联的模式列表
val DevicesDSGrouped = DevicesDS.groupBy("deviceId").agg(collect_set($"Pattern").as("Patterns"))

DevicesDSGrouped.show(false)
//+--------+------------------------+
//|deviceId|Patterns                |
//+--------+------------------------+
//|D1      |[Dr_123_5.0, Ap_333_4.5]|
//|D3      |[Ap_111_3.5]            |
//|D2      |[OE_222_7.7, Dr_456_6.0]|
//|D4      |[Dr_123_5.0]            |
//+--------+------------------------+

然后,在条件下使用 array_except 函数与 PatternDS 数据帧连接,以检查两个模式是否匹配或一个匹配另一个是 NA。最后,按 DeviceId 分组并收集列 Pat1Pat2,如下所示:

val joinCondition = size(array_except(array_remove(array($"Pat1", $"Pat2"), "NA"), $"Patterns")) === 0

val result = DevicesDSGrouped.join(PatternDS, joinCondition)
  .groupBy("deviceId")
  .agg(
    array_remove(flatten(collect_list(array($"Pat1", $"Pat2"))), "NA").as("Patterns")
  )

result.show(false)
//+--------+------------------------+
//|deviceId|Patterns                |
//+--------+------------------------+
//|D1      |[Dr_123_5.0, Ap_333_4.5]|
//|D3      |[Ap_111_3.5]            |
//|D2      |[OE_222_7.7]            |
//+--------+------------------------+