Spark: words in one column should not be present in another column

I have the following DataFrame with 2 columns.

+---------+-----------------+
|column_a |column_b         |
+---------+-----------------+
|text book|Music Book Movie |
|book     |BOOK Film Theatre|
|note book|Music Movie Drama|
|rock     |Pop Metal Jazz   |
|hard rock|Blues Rap Rock   |
+---------+-----------------+

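I need to filter out every row where any word/token of column_a is present in column_b. The comparison should be case-insensitive.

For example, text book should be filtered out because the token book is present in column_b of the first row (as Book).

Similarly, hard rock should be filtered out because rock is present in column_b (as Rock).

Row 2, book, should also be filtered out because the word BOOK is present in column_b.

So my output DataFrame is: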

+---------+-----------------+
|column_a |column_b         |
+---------+-----------------+
|note book|Music Movie Drama|
|rock     |Pop Metal Jazz   |
+---------+-----------------+

I can do the filtering on plain text values like this:

val columnA = "text book"
val columnB = "Music Book Movie"
// Split on whitespace; lower-case first so that "Book" and "BOOK" match "book".
val tokensColumnA = columnA.toLowerCase.split("\\s+").toSet
val tokensColumnB = columnB.toLowerCase.split("\\s+").toSet

// true when the two values share no token
val check: Boolean = tokensColumnA.intersect(tokensColumnB).isEmpty
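
As a rough sketch, the check above can be wrapped in a function and tried against the sample rows in plain Scala, without Spark. The name `hasNoSharedToken` and the choice to treat null or blank input as "no tokens" are assumptions made for illustration, not part of the original snippet.

```scala
// Hypothetical helper: true when the two strings share no token (case-insensitive).
// Assumption: a null or blank string contributes no tokens.
def hasNoSharedToken(a: String, b: String): Boolean = {
  def tokens(s: String): Set[String] =
    Option(s)
      .map(_.trim)
      .filter(_.nonEmpty)
      .map(_.toLowerCase.split("\\s+").toSet)
      .getOrElse(Set.empty[String])

  tokens(a).intersect(tokens(b)).isEmpty
}

// The sample data from the question as plain tuples.
val rows = Seq(
  ("text book", "Music Book Movie"),
  ("book", "BOOK Film Theatre"),
  ("note book", "Music Movie Drama"),
  ("rock", "Pop Metal Jazz"),
  ("hard rock", "Blues Rap Rock")
)

// Keep only the rows whose column_a tokens do not occur in column_b.
val kept = rows.filter { case (a, b) => hasNoSharedToken(a, b) }
kept.foreach(println)
```

On the sample data this keeps the note book and rock rows, which matches the expected output table.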

But I'm not sure how to incorporate this into a Spark DataFrame and filter accordingly.

import org.apache.spark.sql.functions.{col, udf}

import spark.implicits._

case class D(column_a: String, column_b: String)

val df1 = Seq(
  D("text book", "Music Book Movie"),
  D("book", "BOOK Film Theatre"),
  D("note book", "Music Movie Drama"),
  D("rock", "Pop Metal Jazz"),
  D("hard rock", "Blues Rap Rock")
).toDF()

df1.show(false)
//    +---------+-----------------+
//    |column_a |column_b         |
//    +---------+-----------------+
//    |text book|Music Book Movie |
//    |book     |BOOK Film Theatre|
//    |note book|Music Movie Drama|
//    |rock     |Pop Metal Jazz   |
//    |hard rock|Blues Rap Rock   |
//    +---------+-----------------+

// UDF: true when column_a and column_b share no token (case-insensitive).
// Note: this assumes neither column is null.
val checkUDF = udf((columnA: String, columnB: String) => {
  val tokensColumnA = columnA.toLowerCase.split("\\s+").toSet
  val tokensColumnB = columnB.toLowerCase.split("\\s+").toSet
  tokensColumnA.intersect(tokensColumnB).isEmpty
})


val tmpDF = df1.withColumn("isCorrect", checkUDF(col("column_a"), col("column_b")))
tmpDF.show(false)
//    +---------+-----------------+---------+
//    |column_a |column_b         |isCorrect|
//    +---------+-----------------+---------+
//    |text book|Music Book Movie |false    |
//    |book     |BOOK Film Theatre|false    |
//    |note book|Music Movie Drama|true     |
//    |rock     |Pop Metal Jazz   |true     |
//    |hard rock|Blues Rap Rock   |false    |
//    +---------+-----------------+---------+
val resDF = tmpDF.filter(col("isCorrect") === true)
resDF.show(false)
//    +---------+-----------------+---------+
//    |column_a |column_b         |isCorrect|
//    +---------+-----------------+---------+
//    |note book|Music Movie Drama|true     |
//    |rock     |Pop Metal Jazz   |true     |
//    +---------+-----------------+---------+
val df = resDF.drop(col("isCorrect"))
df.show(false)
//    +---------+-----------------+
//    |column_a |column_b         |
//    +---------+-----------------+
//    |note book|Music Movie Drama|
//    |rock     |Pop Metal Jazz   |
//    +---------+-----------------+
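
The same filter can probably be written without a UDF, using only built-in column functions. The sketch below is an alternative to the UDF approach above, not a replacement for it. It assumes Spark 2.4 or later (where `arrays_overlap` is available) and a local `SparkSession`; the app name and the names `tokensA`, `tokensB` and `result` are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{arrays_overlap, col, lower, not, split}

// Assumption: a local session for experimentation; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("token-filter-sketch")
  .getOrCreate()
import spark.implicits._

val df1 = Seq(
  ("text book", "Music Book Movie"),
  ("book", "BOOK Film Theatre"),
  ("note book", "Music Movie Drama"),
  ("rock", "Pop Metal Jazz"),
  ("hard rock", "Blues Rap Rock")
).toDF("column_a", "column_b")

// Lower-case each column and split it on whitespace into an array of tokens.
val tokensA = split(lower(col("column_a")), "\\s+")
val tokensB = split(lower(col("column_b")), "\\s+")

// Keep the rows whose token arrays have no element in common.
val result = df1.filter(not(arrays_overlap(tokensA, tokensB)))
result.show(false)
```

Staying with built-in functions lets Spark optimize the expression instead of treating it as an opaque UDF. One caveat: if either column is null, the predicate should evaluate to null and the row would be dropped, so null handling may need an explicit `coalesce` depending on what is wanted.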