将一个数据框中的各个值映射到另一个数据框中的值
Map individual values in one dataframe with values in another dataframe
我有一个包含两列的数据框 (DF1)
+-------+------+
|words |value |
+-------+------+
|ABC |1.0 |
|XYZ |2.0 |
|DEF |3.0 |
|GHI |4.0 |
+-------+------+
和另一个像这样的数据框 (DF2)
+-----------------------------+
|string |
+-----------------------------+
|ABC DEF GHI |
|XYZ ABC DEF |
+-----------------------------+
我必须将 DF2 中的各个字符串值替换为它们在 DF1 中的相应值。例如,在操作之后,我应该取回此数据帧。
+-----------------------------+
|stringToDouble |
+-----------------------------+
|1.0 3.0 4.0 |
|2.0 1.0 3.0 |
+-----------------------------+
我尝试了多种方法,但似乎无法找到解决方案。
def createCorpus(conversationCorpus: Dataset[Row], dataDictionary: Dataset[Row]): Unit = {
import spark.implicits._
def getIndex(word: String): Double = {
val idxRow = dataDictionary.selectExpr("index").where('words.like(word))
val idx = idxRow.toString
if (!idx.isEmpty) idx.trim.toDouble else 1.0
}
conversationCorpus.map { //eclipse doesnt like this map here.. throws an error..
r =>
def row = {
val arr = r.getString(0).toLowerCase.split(" ")
val arrList = ArrayBuffer[Double]()
arr.map {
str =>
val index = getIndex(str)
}
Row.fromSeq(arrList.toSeq)
}
row
}
}
合并多个数据框以创建新列需要连接。通过查看您的两个数据框,似乎 我们可以通过 df1
的 words
列和 df2
的 string
列加入,但是 string
列需要一个 explode
和稍后的组合(这可以通过在爆炸前为每一行提供唯一的 ID 来完成)。 monotically_increasing_id
为 df2
中的每一行 提供唯一 ID。 split
函数将 string
列转换为数组以进行爆炸 。然后你可以 join
他们。然后剩下的步骤是 通过执行 groupBy
和 聚合 将展开的行组合回原始 。
最后,可以使用 udf
函数
将收集的数组列更改为所需的字符串列
长话短说,以下解决方案应该适合您
import org.apache.spark.sql.functions._
def arrayToString = udf((array: Seq[Double])=> array.mkString(" "))
df2.withColumn("rowId", monotonically_increasing_id())
.withColumn("string", explode(split(col("string"), " ")))
.join(df1, col("string") === col("words"))
.groupBy("rowId")
.agg(collect_list("value").as("stringToDouble"))
.select(arrayToString(col("stringToDouble")).as("stringToDouble"))
哪个应该给你
+--------------+
|stringToDouble|
+--------------+
|1.0 3.0 4.0 |
|2.0 1.0 3.0 |
+--------------+
我有一个包含两列的数据框 (DF1)
+-------+------+ |words |value | +-------+------+ |ABC |1.0 | |XYZ |2.0 | |DEF |3.0 | |GHI |4.0 | +-------+------+
和另一个像这样的数据框 (DF2)
+-----------------------------+ |string | +-----------------------------+ |ABC DEF GHI | |XYZ ABC DEF | +-----------------------------+
我必须将 DF2 中的各个字符串值替换为它们在 DF1 中的相应值。例如,在操作之后,我应该取回此数据帧。
+-----------------------------+ |stringToDouble | +-----------------------------+ |1.0 3.0 4.0 | |2.0 1.0 3.0 | +-----------------------------+
我尝试了多种方法,但似乎无法找到解决方案。
def createCorpus(conversationCorpus: Dataset[Row], dataDictionary: Dataset[Row]): Unit = {
import spark.implicits._
def getIndex(word: String): Double = {
val idxRow = dataDictionary.selectExpr("index").where('words.like(word))
val idx = idxRow.toString
if (!idx.isEmpty) idx.trim.toDouble else 1.0
}
conversationCorpus.map { //eclipse doesnt like this map here.. throws an error..
r =>
def row = {
val arr = r.getString(0).toLowerCase.split(" ")
val arrList = ArrayBuffer[Double]()
arr.map {
str =>
val index = getIndex(str)
}
Row.fromSeq(arrList.toSeq)
}
row
}
}
合并多个数据框以创建新列需要连接。通过查看您的两个数据框,似乎 我们可以通过 df1
的 words
列和 df2
的 string
列加入,但是 string
列需要一个 explode
和稍后的组合(这可以通过在爆炸前为每一行提供唯一的 ID 来完成)。 monotically_increasing_id
为 df2
中的每一行 提供唯一 ID。 split
函数将 string
列转换为数组以进行爆炸 。然后你可以 join
他们。然后剩下的步骤是 通过执行 groupBy
和 聚合 将展开的行组合回原始 。
最后,可以使用 udf
函数
长话短说,以下解决方案应该适合您
import org.apache.spark.sql.functions._
def arrayToString = udf((array: Seq[Double])=> array.mkString(" "))
df2.withColumn("rowId", monotonically_increasing_id())
.withColumn("string", explode(split(col("string"), " ")))
.join(df1, col("string") === col("words"))
.groupBy("rowId")
.agg(collect_list("value").as("stringToDouble"))
.select(arrayToString(col("stringToDouble")).as("stringToDouble"))
哪个应该给你
+--------------+
|stringToDouble|
+--------------+
|1.0 3.0 4.0 |
|2.0 1.0 3.0 |
+--------------+