Spark pipeline error gradient boosting model
I'm getting an error using a gradient boosting model in Python. I previously normalized the data, transformed it with VectorAssembler, and indexed the columns, and the error occurs when I run this:
from pyspark.ml import Pipeline
#pipeline = Pipeline(stages=[gbt])
stages = []
stages += [gbt]
pipeline = Pipeline(stages=stages)
model = pipeline.fit(df_train)
prediction = model.transform(df_train)
prediction.printSchema()
This is the error:
command-3539065191562733> in <module>()
6
7 pipeline = Pipeline(stages=stages)
----> 8 model = pipeline.fit(df_train)
9 prediction = model.transform(df_train)
10 prediction.printSchema()
/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/databricks/spark/python/pyspark/ml/pipeline.py in _fit(self, dataset)
107 dataset = stage.transform(dataset)
108 else: # must be an Estimator
--> 109 model = stage.fit(dataset)
110 transformers.append(model)
111 if i < indexOfLastEstimator:
/databricks/spark/python/pyspark/ml/base.py in fit(self, dataset, params)
130 return self.copy(params)._fit(dataset)
131 else:
--> 132 return self._fit(dataset)
133 else:
134 raise ValueError("Params must be either a param map or a list/tuple of param maps, "
/databricks/spark/python/pyspark/ml/wrapper.py in _fit(self, dataset)
293
294 def _fit(self, dataset):
--> 295 java_model = self._fit_java(dataset)
296 model = self._create_model(java_model)
297 return self._copyValues(model)
/databricks/spark/python/pyspark/ml/wrapper.py in _fit_java(self, dataset)
290 """
291 self._transfer_params_to_java()
--> 292 return self._java_obj.fit(dataset._jdf)
293
294 def _fit(self, dataset):
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
1255 answer = self.gateway_client.send_command(command)
1256 return_value = get_return_value(
-> 1257 answer, self.gateway_client, self.target_id, self.name)
1258
1259 for temp_arg in temp_args:
/databricks/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
61 def deco(*a, **kw):
62 try:
---> 63 return f(*a, **kw)
64 except py4j.protocol.Py4JJavaError as e:
65 s = e.java_exception.toString()
/databricks/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
326 raise Py4JJavaError(
327 "An error occurred while calling {0}{1}{2}.\n".
--> 328 format(target_id, ".", name), value)
329 else:
330 raise Py4JError(
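The traceback here stops before the underlying Java exception, which is usually where the real cause is named. One quick way to surface it (a sketch, reusing the same pipeline and df_train as above) is to catch and print the error:

try:
    model = pipeline.fit(df_train)
except Exception as e:
    # the Py4JJavaError wraps the JVM exception; its message names the real cause
    print(e)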
What's going wrong? I've been working on this for a while and can't figure out what is wrong with the data or the code.
I just tried it with some dummy data, without any train/test split:
import pyspark.sql.functions as F
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import GBTClassifier
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator

# Dummy data: one string column, two numeric columns, and a binary label
tst = sqlContext.createDataFrame(
    [('a', 7, 2, 0), ('b', 3, 4, 1), ('c', 5, 6, 0), ('d', 7, 8, 1),
     ('a', 9, 10, 0), ('a', 11, 12, 1), ('g', 13, 14, 0)],
    schema=['col1', 'col2', 'col3', 'label'])

# Index and one-hot encode the string column, then assemble everything into a feature vector
str_indxr = StringIndexer(inputCol='col1', outputCol='col1_indexed')
ohe = OneHotEncoderEstimator(inputCols=['col1_indexed'], outputCols=['col1_ohe'])
vec_assmblr = VectorAssembler(inputCols=['col1_ohe', 'col2', 'col3'], outputCol='features_norm')
gbt = GBTClassifier(labelCol="label", featuresCol="features_norm", maxIter=10)

# All stages, including preprocessing, live in one pipeline
pip_line = Pipeline(stages=[str_indxr, ohe, vec_assmblr, gbt])
pip_line_fit = pip_line.fit(tst)
#%%
df_tran = pip_line_fit.transform(tst)
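If it helps to sanity-check the dummy run, the transformed DataFrame can be inspected directly (a minimal check; prediction is the column the fitted GBT model adds):

df_tran.select('label', 'prediction').show()
df_tran.printSchema()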
This works. So I can think of two things:
- The Spark version. I'm using 2.4.0; is yours greater than or equal to that?
- For the other stages, such as the MinMaxScaler or the VectorAssembler, did you import any of them from mllib? Mixing ml and mllib imports causes strange problems. mllib is being phased out, so import all of your functions from the ml library (see the sketch after this list).
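A quick way to check both points in the same notebook is sketched below (the spark session object is assumed to be available, as it is on Databricks, and MinMaxScaler is only an example of a preprocessing stage):

# should print 2.4.0 or later
print(spark.version)

# Correct: all stages come from the DataFrame-based pyspark.ml package
from pyspark.ml.feature import MinMaxScaler, VectorAssembler, StringIndexer
from pyspark.ml.classification import GBTClassifier

# Avoid mixing in the RDD-based pyspark.mllib package, e.g.
# from pyspark.mllib.tree import GradientBoostedTrees
# mllib objects cannot be used as stages in a pyspark.ml Pipeline.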