无法在 Pyspark 中执行用户定义的函数 RegexTokenizer

Failed to execute user defined function RegexTokenizer in Pyspark

我正在尝试使用 Pyspark 使用数据中的文本特征执行文本分类。下面是我的文本预处理代码,代码无法执行用户定义的函数 RegexTokenizer。

    tokenizer = RegexTokenizer(inputCol = "text", outputCol = "words", pattern = "\W")
    add_stopwords = StopWordsRemover.loadDefaultStopWords("english")
    remover = StopWordsRemover(inputCol = "words", outputCol = "filtered").setStopWords(add_stopwords)
    label_stringIdx = StringIndexer(inputCol = "label", outputCol = "target")
    countVectors = CountVectorizer(inputCol="filtered", outputCol="features", vocabSize=1000, minDF=5)
    #pipleline for text pre-processing
    pipeline = Pipeline(stages=[tokenizer,remover, countVectors, label_stringIdx])

    #fit the dat for the pipeline
    pipelineFit = pipeline.fit(dataset)
    dataset = pipelineFit.transform(dataset)
    dataset.show()

错误是:

/usr/local/lib/python3.6/dist-packages/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o589.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 92.0 failed 1 times, most recent failure: Lost task 2.0 in stage 92.0 (TID 1317, 1e1a151fa0f5, executor driver): org.apache.spark.SparkException: Failed to execute user defined function(RegexTokenizer$$Lambda17/0x000000084123e040: (string) => array<string>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.lang.NullPointerException
    at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc(Tokenizer.scala:146)
    ... 19 more

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2023)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage(DAGScheduler.scala:1972)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$adapted(DAGScheduler.scala:1971)
    at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
    at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1971)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$adapted(DAGScheduler.scala:950)
    at scala.Option.foreach(Option.scala:407)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:950)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2203)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2152)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2141)
    at org.apache.spark.util.EventLoop$$anon.run(EventLoop.scala:49)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:752)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2093)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2133)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2158)
    at org.apache.spark.rdd.RDD.count(RDD.scala:1227)
    at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:233)
    at org.apache.spark.ml.feature.CountVectorizer.fit(CountVectorizer.scala:149)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:566)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.spark.SparkException: Failed to execute user defined function(RegexTokenizer$$Lambda17/0x000000084123e040: (string) => array<string>)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon.hasNext(WholeStageCodegenExec.scala:729)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:458)
    at scala.collection.Iterator$$anon.hasNext(Iterator.scala:489)
    at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:192)
    at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:62)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:127)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run(Executor.scala:444)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1377)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:447)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    ... 1 more
Caused by: java.lang.NullPointerException
    at org.apache.spark.ml.feature.RegexTokenizer.$anonfun$createTransformFunc(Tokenizer.scala:146)
    ... 19 more

非常感谢任何帮助。

这个问题是因为您在输入列 text 中有 null 个值。

 val tokenizer = new RegexTokenizer()
      .setInputCol("text")
      .setOutputCol("words")
      .setPattern("\W")

    val inputDF = Seq("this is text", null).toDF("text")
    inputDF.show(false)

    /**
      * +------------+
      * |text        |
      * +------------+
      * |this is text|
      * |null        |
      * +------------+
      */

    // this fails
    tokenizer.transform(inputDF)
      .show(false)

    /**
      * Caused by: java.lang.NullPointerException
      * at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc.apply(Tokenizer.scala:143)
      * at org.apache.spark.ml.feature.RegexTokenizer$$anonfun$createTransformFunc.apply(Tokenizer.scala:141)
      */

删除空行或估算空列

    // replace nulls from text col to specified string as below
    val nullImputedDF = inputDF.na.fill("I am null imputed", Seq("text"))
    tokenizer.transform(nullImputedDF)
      .show(false)

    /**
      * +-----------------+----------------------+
      * |text             |words                 |
      * +-----------------+----------------------+
      * |this is text     |[this, is, text]      |
      * |I am null imputed|[i, am, null, imputed]|
      * +-----------------+----------------------+
      */