PySpark 特征向量允许 NULL 值
PySpark feature vector to allow NULL values
我想在包含 NULL 值的数据集上使用 PySpark 中的分类器。 NULL 值出现在我创建的特征中,例如成功百分比。我需要保留 NULL 值,因为我已经通过 pandas 表明保留 NULL 值会导致更强大的模型。因此,我不想用零或中位数来估算 NULL。
我知道 Vector Assembler 可以用来创建特征向量,但是当数据包含 NULL 值时它不起作用。我想知道是否有一种方法可以创建一个包含 NULL 值的特征向量,它可以与 LightGBMClassifier 一起使用?
我用 diamonds.csv 数据演示了我遇到的问题。我使用一个干净的未经编辑的副本和一个插入空值的副本来演示我遇到的问题。
import pandas as pd
import numpy as np
import random
from mmlspark import LightGBMClassifier
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator
diamondsData = pd.read_csv("/dbfs/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv").iloc[:,1:]
diamondsData_clean = diamondsData.copy()
diamondsData_clean = spark.createDataFrame(diamondsData_clean)
diamondsData['randnum'] = diamondsData.apply(lambda x: random.uniform(0, 1), axis=1)
diamondsData['depth'] = diamondsData[['depth','randnum']].apply(lambda x: np.nan if x['randnum'] < 0.05 else x['depth'], axis=1)
diamondsData_nulls = spark.createDataFrame(diamondsData)
diamondsData_nulls = diamondsData_nulls.select([when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c for c, t in diamondsData_nulls.dtypes])
diamondsData_nulls.show(10)
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
|carat| cut|color|clarity|depth|table|price| x| y| z| randnum|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43| 0.0755707311804259|
| 0.21| Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31| 0.9719186135587407|
| 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31| 0.5237755344569698|
| 0.29| Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63| 0.12103842271165433|
| 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75| 0.48213792315234205|
| 0.24|Very Good| J| VVS2| 62.8| 57.0| 336|3.94|3.96|2.48| 0.5461421401855059|
| 0.24|Very Good| I| VVS1| null| 57.0| 336|3.95|3.98|2.47|0.013923864248332252|
| 0.26|Very Good| H| SI1| 61.9| 55.0| 337|4.07|4.11|2.53| 0.551950501743583|
| 0.22| Fair| E| VS2| 65.1| 61.0| 337|3.87|3.78|2.49| 0.09444899320350808|
| 0.23|Very Good| H| VS1| 59.4| 61.0| 338| 4.0|4.05|2.39| 0.5246023480324566|
然后配置管道中使用的阶段。
categoricalColumns = ['cut', 'color', 'clarity']
stages = []
for categoricalCol in categoricalColumns:
stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
stages += [stringIndexer, encoder]
numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
管道适合 diamondsData_clean 并且数据被转换,按预期返回标签列和特征向量。
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_clean)
diamonds_final = pipelineModel.transform(diamondsData_clean)
selectedCols = ['price', 'features']
diamonds_final = diamonds_final.select(selectedCols)
diamonds_final.printSchema()
diamonds_final.show(6)
root
|-- price: long (nullable = true)
|-- features: vector (nullable = true)
+-----+--------------------+
|price| features|
+-----+--------------------+
| 326|(23,[0,5,12,17,18...|
| 326|(23,[1,5,10,17,18...|
| 327|(23,[3,5,13,17,18...|
| 334|(23,[1,9,11,17,18...|
| 335|(23,[3,12,17,18,1...|
| 336|(23,[2,14,17,18,1...|
+-----+--------------------+
但是,当在 diamondsData_nulls 数据帧上尝试相同的步骤时,它 returns 出错。
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_nulls)
diamonds_final_nulls = pipelineModel.transform(diamondsData_nulls)
selectedCols = ['price', 'features']
diamonds_final_nulls = diamonds_final_nulls.select(selectedCols)
diamonds_final_nulls.printSchema()
diamonds_final_nulls.show(6)
root
|-- price: long (nullable = true)
|-- features: vector (nullable = true)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 133952.0 failed 4 times, most recent failure: Lost task 0.3 in stage 133952.0 (TID 1847847, 10.139.64.4, executor 291): org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (struct<cutclassVec:vector,colorclassVec:vector,clarityclassVec:vector,carat:double,depth:double,table:double,x:double,y:double,z:double>) => vector)
这是一个正在处理的已知问题 (https://github.com/Azure/mmlspark/issues/304),但我目前找不到允许传递 NULL 的特征化器。
正在尝试使用 handleInvalid = "keep" 参数
用户 Machiel 建议 StringIndexer 和 OneHotEncoderEstimator 函数的 handleInvalid 参数 - 事实证明它也应该应用于 VectorAssembler 函数。我这样更新了我的代码:
for categoricalCol in categoricalColumns:
stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index',handleInvalid = "keep")
encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"],handleInvalid = "keep")
stages += [stringIndexer, encoder]
numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features",handleInvalid="keep")
stages += [assembler]
对于字符串和分类数字,您可以让 Spark 使用 handleInvalid 参数为缺失值创建一个桶:
OneHotEncoderEstimator(inputCols=..., outputCols=..., handleInvalid='keep')
StringIndexer(inputCol=..., outputCol=..., handleInvalid='keep')
我想在包含 NULL 值的数据集上使用 PySpark 中的分类器。 NULL 值出现在我创建的特征中,例如成功百分比。我需要保留 NULL 值,因为我已经通过 pandas 表明保留 NULL 值会导致更强大的模型。因此,我不想用零或中位数来估算 NULL。
我知道 Vector Assembler 可以用来创建特征向量,但是当数据包含 NULL 值时它不起作用。我想知道是否有一种方法可以创建一个包含 NULL 值的特征向量,它可以与 LightGBMClassifier 一起使用?
我用 diamonds.csv 数据演示了我遇到的问题。我使用一个干净的未经编辑的副本和一个插入空值的副本来演示我遇到的问题。
import pandas as pd
import numpy as np
import random
from mmlspark import LightGBMClassifier
from pyspark.sql.functions import *
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler, StringIndexer, OneHotEncoderEstimator
diamondsData = pd.read_csv("/dbfs/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv").iloc[:,1:]
diamondsData_clean = diamondsData.copy()
diamondsData_clean = spark.createDataFrame(diamondsData_clean)
diamondsData['randnum'] = diamondsData.apply(lambda x: random.uniform(0, 1), axis=1)
diamondsData['depth'] = diamondsData[['depth','randnum']].apply(lambda x: np.nan if x['randnum'] < 0.05 else x['depth'], axis=1)
diamondsData_nulls = spark.createDataFrame(diamondsData)
diamondsData_nulls = diamondsData_nulls.select([when(~isnan(c), col(c)).alias(c) if t in ("double", "float") else c for c, t in diamondsData_nulls.dtypes])
diamondsData_nulls.show(10)
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
|carat| cut|color|clarity|depth|table|price| x| y| z| randnum|
+-----+---------+-----+-------+-----+-----+-----+----+----+----+--------------------+
| 0.23| Ideal| E| SI2| 61.5| 55.0| 326|3.95|3.98|2.43| 0.0755707311804259|
| 0.21| Premium| E| SI1| 59.8| 61.0| 326|3.89|3.84|2.31| 0.9719186135587407|
| 0.23| Good| E| VS1| 56.9| 65.0| 327|4.05|4.07|2.31| 0.5237755344569698|
| 0.29| Premium| I| VS2| 62.4| 58.0| 334| 4.2|4.23|2.63| 0.12103842271165433|
| 0.31| Good| J| SI2| 63.3| 58.0| 335|4.34|4.35|2.75| 0.48213792315234205|
| 0.24|Very Good| J| VVS2| 62.8| 57.0| 336|3.94|3.96|2.48| 0.5461421401855059|
| 0.24|Very Good| I| VVS1| null| 57.0| 336|3.95|3.98|2.47|0.013923864248332252|
| 0.26|Very Good| H| SI1| 61.9| 55.0| 337|4.07|4.11|2.53| 0.551950501743583|
| 0.22| Fair| E| VS2| 65.1| 61.0| 337|3.87|3.78|2.49| 0.09444899320350808|
| 0.23|Very Good| H| VS1| 59.4| 61.0| 338| 4.0|4.05|2.39| 0.5246023480324566|
然后配置管道中使用的阶段。
categoricalColumns = ['cut', 'color', 'clarity']
stages = []
for categoricalCol in categoricalColumns:
stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index')
encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
stages += [stringIndexer, encoder]
numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features")
stages += [assembler]
管道适合 diamondsData_clean 并且数据被转换,按预期返回标签列和特征向量。
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_clean)
diamonds_final = pipelineModel.transform(diamondsData_clean)
selectedCols = ['price', 'features']
diamonds_final = diamonds_final.select(selectedCols)
diamonds_final.printSchema()
diamonds_final.show(6)
root
|-- price: long (nullable = true)
|-- features: vector (nullable = true)
+-----+--------------------+
|price| features|
+-----+--------------------+
| 326|(23,[0,5,12,17,18...|
| 326|(23,[1,5,10,17,18...|
| 327|(23,[3,5,13,17,18...|
| 334|(23,[1,9,11,17,18...|
| 335|(23,[3,12,17,18,1...|
| 336|(23,[2,14,17,18,1...|
+-----+--------------------+
但是,当在 diamondsData_nulls 数据帧上尝试相同的步骤时,它 returns 出错。
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(diamondsData_nulls)
diamonds_final_nulls = pipelineModel.transform(diamondsData_nulls)
selectedCols = ['price', 'features']
diamonds_final_nulls = diamonds_final_nulls.select(selectedCols)
diamonds_final_nulls.printSchema()
diamonds_final_nulls.show(6)
root
|-- price: long (nullable = true)
|-- features: vector (nullable = true)
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 133952.0 failed 4 times, most recent failure: Lost task 0.3 in stage 133952.0 (TID 1847847, 10.139.64.4, executor 291): org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (struct<cutclassVec:vector,colorclassVec:vector,clarityclassVec:vector,carat:double,depth:double,table:double,x:double,y:double,z:double>) => vector)
这是一个正在处理的已知问题 (https://github.com/Azure/mmlspark/issues/304),但我目前找不到允许传递 NULL 的特征化器。
正在尝试使用 handleInvalid = "keep" 参数
用户 Machiel 建议 StringIndexer 和 OneHotEncoderEstimator 函数的 handleInvalid 参数 - 事实证明它也应该应用于 VectorAssembler 函数。我这样更新了我的代码:
for categoricalCol in categoricalColumns:
stringIndexer = StringIndexer(inputCol = categoricalCol, outputCol = categoricalCol + 'Index',handleInvalid = "keep")
encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"],handleInvalid = "keep")
stages += [stringIndexer, encoder]
numericCols = ['carat','depth','table','x','y','z']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features",handleInvalid="keep")
stages += [assembler]
对于字符串和分类数字,您可以让 Spark 使用 handleInvalid 参数为缺失值创建一个桶:
OneHotEncoderEstimator(inputCols=..., outputCols=..., handleInvalid='keep')
StringIndexer(inputCol=..., outputCol=..., handleInvalid='keep')