Spark Scala String Matching UDF
import org.apache.spark.sql.functions.{lit, udf}

val containsString = (haystack: String, needle: String) =>
  if (haystack.contains(needle)) 1 else 0

val containsStringUDF = udf(containsString)
val new_df = df.withColumn("nameContainsxyz", containsStringUDF($"name", lit("xyz")))
I'm new to Spark Scala. The code above seems to compile fine. However, when I try to run
new_df.groupBy("nameContainsxyz").sum().show()
an error is thrown. Could someone help me? The error message is below.
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (string, string) => int)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:655)
at scala.collection.Iterator$$anon.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
... 3 more
Caused by: java.lang.NullPointerException
at $anonfun.apply(<console>:41)
at $anonfun.apply(<console>:40)
... 15 more
Just an update: the error was thrown because some rows in the specified column are null. Adding a null check inside the UDF solved the problem completely (a sketch of the fix is below).
Thanks!
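For anyone hitting the same NullPointerException, here is a minimal sketch of the UDF with the null check added (same containsString / containsStringUDF names as above; assumes df and spark.implicits._ are in scope):

import org.apache.spark.sql.functions.{lit, udf}

// Null-safe version of the UDF: rows where the column is null count as "no match" (0).
val containsString = (haystack: String, needle: String) =>
  if (haystack != null && haystack.contains(needle)) 1 else 0

val containsStringUDF = udf(containsString)
val new_df = df.withColumn("nameContainsxyz", containsStringUDF($"name", lit("xyz")))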
If I understand correctly what you're trying to do, you want to count the number of rows where 'xyz' appears in the name column?
You can do that without a UDF:
df.filter('name.contains("xyz")).count
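If you do need the 0/1 flag column for the later groupBy, the same logic can also be written with built-in column functions instead of a UDF. A minimal sketch, reusing the column name and literal from the question (assumes spark.implicits._ is in scope):

import org.apache.spark.sql.functions.when

// when() does not match rows where the condition is null,
// so null names fall through to otherwise(0) without an explicit null check.
val new_df = df.withColumn(
  "nameContainsxyz",
  when($"name".contains("xyz"), 1).otherwise(0)
)

new_df.groupBy("nameContainsxyz").sum().show()

Built-in column functions handle nulls for you and let Catalyst optimize the query, which a UDF prevents.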