如何加入列与第二个数据框中的两列匹配的两个数据框?
How to join two dataframes where column matches with two columns in the second dataframe?
有 2 个数据帧。我只想从满足这些条件的第一个 DF 中 select 那些 devices
:
- 如果 PatternDS 在 Pat1 和 Pat2 上都有模式并且任何设备都匹配这些模式
- 如果在 PatternDS 上 Pat1 或 Pat2 为 NA,则任何设备都匹配对方的模式
我可以用一些 UDF 和一些循环来做到这一点,但我想用一些连接来做到这一点。任何提示表示赞赏。
设备DS:
| DeviceId | Pattern |
| -------- | ---------- |
| D1 | Dr_123_5.0 |
| D2 | Dr_456_6.0 |
| D3 | Ap_111_3.5 |
| D1 | Ap_333_4.5 |
| D2 | OE_222_7.7 |
| D4 | Dr_123_5.0 |
模式DS:
| Pat1 | Pat2 |
| --------------| -------------- |
|Dr_123_5.0 | Ap_333_4.5 |
|NA | OE_222_7.7 |
|Ap_111_3.5 | NA |
val result = DevicesDS.groupBy("deviceId","Pattern").count().groupBy("deviceId").agg(collect_set(struct("Pattern")).as("Pat"))
我从 DeviceDS
中得到两列,其中第一列是 DeviceId
,第二列是 Patterns
.
列表的 collect_set
现在我需要申请加入。
预期输出:
- 由于
D1
具有 Pat1
(Dr_123_5.0) 和 Pat2
(Ap_333_4.5) 匹配项,因此应将其包括在内
D2
有 Pat2
(OE_222_7.7) 并且该行的 Pat1
是 NA
,这应该包括在内
D3
有 Pat1
(Ap_111_3.5) 并且该行的 Pat2
是 NA
,这应该包括在内
D4
在第 1 行中有 Pat1
(Dr_123_5.0),但其中没有 Pat2
,因此这不符合条件。
所以最终结果是:
| DeviceId | Patterns |
| -------- | ---------- |
| D1 | array of Patterns |
| D2 | array of Patterns |
| D3 | array of Patterns |
注意 D4
不在此列表中,因为它不符合条件。 Patterns
包含匹配模式数组。
假设这些是您的输入数据帧:
val DevicesDS = Seq(
("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0")
).toDF("DeviceId", "Pattern")
val PatternDS = Seq(
("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"),("Ap_111_3.5", "NA")
).toDF("Pat1", "Pat2")
首先,按数据框分组 DeviceDS
以获取与每个 DeviceId
:
关联的模式列表
val DevicesDSGrouped = DevicesDS.groupBy("deviceId").agg(collect_set($"Pattern").as("Patterns"))
DevicesDSGrouped.show(false)
//+--------+------------------------+
//|deviceId|Patterns |
//+--------+------------------------+
//|D1 |[Dr_123_5.0, Ap_333_4.5]|
//|D3 |[Ap_111_3.5] |
//|D2 |[OE_222_7.7, Dr_456_6.0]|
//|D4 |[Dr_123_5.0] |
//+--------+------------------------+
然后,在条件下使用 array_except
函数与 PatternDS
数据帧连接,以检查两个模式是否匹配或一个匹配另一个是 NA
。最后,按 DeviceId
分组并收集列 Pat1
和 Pat2
,如下所示:
val joinCondition = size(array_except(array_remove(array($"Pat1", $"Pat2"), "NA"), $"Patterns")) === 0
val result = DevicesDSGrouped.join(PatternDS, joinCondition)
.groupBy("deviceId")
.agg(
array_remove(flatten(collect_list(array($"Pat1", $"Pat2"))), "NA").as("Patterns")
)
result.show(false)
//+--------+------------------------+
//|deviceId|Patterns |
//+--------+------------------------+
//|D1 |[Dr_123_5.0, Ap_333_4.5]|
//|D3 |[Ap_111_3.5] |
//|D2 |[OE_222_7.7] |
//+--------+------------------------+
有 2 个数据帧。我只想从满足这些条件的第一个 DF 中 select 那些 devices
:
- 如果 PatternDS 在 Pat1 和 Pat2 上都有模式并且任何设备都匹配这些模式
- 如果在 PatternDS 上 Pat1 或 Pat2 为 NA,则任何设备都匹配对方的模式
我可以用一些 UDF 和一些循环来做到这一点,但我想用一些连接来做到这一点。任何提示表示赞赏。
设备DS:
| DeviceId | Pattern |
| -------- | ---------- |
| D1 | Dr_123_5.0 |
| D2 | Dr_456_6.0 |
| D3 | Ap_111_3.5 |
| D1 | Ap_333_4.5 |
| D2 | OE_222_7.7 |
| D4 | Dr_123_5.0 |
模式DS:
| Pat1 | Pat2 |
| --------------| -------------- |
|Dr_123_5.0 | Ap_333_4.5 |
|NA | OE_222_7.7 |
|Ap_111_3.5 | NA |
val result = DevicesDS.groupBy("deviceId","Pattern").count().groupBy("deviceId").agg(collect_set(struct("Pattern")).as("Pat"))
我从 DeviceDS
中得到两列,其中第一列是 DeviceId
,第二列是 Patterns
.
collect_set
现在我需要申请加入。
预期输出:
- 由于
D1
具有Pat1
(Dr_123_5.0) 和Pat2
(Ap_333_4.5) 匹配项,因此应将其包括在内 D2
有Pat2
(OE_222_7.7) 并且该行的Pat1
是NA
,这应该包括在内D3
有Pat1
(Ap_111_3.5) 并且该行的Pat2
是NA
,这应该包括在内D4
在第 1 行中有Pat1
(Dr_123_5.0),但其中没有Pat2
,因此这不符合条件。
所以最终结果是:
| DeviceId | Patterns |
| -------- | ---------- |
| D1 | array of Patterns |
| D2 | array of Patterns |
| D3 | array of Patterns |
注意 D4
不在此列表中,因为它不符合条件。 Patterns
包含匹配模式数组。
假设这些是您的输入数据帧:
val DevicesDS = Seq(
("D1", "Dr_123_5.0"), ("D2", "Dr_456_6.0"), ("D2", "OE_222_7.7"),
("D3", "Ap_111_3.5"), ("D1", "Ap_333_4.5"), ("D4", "Dr_123_5.0")
).toDF("DeviceId", "Pattern")
val PatternDS = Seq(
("Dr_123_5.0", "Ap_333_4.5"), ("NA", "OE_222_7.7"),("Ap_111_3.5", "NA")
).toDF("Pat1", "Pat2")
首先,按数据框分组 DeviceDS
以获取与每个 DeviceId
:
val DevicesDSGrouped = DevicesDS.groupBy("deviceId").agg(collect_set($"Pattern").as("Patterns"))
DevicesDSGrouped.show(false)
//+--------+------------------------+
//|deviceId|Patterns |
//+--------+------------------------+
//|D1 |[Dr_123_5.0, Ap_333_4.5]|
//|D3 |[Ap_111_3.5] |
//|D2 |[OE_222_7.7, Dr_456_6.0]|
//|D4 |[Dr_123_5.0] |
//+--------+------------------------+
然后,在条件下使用 array_except
函数与 PatternDS
数据帧连接,以检查两个模式是否匹配或一个匹配另一个是 NA
。最后,按 DeviceId
分组并收集列 Pat1
和 Pat2
,如下所示:
val joinCondition = size(array_except(array_remove(array($"Pat1", $"Pat2"), "NA"), $"Patterns")) === 0
val result = DevicesDSGrouped.join(PatternDS, joinCondition)
.groupBy("deviceId")
.agg(
array_remove(flatten(collect_list(array($"Pat1", $"Pat2"))), "NA").as("Patterns")
)
result.show(false)
//+--------+------------------------+
//|deviceId|Patterns |
//+--------+------------------------+
//|D1 |[Dr_123_5.0, Ap_333_4.5]|
//|D3 |[Ap_111_3.5] |
//|D2 |[OE_222_7.7] |
//+--------+------------------------+