Spark Scala 字符串匹配 UDF

a Spark Scala String Matching UDF

import org.apache.spark.sql.functions.lit

val containsString  = (haystack:String, needle:String) =>{
    if (haystack.contains(needle)){
        1
    }
    else{
         0
    }
}

val containsStringUDF = udf(containsString _)

val new_df = df.withColumn("nameContainsxyz", containsStringUDF($"name"),lit("xyz")))

我是 Spark scala 的新手。上面的代码似乎编译成功。但是,当我尝试 运行

new_df.groupBy("nameContainsxyz").sum().show()

错误抛出。有人可以帮帮我吗?错误信息如下。

Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (string, string) => int)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.agg_doAggregateWithKeys_0$(Unknown Source)
  at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
  at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
  at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:655)
  at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
  at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
  at org.apache.spark.scheduler.Task.run(Task.scala:121)
  at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
  ... 3 more
Caused by: java.lang.NullPointerException
  at $anonfun.apply(<console>:41)
  at $anonfun.apply(<console>:40)
  ... 15 more

只是一个更新:错误抛出,因为指定列中的某些行为空。在UDF中添加空检查完全解决了这个问题。

谢谢

如果我理解您正在尝试正确执行的操作,您想知道 'xyz' 出现在第 name 列中的行数?

您可以在不使用 UDF 的情况下做到这一点:

df.filter('name.contains("xyz")). count