Pyspark ML - Random forest classifier - One Hot Encoding not working for labels

I am trying to run a random forest classifier with pyspark ml (Spark 2.4.0), encoding the target label with OHE. The model trains fine when I feed the label as an integer (the StringIndexer output), but it fails when I supply a one-hot-encoded label via OneHotEncoderEstimator. Is this a Spark limitation?

#%%
# Test dataframe
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier,LinearSVC
tst=sqlContext.createDataFrame([(1,'01/01/2020','buy',10000,2000),(1,'01/01/2020','sell',10000,3000),(1,'02/01/2020','buy',10000,1000),(1,'02/01/2020','sell',1000,2000),(2,'01/01/2020','sell',1000,3000),(2,'02/01/2020','buy',1000,1000),(2,'02/01/2020','buy',1000,100)],schema=("id","date","transaction","limit","amount"))
# label pipeline
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
ohe = OneHotEncoderEstimator(inputCols=['label'],outputCols=['label_ohe'],dropLast=False)
label_pipeline=Pipeline(stages=[str_indxr,ohe])
# data pipeline
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit","amount"], outputCol="features",handleInvalid='skip')
classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')
data_pipeline = Pipeline(stages=[vecAssembler,classifier])

data_fit = data_pipeline.fit(data_trans)

I get this error:

  ---------------------------------------------------------------------------
IllegalArgumentException                  Traceback (most recent call last)
<ipython-input-18-f08a05d86e2c> in <module>()
      1 if(train_labdata_rf):
----> 2     pipeline_trained,accuracy,test_result_rf = train_test("rf",train_d,test_d)
      3     print("Test set accuracy = " + str(accuracy))
      4     #pipeline_trained.write().overwrite().save("/projects/projectwbvplatformpc/dev/PS-ET_Pipeline/CDM_Classifier/output/pyspark_classifier/pipelines/random_forest")
      5 else:

<ipython-input-4-9709037baa80> in train_test(modelname, train_data, test_data)
     11     """
     12     pipeline=create_pipeline(modelname)
---> 13     pipeline_fit = pipeline.fit(train_data)
     14 
     15     result = pipeline_fit.transform(test_d)

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
    107                     dataset = stage.transform(dataset)
    108                 else:  # must be an Estimator
--> 109                     model = stage.fit(dataset)
    110                     transformers.append(model)
    111                     if i < indexOfLastEstimator:

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of param maps, "

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
    293 
    294     def _fit(self, dataset):
--> 295         java_model = self._fit_java(dataset)
    296         model = self._create_model(java_model)
    297         return self._copyValues(model)

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
    290         """
    291         self._transfer_params_to_java()
--> 292         return self._java_obj.fit(dataset._jdf)
    293 
    294     def _fit(self, dataset):

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/opt/cloudera/parcels/CDH-6.3.1-1.cdh6.3.1.p0.1470567/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     77                 raise QueryExecutionException(s.split(': ', 1)[1], stackTrace)
     78             if s.startswith('java.lang.IllegalArgumentException: '):
---> 79                 raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
     80             raise
     81     return deco

IllegalArgumentException: u'requirement failed: Column label_ohe must be of type numeric but was actually of type struct<type:tinyint,size:int,indices:array<int>,values:array<double>>.'

I could not find any relevant resources on this. Any suggestions would be helpful.

Edit: pyspark does not support a vector as the target label, so only the string-indexed (numeric) label can be used.

The problematic code is -

classifier = RandomForestClassifier(featuresCol='features', labelCol='label_ohe')

The problem is the type of labelCol = label_ohe, which must be an instance of NumericType.

The output type of OHE is Vector.

ref - spark-git
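
To confirm this, a minimal schema check (a sketch, reusing the data_trans DataFrame built above) shows that the StringIndexer output is numeric while the OHE output is a vector:

from pyspark.ml.linalg import VectorUDT

# "label" comes from StringIndexer and is a plain double, which labelCol accepts
print(data_trans.schema["label"].dataType)
# "label_ohe" comes from OneHotEncoderEstimator and is a VectorUDT, which it rejects
print(isinstance(data_trans.schema["label_ohe"].dataType, VectorUDT))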

Using the StringIndexer output directly works -

# Test dataframe
import pyspark.sql.functions as F
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier,LinearSVC
tst=sqlContext.createDataFrame([(1,'01/01/2020','buy',10000,2000),(1,'01/01/2020','sell',10000,3000),(1,'02/01/2020','buy',10000,1000),(1,'02/01/2020','sell',1000,2000),(2,'01/01/2020','sell',1000,3000),(2,'02/01/2020','buy',1000,1000),(2,'02/01/2020','buy',1000,100)],schema=("id","date","transaction","limit","amount"))
# label pipeline
str_indxr = StringIndexer(inputCol='transaction', outputCol="label")
label_pipeline=Pipeline(stages=[str_indxr])
# data pipeline
data_trans = label_pipeline.fit(tst).transform(tst)
vecAssembler = VectorAssembler(inputCols=["limit","amount"], outputCol="features",handleInvalid='skip')
classifier = RandomForestClassifier(featuresCol='features', labelCol='label')
data_pipeline = Pipeline(stages=[vecAssembler,classifier])

data_fit = data_pipeline.fit(data_trans)
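
As a quick sanity check (a sketch, reusing the same data_trans as above rather than a held-out test set), the fitted PipelineModel can be applied back to the data; both label and prediction come out as plain numeric columns:

result = data_fit.transform(data_trans)
result.select("transaction", "label", "prediction").show()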