访问转换器 setInputCol() 方法中的嵌套列

Access nested columns in transformers setInputCol() method

我正在尝试使用 Databricks XML 解析器和 Spark 的管道方法来解析维基百科转储。目标是为嵌套列的文本字段计算特征向量。

XML的架构如下:

root
|-- id: long (nullable = true)
|-- ns: long (nullable = true)
|-- revision: struct (nullable = true)
|    |-- comment: string (nullable = true)
|    |-- contributor: struct (nullable = true)
|    |    |-- id: long (nullable = true)
|    |    |-- ip: string (nullable = true)
|    |    |-- username: string (nullable = true)
|    |-- format: string (nullable = true)
|    |-- id: long (nullable = true)
|    |-- minor: string (nullable = true)
|    |-- model: string (nullable = true)
|    |-- parentid: long (nullable = true)
|    |-- sha1: string (nullable = true)
|    |-- text: struct (nullable = true)
|    |    |-- _VALUE: string (nullable = true)
|    |    |-- _bytes: long (nullable = true)
|    |    |-- _space: string (nullable = true)
|    |-- timestamp: string (nullable = true)
|-- title: string (nullable = true)

在使用

读取转储后
val raw = spark.read.format("com.databricks.spark.xml").option("rowTag", "page").load("some.xml")

我可以使用

访问相应的文本
raw.select("revision.text._VALUE").show(10)

我在 Spark 管道中的第一个阶段是 RegexTokenizer,它需要访问 revision.text._VALUE 以转换数据:

val tokenizer = new RegexTokenizer().
    setInputCol("revision.text._VALUE").
    setOutputCol("tokens").
    setMinTokenLength(3).
    setPattern("\s+|\/|_|-").
    setToLowercase(true)
val pipeline = new Pipeline().setStages(Array(tokenizer))
val model = pipeline.fit(raw)

然而,这一步失败了:

Name: java.lang.IllegalArgumentException
Message: Field "revision.text._VALUE" does not exist.

关于如何在 setInputCol 方法中嵌套列有什么建议吗?

非常感谢!

尝试创建一个 temp 列,然后在 RegexTokenizer 中用作

val rawTemp = raw.withColumn("temp", $"revision.text._VALUE")

然后您可以使用 rawTemp 数据框和 RegexTokenizer 中的 temp 列作为

val tokenizer = new RegexTokenizer().
    setInputCol("temp").
    setOutputCol("tokens").
    setMinTokenLength(3).
    setPattern("\s+|\/|_|-").
    setToLowercase(true)
val pipeline = new Pipeline().setStages(Array(tokenizer))
val model = pipeline.fit(rawTemp)

希望回答对你有帮助