Remove String from Sequence in one Dataframe based on another Dataframe
I have a dataframe as shown below:
stringTokenDF
+---+------------+
| Id|      Tokens|
+---+------------+
|  1|[A, B, C, D]|
|  1|[B, C, D, G]|
|  1|   [A, D, E]|
|  1|   [B, C, F]|
|  2|   [A, C, D]|
|  2|   [C, E, F]|
|  2|[A, C, D, H]|
+---+------------+
And another dataframe as shown below:
leastFrequenctDf
+---+------------------+
| Id|LeastFrequentWords|
+---+------------------+
|  1|            [E, G]|
|  2|         [E, F, H]|
+---+------------------+
Now I want to remove from the sequences in the Tokens column of stringTokenDF the strings that appear in the LeastFrequentWords sequence for the same Id. My output should look like this:
+---+------------+
| Id|      Tokens|
+---+------------+
|  1|[A, B, C, D]|
|  1|   [B, C, D]|
|  1|      [A, D]|
|  1|   [B, C, F]|
|  2|   [A, C, D]|
|  2|         [C]|
|  2|   [A, C, D]|
+---+------------+
I tried using a join and intersect on the sequences, but it didn't give me the correct result shown above.
val intersectorUDF = udf((seq1: Seq[String], seq2: Seq[String]) => {
  seq1.intersect(seq2)
})

stringTokenDF.join(leastFrequenctDf, stringTokenDF("id") === leastFrequenctDf("id")).
  withColumn("intersectedToken",
    intersectorUDF(stringTokenDF("Tokens"), leastFrequenctDf("LeastFrequentWords")))
What is the correct way to implement this in Spark Scala?
You can join the two DataFrames and apply a UDF that computes the diff between the two sequence columns:
import org.apache.spark.sql.functions.udf
import spark.implicits._  // for toDF and the $ column syntax

val stringTokenDF = Seq(
  (1, Seq("A", "B", "C", "D")),
  (1, Seq("B", "C", "D", "G")),
  (1, Seq("A", "D", "E")),
  (1, Seq("B", "C", "F")),
  (2, Seq("A", "C", "D")),
  (2, Seq("C", "E", "F")),
  (2, Seq("A", "C", "D", "H"))
).toDF("Id", "Tokens")

val leastFrequenctDf = Seq(
  (1, Seq("E", "G")),
  (2, Seq("E", "F", "H"))
).toDF("Id", "LeastFrequentWords")
// Remove every element of s2 from s1
val diff = udf((s1: Seq[String], s2: Seq[String]) => s1 diff s2)
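As a quick sanity check on the semantics here (plain Scala, independent of Spark): `Seq.diff` removes at most one occurrence from the receiver per matching element of its argument, so repeated tokens are preserved, which is why it matches the expected output above:

```scala
// diff removes at most one occurrence per element of its argument
val a = Seq("A", "D", "E").diff(Seq("E", "G"))  // Seq("A", "D")
val b = Seq("A", "A", "B").diff(Seq("A"))       // Seq("A", "B") -- one "A" survives
```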
stringTokenDF.join(leastFrequenctDf, Seq("Id")).
  select($"Id", diff($"Tokens", $"LeastFrequentWords").as("Tokens")).
  show
// +---+------------+
// | Id| Tokens|
// +---+------------+
// | 1|[A, B, C, D]|
// | 1| [B, C, D]|
// | 1| [A, D]|
// | 1| [B, C, F]|
// | 2| [A, C, D]|
// | 2| [C]|
// | 2| [A, C, D]|
// +---+------------+
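Alternatively, if you are on Spark 2.4 or later, the built-in `array_except` function avoids the UDF entirely (a sketch; note that unlike `Seq.diff`, `array_except` also deduplicates the result array, which makes no difference for the sample data here but would if Tokens contained repeats):

```scala
import org.apache.spark.sql.functions.array_except

stringTokenDF.join(leastFrequenctDf, Seq("Id")).
  select($"Id", array_except($"Tokens", $"LeastFrequentWords").as("Tokens")).
  show
```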