Spark 自定义预处理估算器
Spark custom preprocessing estimator
我想为 spark 的 Pipelines
编写自定义 Estimator
。它应该执行数据清理任务。这意味着将删除一些行,删除一些列,添加一些列,替换现有列中的一些值。 IT 还应该将某些数字列的平均值或最小值存储为 NaN 替换。
然而,
override def transformSchema(schema: StructType): StructType = {
schema.add(StructField("foo", IntegerType))
}
只支持添加字段?
我很好奇我应该如何处理这个。
StructField
api 仅支持添加字段,您是正确的。但是,这并不意味着您也不能删除字段!
StructType
有一个值成员 fields
,它给你一个 Array[StructField]
。您可以 .filter()
这个数组,但是您认为合适(通过 name
、dataType
或更复杂的方法),只保留您想要的列。
完成过滤后,您有两个选择:
- 为过滤后的
fields
数组中的每个新列添加一个 StructField
,并从中构造一个 StructType
- 从
fields
数组构造一个 StructType
并使用 .add(...)
添加新列。
我想为 spark 的 Pipelines
编写自定义 Estimator
。它应该执行数据清理任务。这意味着将删除一些行,删除一些列,添加一些列,替换现有列中的一些值。 IT 还应该将某些数字列的平均值或最小值存储为 NaN 替换。
然而,
override def transformSchema(schema: StructType): StructType = {
schema.add(StructField("foo", IntegerType))
}
只支持添加字段? 我很好奇我应该如何处理这个。
StructField
api 仅支持添加字段,您是正确的。但是,这并不意味着您也不能删除字段!
StructType
有一个值成员 fields
,它给你一个 Array[StructField]
。您可以 .filter()
这个数组,但是您认为合适(通过 name
、dataType
或更复杂的方法),只保留您想要的列。
完成过滤后,您有两个选择:
- 为过滤后的
fields
数组中的每个新列添加一个StructField
,并从中构造一个StructType
- 从
fields
数组构造一个StructType
并使用.add(...)
添加新列。