Pyspark ML - 随机森林分类器 - 一种热编码不适用于标签
Pyspark ML - Random forest classifier - One Hot Encoding not working for labels
我正在尝试 运行 使用 pyspark ml (spark 2.4.0) 的随机森林分类器,并使用 OHE 对目标标签进行编码。当我将标签作为整数(字符串索引器)提供时,模型训练良好,但当我使用 OneHotCodeEstimator 提供单热编码标签时,模型训练失败。这是火花限制吗?
#%%
# Test dataframe
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier,LinearSVC
tst=sqlContext.createDataFrame([(1,'01/01/2020','buy',10000,2000),(1,'01/01/2020','sell',10000,3000),(1,'02/01/2020','buy',10000,1000),(1,'02/01/2020','sell',1000,2000),(2,'01/01/2020','sell',1000,3000),(2,'02/01/2020','buy',1000,1000),(2,'02/01/2020','buy',1000,100)],schema=("id","date","transaction","limit","amount"))
# label pipeleing
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
ohe = OneHotEncoderEstimator(inputCols=['label'],outputCols=['label_ohe'],dropLast=False)
label_pipeline=Pipeline(stages=[str_indxr,ohe])
#%data data pipeleine
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit","amount"], outputCol="features",handleInvalid='skip')
classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')
data_pipeline = Pipeline(stages=[vecAssembler,classifier])
data_fit = data_pipeline.fit(data_trans)
我收到这个错误:
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<ipython-input-18-f08a05d86e2c> in <module>()
1 if(train_labdata_rf):
----> 2 pipeline_trained,accuracy,test_result_rf = train_test("rf",train_d,test_d)
3 print("Test set accuracy = " + str(accuracy))
4 #pipeline_trained.write().overwrite().save("/projects/projectwbvplatformpc/dev/PS-ET_Pipeline/CDM_Classifier/output/pyspark_classifier/pipelines/random_forest")
5 else:
<ipython-input-4-9709037baa80> in train_test(modelname, train_data, test_data)
11 """
12 pipeline=create_pipeline(modelname)
---> 13 pipeline_fit = pipeline.fit(train_data)
14
15 result = pipeline_fit.transform(test_d)
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
293
294 def _fit(self, dataset):
--> 295 java_model = self._fit_java(dataset)
296 model = self._create_model(java_model)
297 return self._copyValues(model)
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
290 """
291 self._transfer_params_to_java()
--> 292 return self._java_obj.fit(dataset._jdf)
293
294 def _fit(self, dataset):
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
77 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
78 if s.startswith('java.lang.IllegalArgumentException: '):
---> 79 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
80 raise
81 return deco
IllegalArgumentException: u'requirement failed: Column label_ohe must be of type numeric but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'
我找不到任何合适的资源。任何建议都会有所帮助。
编辑:pyspark 不支持向量作为目标标签,因此只能使用字符串编码。
有问题的代码是-
classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')
问题出在 labelCol= label_ohe
的类型上,它必须是 NumericType
的实例
OHE 的输出类型是 Vector
ref - spark-git
直接使用StringIndexer输出为-
# Test dataframe
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier,LinearSVC
tst=sqlContext.createDataFrame([(1,'01/01/2020','buy',10000,2000),(1,'01/01/2020','sell',10000,3000),(1,'02/01/2020','buy',10000,1000),(1,'02/01/2020','sell',1000,2000),(2,'01/01/2020','sell',1000,3000),(2,'02/01/2020','buy',1000,1000),(2,'02/01/2020','buy',1000,100)],schema=("id","date","transaction","limit","amount"))
# label pipeleing
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
label_pipeline=Pipeline(stages=[str_indxr])
#%data data pipeleine
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit","amount"], outputCol="features",handleInvalid='skip')
classifier = RandomForestClassifier(featuresCol='features', labelCol='label')
data_pipeline = Pipeline(stages=[vecAssembler,classifier])
data_fit = data_pipeline.fit(data_trans)
我正在尝试 运行 使用 pyspark ml (spark 2.4.0) 的随机森林分类器,并使用 OHE 对目标标签进行编码。当我将标签作为整数(字符串索引器)提供时,模型训练良好,但当我使用 OneHotCodeEstimator 提供单热编码标签时,模型训练失败。这是火花限制吗?
#%%
# Test dataframe
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier,LinearSVC
tst=sqlContext.createDataFrame([(1,'01/01/2020','buy',10000,2000),(1,'01/01/2020','sell',10000,3000),(1,'02/01/2020','buy',10000,1000),(1,'02/01/2020','sell',1000,2000),(2,'01/01/2020','sell',1000,3000),(2,'02/01/2020','buy',1000,1000),(2,'02/01/2020','buy',1000,100)],schema=("id","date","transaction","limit","amount"))
# label pipeleing
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
ohe = OneHotEncoderEstimator(inputCols=['label'],outputCols=['label_ohe'],dropLast=False)
label_pipeline=Pipeline(stages=[str_indxr,ohe])
#%data data pipeleine
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit","amount"], outputCol="features",handleInvalid='skip')
classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')
data_pipeline = Pipeline(stages=[vecAssembler,classifier])
data_fit = data_pipeline.fit(data_trans)
我收到这个错误:
---------------------------------------------------------------------------
IllegalArgumentException Traceback (most recent call last)
<ipython-input-18-f08a05d86e2c> in <module>()
1 if(train_labdata_rf):
----> 2 pipeline_trained,accuracy,test_result_rf = train_test("rf",train_d,test_d)
3 print("Test set accuracy = " + str(accuracy))
4 #pipeline_trained.write().overwrite().save("/projects/projectwbvplatformpc/dev/PS-ET_Pipeline/CDM_Classifier/output/pyspark_classifier/pipelines/random_forest")
5 else:
<ipython-input-4-9709037baa80> in train_test(modelname, train_data, test_data)
11 """
12 pipeline=create_pipeline(modelname)
---> 13 pipeline_fit = pipeline.fit(train_data)
14
15 result = pipeline_fit.transform(test_d)
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
293
294 def _fit(self, dataset):
--> 295 java_model = self._fit_java(dataset)
296 model = self._create_model(java_model)
297 return self._copyValues(model)
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
290 """
291 self._transfer_params_to_java()
--> 292 return self._java_obj.fit(dataset._jdf)
293
294 def _fit(self, dataset):
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
77 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
78 if s.startswith('java.lang.IllegalArgumentException: '):
---> 79 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
80 raise
81 return deco
IllegalArgumentException: u'requirement failed: Column label_ohe must be of type numeric but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'
我找不到任何合适的资源。任何建议都会有所帮助。
编辑:pyspark 不支持向量作为目标标签,因此只能使用字符串编码。
有问题的代码是-
classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')
问题出在 labelCol= label_ohe
的类型上,它必须是 NumericType
OHE 的输出类型是 Vector
ref - spark-git
直接使用StringIndexer输出为-
# Test dataframe
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer,OneHotEncoderEstimator
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier,LinearSVC
tst=sqlContext.createDataFrame([(1,'01/01/2020','buy',10000,2000),(1,'01/01/2020','sell',10000,3000),(1,'02/01/2020','buy',10000,1000),(1,'02/01/2020','sell',1000,2000),(2,'01/01/2020','sell',1000,3000),(2,'02/01/2020','buy',1000,1000),(2,'02/01/2020','buy',1000,100)],schema=("id","date","transaction","limit","amount"))
# label pipeleing
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
label_pipeline=Pipeline(stages=[str_indxr])
#%data data pipeleine
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit","amount"], outputCol="features",handleInvalid='skip')
classifier = RandomForestClassifier(featuresCol='features', labelCol='label')
data_pipeline = Pipeline(stages=[vecAssembler,classifier])
data_fit = data_pipeline.fit(data_trans)