在 Apache Spark 中为每行迭代添加作用域变量

Add scoped variable per row iteration in Apache Spark

我正在将多个 html 文件读取到 Spark 中的数据框中。 我正在使用自定义 udf

将 html 的元素转换为数据框中的列
val dataset = spark
  .sparkContext
  .wholeTextFiles(inputPath)
  .toDF("filepath", "filecontent")
  .withColumn("biz_name", parseDocValue(".biz-page-title")('filecontent))
  .withColumn("biz_website", parseDocValue(".biz-website a")('filecontent))

  ...

  def parseDocValue(cssSelectorQuery: String) = 
     udf((html: String) => Jsoup.parse(html).select(cssSelectorQuery).text())

效果很好,但是每个 withColumn 调用都会导致解析 html 字符串,这是多余的。

有没有一种方法(不使用查找表等)可以根据每行的 "filecontent" 列生成 1 个已解析的文档 (Jsoup.parse(html)) 并使所有 withColumn 在数据框中调用?

或者我什至不应该尝试使用 DataFrame 而只使用 RDD?

我可能会重写如下,一次性完成解析和选择,并将它们放在一个临时列中:

val dataset = spark
  .sparkContext
  .wholeTextFiles(inputPath)
  .withColumn("temp", parseDocValue(Array(".biz-page-title", ".biz-website a"))('filecontent))
  .withColumn("biz_name", col("temp")(0))
  .withColumn("biz_website", col("temp")(1))
  .drop("temp")

def parseDocValue(cssSelectorQueries: Array[String]) =
udf((html: String) => {
  val j = Jsoup.parse(html)
  cssSelectorQueries.map(query => j.select(query).text())})

所以最后的答案其实很简单:

只需映射行并在那里创建对象

def docValue(cssSelectorQuery: String, attr: Option[String] = None)(implicit document: Document): Option[String] = {
    val domObject = document.select(cssSelectorQuery)

    val domValue = attr match {
      case Some(a) => domObject.attr(a)
      case None => domObject.text()
    }

    domValue match {
      case x if x == null || x.isEmpty => None
      case y => Some(y)
    }
  }

 val dataset = spark
      .sparkContext
      .wholeTextFiles(inputPath, minPartitions = 265) 
      .map {
        case (filepath, filecontent) => {
          implicit val document = Jsoup.parse(filecontent)

          val customDataJson = docJson(filecontent, customJsonRegex)


          DataEntry(
            biz_name = docValue(".biz-page-title"),
            biz_website = docValue(".biz-website a"),
            url = docValue("meta[property=og:url]", attr = Some("content")),
            ...
            filename = Some(fileName(filepath)),
            fileTimestamp = Some(fileTimestamp(filepath))
          )
        }
      }
      .toDS()