如何最好地将 SparkSQL Dataframe Array[String] 列转换为新的 [String] 列
How to best transform a SparkSQL Dataframe Array[String] column to a new [String] column
我是 Spark 的新手,我有一个 Apache SparkSQL DataFrame df
有 4 列,具有以下架构:
root
|-- _id: string (nullable = false)
|-- _title: string (nullable = false)
|-- _published-at: date (nullable = false)
|-- p: array (nullable = true)
| |-- element: string (containsNull = true)
df
包含大量(大约一百万)篇新闻文章,每条记录的列包含:唯一 ID (_id)、标题 (_title)、发布日期 (_published-at) 和每篇文章 (p).paragraphs.
中文本的字符串数组
我现在想将 "p" 列从文章段落的当前格式 Array[String]
转换为完整文章文本的融合 String
,其中转换很简单映射其中段落元素与它们之间的 space (" ") 连接,导致新的第五 String
列添加到 df
。 IE。像这样:
df.withColumn(df.(col"p").map(_.mkString(" ")).alias("fullarticle"))
这是行不通的。然而,这似乎是一个微不足道的问题,但我一定是出了什么问题。在Spark的functions
包中,可以找到很多函数,但是none似乎适合这里。我必须以某种方式使用 "User Defined Functions" (UDF) 吗?如果可能的话,最好避免它。
可以通过以下方式将其转换为 String
,从而产生新的 Dataset[String] dsFullArticles
:
dsFullArticles = df.select(col("p").as[Array[String]]).map(_.mkString(" ")).alias("fullarticle")
(似乎需要 .as[Array[String]]
来解开 WrappedArray
,它实际上包装了 "p" 列中的每个 Array[String]
元素)。但是如何将 dsFullArticles
作为新的 列 附加到 df
?
之后我还要求出"fullarticle"栏中每篇文章的最长单词的长度,作为第六栏添加到df
:
// Split each article in an array of its words
val dsFullArticlesArrayOfWords = dsFullArticles.map(s => s.split(" "))
// Find number of characters of longest word in article, 0 if article is empty
val dsMaxWordLength =
dsFullArticlesArrayOfWords.map(s => (s.map(w => w.length()) match {
case x if x.isEmpty => 0
case x => x.max
}))
上面的代码同样有效,生成了一个 Dataset[int]
,但是如何类似地把它作为一个列添加到 df
中呢?同样的问题在这里。当全部都在同一个 DataFrame df
中时,可以很容易地进行各种 SQL 选择、过滤等
您可以使用concat_ws函数:
concat_ws(sep, [str | array(str)]+) - Returns the concatenation of the strings separated by sep.
你的情况:
df.withColumn("fullarticle", concat_ws(" ",col("p")))
我是 Spark 的新手,我有一个 Apache SparkSQL DataFrame df
有 4 列,具有以下架构:
root
|-- _id: string (nullable = false)
|-- _title: string (nullable = false)
|-- _published-at: date (nullable = false)
|-- p: array (nullable = true)
| |-- element: string (containsNull = true)
df
包含大量(大约一百万)篇新闻文章,每条记录的列包含:唯一 ID (_id)、标题 (_title)、发布日期 (_published-at) 和每篇文章 (p).paragraphs.
我现在想将 "p" 列从文章段落的当前格式 Array[String]
转换为完整文章文本的融合 String
,其中转换很简单映射其中段落元素与它们之间的 space (" ") 连接,导致新的第五 String
列添加到 df
。 IE。像这样:
df.withColumn(df.(col"p").map(_.mkString(" ")).alias("fullarticle"))
这是行不通的。然而,这似乎是一个微不足道的问题,但我一定是出了什么问题。在Spark的functions
包中,可以找到很多函数,但是none似乎适合这里。我必须以某种方式使用 "User Defined Functions" (UDF) 吗?如果可能的话,最好避免它。
可以通过以下方式将其转换为 String
,从而产生新的 Dataset[String] dsFullArticles
:
dsFullArticles = df.select(col("p").as[Array[String]]).map(_.mkString(" ")).alias("fullarticle")
(似乎需要 .as[Array[String]]
来解开 WrappedArray
,它实际上包装了 "p" 列中的每个 Array[String]
元素)。但是如何将 dsFullArticles
作为新的 列 附加到 df
?
之后我还要求出"fullarticle"栏中每篇文章的最长单词的长度,作为第六栏添加到df
:
// Split each article in an array of its words
val dsFullArticlesArrayOfWords = dsFullArticles.map(s => s.split(" "))
// Find number of characters of longest word in article, 0 if article is empty
val dsMaxWordLength =
dsFullArticlesArrayOfWords.map(s => (s.map(w => w.length()) match {
case x if x.isEmpty => 0
case x => x.max
}))
上面的代码同样有效,生成了一个 Dataset[int]
,但是如何类似地把它作为一个列添加到 df
中呢?同样的问题在这里。当全部都在同一个 DataFrame df
中时,可以很容易地进行各种 SQL 选择、过滤等
您可以使用concat_ws函数:
concat_ws(sep, [str | array(str)]+) - Returns the concatenation of the strings separated by sep.
你的情况:
df.withColumn("fullarticle", concat_ws(" ",col("p")))