VectorAssembler fails with java.util.NoSuchElementException: Param handleInvalid does not exist

While transforming an ML pipeline that uses VectorAssembler, I hit a "Param handleInvalid does not exist" error. Why does this happen? Am I missing something? I'm new to PySpark.

I am using the following code to combine a given list of columns into a single vector column:

from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

categoricalColumns = ['frequency', 'type_disp', 'type_card']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index').setHandleInvalid("keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol='response', outputCol='label')
stages += [label_stringIdx]

numericCols = ['date_acct_', 'date_loan_', 'amount', 'duration', 'payments', 'birth_number_', 'min1', 'max1', 'mean1', 'min2', 'max2', 'mean2', 'min3', 'max3', 'mean3', 'min4', 'max4', 'mean4', 'min5', 'max5', 'mean5', 'min6', 'max6', 'mean6', 'gen', 'has_card']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="feature")
print(assembler)
stages += [assembler]

df_features is my main DataFrame holding all the columns. I tried passing both handleInvalid = 'keep' and handleInvalid = 'skip' there, but unfortunately got the same error.

The following error occurs:

Traceback (most recent call last):
  File "spark_model_exp_.py", line 275, in <module>
    feature_df = assembler.transform(features)
  File "/usr/local/lib/python3.6/site-packages/pyspark/ml/base.py", line 173, in transform
    return self._transform(dataset)
  File "/usr/local/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 311, in _transform
    self._transfer_params_to_java()
  File "/usr/local/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 124, in _transfer_params_to_java
    pair = self._make_java_param_pair(param, self._paramMap[param])
  File "/usr/local/lib/python3.6/site-packages/pyspark/ml/wrapper.py", line 113, in _make_java_param_pair
    java_param = self._java_obj.getParam(param.name)
  File "/usr/local/lib/python3.6/site-packages/py4j/java_gateway.py", line 1257, in __call__
    answer, self.gateway_client, self.target_id, self.name)
  File "/usr/local/lib/python3.6/site-packages/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/usr/local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o1072.getParam.
: java.util.NoSuchElementException: Param handleInvalid does not exist.
        at org.apache.spark.ml.param.Params$$anonfun$getParam.apply(params.scala:729)
        at org.apache.spark.ml.param.Params$$anonfun$getParam.apply(params.scala:729)
        at scala.Option.getOrElse(Option.scala:121)
        at org.apache.spark.ml.param.Params$class.getParam(params.scala:728)
        at org.apache.spark.ml.PipelineStage.getParam(Pipeline.scala:43)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)

What did I try before?

from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoderEstimator, VectorAssembler

categoricalColumns = ['frequency', 'type_disp', 'type_card']
stages = []
for categoricalCol in categoricalColumns:
    stringIndexer = StringIndexer(inputCol=categoricalCol, outputCol=categoricalCol + 'Index').setHandleInvalid("keep")
    encoder = OneHotEncoderEstimator(inputCols=[stringIndexer.getOutputCol()], outputCols=[categoricalCol + "classVec"])
    stages += [stringIndexer, encoder]

label_stringIdx = StringIndexer(inputCol='response', outputCol='label')
stages += [label_stringIdx]
numericCols = ['date_acct_', 'date_loan_', 'amount', 'duration', 'payments', 'birth_number_', 'min1', 'max1', 'mean1', 'min2', 'max2', 'mean2', 'min3', 'max3', 'mean3', 'min4', 'max4', 'mean4', 'gen', 'has_card']
assemblerInputs = [c + "classVec" for c in categoricalColumns] + numericCols
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="feature")
stages += [assembler]
pipeline = Pipeline(stages=stages)
pipelineModel = pipeline.fit(features)
features = pipelineModel.transform(features)
features.show(n=2)
selectedCols = ['label', 'feature'] + cols
features = features.select(selectedCols)
print(features.dtypes)

In the code above I also went through a Pipeline. With that version I no longer got the error from VectorAssembler's transform function directly, but the Pipeline's transform step raised the same error (Param handleInvalid does not exist).

Please let me know if you need more details. Is there some alternative we could try to achieve this?

Edit: I have a partial answer as to why this happens: locally Spark version = 2.4, so the code works fine there, but the cluster Spark version = 2.3, and since handleInvalid was only introduced on VectorAssembler in version 2.4, this error occurs.
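To spot this kind of mismatch early, you can compare `pyspark.__version__` on the driver against `spark.version` reported by the running session. A minimal pure-Python sketch of the check (the helper names are my own):

```python
def minor_version(version):
    """Return (major, minor) from a version string like '2.4.0'."""
    major, minor = version.split(".")[:2]
    return int(major), int(minor)

def wrappers_match_jvm(pyspark_version, cluster_version):
    """True when the pip-installed PySpark and the cluster's Spark agree
    on major.minor, i.e. the Python wrappers expose exactly the params
    the JVM classes define."""
    return minor_version(pyspark_version) == minor_version(cluster_version)

# The situation described above: 2.4 wrappers driving a 2.3 cluster.
print(wrappers_match_jvm("2.4.0", "2.3.1"))  # False
```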

But I would like to understand this: I checked that there are no NULL/NaN values in the DataFrame, so how does VectorAssembler end up invoking the handleInvalid param at all? I am wondering whether I can bypass this implicit call so that I don't hit the error, or whether there is any alternative other than upgrading the cluster's Spark from 2.3 to 2.4?
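The data never matters here: as the traceback shows, `transform()` first copies every param defined on the Python wrapper over to the JVM object (`_transfer_params_to_java`), and the 2.4 wrapper defines `handleInvalid` with a default value, so it is pushed even if you never set it. A toy model of that hand-off (class and method names are my own, heavily simplified from the real `pyspark/ml/wrapper.py` mechanism):

```python
class JavaAssembler23:
    """Stand-in for the Spark 2.3 JVM VectorAssembler: no handleInvalid."""
    param_names = {"inputCols", "outputCol"}

    def getParam(self, name):
        if name not in self.param_names:
            raise LookupError("Param %s does not exist." % name)
        return name

class PyAssembler24:
    """Stand-in for the 2.4 Python wrapper: handleInvalid has a default,
    so it sits in the param map even if the user never touched it."""
    def __init__(self):
        self.param_map = {"inputCols": ["amount", "duration"],
                          "outputCol": "feature",
                          "handleInvalid": "error"}

    def transfer_params_to_java(self, java_obj):
        # Mirrors _transfer_params_to_java: every known param is looked
        # up on the JVM side, regardless of the data being transformed.
        for name in self.param_map:
            java_obj.getParam(name)

try:
    PyAssembler24().transfer_params_to_java(JavaAssembler23())
except LookupError as e:
    print(e)  # Param handleInvalid does not exist.
```

So the lookup fails during param transfer, before a single row is inspected, which is why a perfectly clean DataFrame still triggers it.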

Can someone advise on this?

I finally solved this with RFormula, which removes the need for StringIndexer, VectorAssembler, and Pipeline altogether; RFormula does all of that behind the scenes: https://spark.apache.org/docs/2.3.0/api/java/org/apache/spark/ml/feature/RFormula.html

from pyspark.ml.feature import RFormula

formula = RFormula(formula='response ~ .', featuresCol='features', labelCol='label')
label_df = formula.fit(df_features).transform(df_features)

where response is the label column and df_features is your DataFrame with the full set of features.
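RFormula accepts R-style formula operators, so columns can be excluded with `-` terms instead of being dropped from the DataFrame first. A small hypothetical helper (name and API are my own) for building such formula strings:

```python
def build_rformula(label, exclude=()):
    """Build an R-style formula string: 'label ~ .' uses every other
    column as a feature; each '- col' term excludes one column."""
    rhs = "."
    for col in exclude:
        rhs += " - " + col
    return "{} ~ {}".format(label, rhs)

print(build_rformula("response"))                        # response ~ .
print(build_rformula("response", ["date_acct_", "gen"]))  # response ~ . - date_acct_ - gen
```

The resulting string can be passed straight to RFormula's `formula` parameter.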