在 Apache Spark 中为每行迭代添加作用域变量
Add scoped variable per row iteration in Apache Spark
我正在将多个 html 文件读取到 Spark 中的数据框中。
我正在使用自定义 udf
将 html 的元素转换为数据框中的列
val dataset = spark
.sparkContext
.wholeTextFiles(inputPath)
.toDF("filepath", "filecontent")
.withColumn("biz_name", parseDocValue(".biz-page-title")('filecontent))
.withColumn("biz_website", parseDocValue(".biz-website a")('filecontent))
...
def parseDocValue(cssSelectorQuery: String) =
udf((html: String) => Jsoup.parse(html).select(cssSelectorQuery).text())
效果很好,但是每个 withColumn
调用都会导致解析 html 字符串,这是多余的。
有没有一种方法(不使用查找表等)可以根据每行的 "filecontent" 列生成 1 个已解析的文档 (Jsoup.parse(html)
) 并使所有 withColumn
在数据框中调用?
或者我什至不应该尝试使用 DataFrame 而只使用 RDD?
我可能会重写如下,一次性完成解析和选择,并将它们放在一个临时列中:
val dataset = spark
.sparkContext
.wholeTextFiles(inputPath)
.withColumn("temp", parseDocValue(Array(".biz-page-title", ".biz-website a"))('filecontent))
.withColumn("biz_name", col("temp")(0))
.withColumn("biz_website", col("temp")(1))
.drop("temp")
def parseDocValue(cssSelectorQueries: Array[String]) =
udf((html: String) => {
val j = Jsoup.parse(html)
cssSelectorQueries.map(query => j.select(query).text())})
所以最后的答案其实很简单:
只需映射行并在那里创建对象
def docValue(cssSelectorQuery: String, attr: Option[String] = None)(implicit document: Document): Option[String] = {
val domObject = document.select(cssSelectorQuery)
val domValue = attr match {
case Some(a) => domObject.attr(a)
case None => domObject.text()
}
domValue match {
case x if x == null || x.isEmpty => None
case y => Some(y)
}
}
val dataset = spark
.sparkContext
.wholeTextFiles(inputPath, minPartitions = 265)
.map {
case (filepath, filecontent) => {
implicit val document = Jsoup.parse(filecontent)
val customDataJson = docJson(filecontent, customJsonRegex)
DataEntry(
biz_name = docValue(".biz-page-title"),
biz_website = docValue(".biz-website a"),
url = docValue("meta[property=og:url]", attr = Some("content")),
...
filename = Some(fileName(filepath)),
fileTimestamp = Some(fileTimestamp(filepath))
)
}
}
.toDS()
我正在将多个 html 文件读取到 Spark 中的数据框中。 我正在使用自定义 udf
将 html 的元素转换为数据框中的列val dataset = spark
.sparkContext
.wholeTextFiles(inputPath)
.toDF("filepath", "filecontent")
.withColumn("biz_name", parseDocValue(".biz-page-title")('filecontent))
.withColumn("biz_website", parseDocValue(".biz-website a")('filecontent))
...
def parseDocValue(cssSelectorQuery: String) =
udf((html: String) => Jsoup.parse(html).select(cssSelectorQuery).text())
效果很好,但是每个 withColumn
调用都会导致解析 html 字符串,这是多余的。
有没有一种方法(不使用查找表等)可以根据每行的 "filecontent" 列生成 1 个已解析的文档 (Jsoup.parse(html)
) 并使所有 withColumn
在数据框中调用?
或者我什至不应该尝试使用 DataFrame 而只使用 RDD?
我可能会重写如下,一次性完成解析和选择,并将它们放在一个临时列中:
val dataset = spark
.sparkContext
.wholeTextFiles(inputPath)
.withColumn("temp", parseDocValue(Array(".biz-page-title", ".biz-website a"))('filecontent))
.withColumn("biz_name", col("temp")(0))
.withColumn("biz_website", col("temp")(1))
.drop("temp")
def parseDocValue(cssSelectorQueries: Array[String]) =
udf((html: String) => {
val j = Jsoup.parse(html)
cssSelectorQueries.map(query => j.select(query).text())})
所以最后的答案其实很简单:
只需映射行并在那里创建对象
def docValue(cssSelectorQuery: String, attr: Option[String] = None)(implicit document: Document): Option[String] = {
val domObject = document.select(cssSelectorQuery)
val domValue = attr match {
case Some(a) => domObject.attr(a)
case None => domObject.text()
}
domValue match {
case x if x == null || x.isEmpty => None
case y => Some(y)
}
}
val dataset = spark
.sparkContext
.wholeTextFiles(inputPath, minPartitions = 265)
.map {
case (filepath, filecontent) => {
implicit val document = Jsoup.parse(filecontent)
val customDataJson = docJson(filecontent, customJsonRegex)
DataEntry(
biz_name = docValue(".biz-page-title"),
biz_website = docValue(".biz-website a"),
url = docValue("meta[property=og:url]", attr = Some("content")),
...
filename = Some(fileName(filepath)),
fileTimestamp = Some(fileTimestamp(filepath))
)
}
}
.toDS()