Spark 自定义预处理估算器

Spark custom preprocessing estimator

我想为 spark 的 Pipelines 编写自定义 Estimator。它应该执行数据清理任务。这意味着将删除一些行,删除一些列,添加一些列,替换现有列中的一些值。 IT 还应该将某些数字列的平均值或最小值存储为 NaN 替换。

然而,

override def transformSchema(schema: StructType): StructType = {
   schema.add(StructField("foo", IntegerType))
}

只支持添加字段? 我很好奇我应该如何处理这个。

StructField api 仅支持添加字段,您是正确的。但是,这并不意味着您也不能删除字段!

StructType 有一个值成员 fields,它给你一个 Array[StructField]。您可以 .filter() 这个数组,但是您认为合适(通过 namedataType 或更复杂的方法),只保留您想要的列。

完成过滤后,您有两个选择:

  1. 为过滤后的 fields 数组中的每个新列添加一个 StructField,并从中构造一个 StructType
  2. fields 数组构造一个 StructType 并使用 .add(...) 添加新列。