Spark - java.lang.ClassCastException when passing a column of type Array[Array[Map[String,String]]] to a UDF
I am concatenating two columns of type Array[Map[String,String]] in Spark, which produces a new column of type Array[Array[Map[String,String]]]. However, I would like to flatten that column so that I end up with a single column of type Array[Map[String,String]] holding the values of both original columns.
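For concreteness, here is a minimal sketch of input data with that shape (assuming a SparkSession named spark is in scope; the column names and sample values are only illustrative and mirror the example output further down):
import spark.implicits._
// Two columns, each of Spark SQL type array<map<string,string>>.
val df = Seq(
  (Seq(Map("a" -> "b"), Map("c" -> "d")), Seq(Map("a" -> "b"), Map("c" -> "d")))
).toDF("colArrayMap1", "colArrayMap2")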
I understand that from Spark 2.4 onwards, flatten can be applied directly to the concatenation of the columns, like this:
df.withColumn("concatenation", flatten(array($"colArrayMap1", $"colArrayMap2")))
But I am still on Spark 2.2, so I need a UDF for this. Here is what I wrote:
def flatten_collection(arr: Array[Array[Map[String,String]]]) = {
  if (arr == null)
    null
  else
    arr.flatten
}
val flatten_collection_udf = udf(flatten_collection _)
df.withColumn("concatenation", array($"colArrayMap1", $"colArrayMap2")).withColumn("concatenation", flatten_collection_udf($"concatenation")).show(false)
But I get the following error:
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun: (array<array<map<string,string>>>) => array<map<string,string>>)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$$anon.hasNext(WholeStageCodegenExec.scala:395)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:234)
at org.apache.spark.sql.execution.SparkPlan$$anonfun.apply(SparkPlan.scala:228)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:835)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$$anonfun$apply.apply(RDD.scala:835)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:49)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:109)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:380)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassCastException: scala.collection.mutable.WrappedArray$ofRef cannot be cast to [[Lscala.collection.immutable.Map;
I assume the cast error happens inside the UDF, but why does it happen, and how can I avoid it?
Also, if anyone knows of a Spark 2.2 solution that does not require a UDF, even better.
Adapted from another answer: the UDF parameters need to be Seq rather than Array. Spark passes array columns to a Scala UDF as a WrappedArray (a Seq), not as a JVM Array, which is exactly what the ClassCastException in the stack trace is complaining about.
def concat_arr(
    arr1: Seq[Map[String,String]],
    arr2: Seq[Map[String,String]]
): Seq[Map[String,String]] = {
  arr1 ++ arr2
}
val concatUDF = udf(concat_arr _)
val df2 = df.withColumn("concatenation", concatUDF($"colArrayMap1", $"colArrayMap2"))
df2.show(false)
+--------------------+--------------------+----------------------------------------+
|colArrayMap1 |colArrayMap2 |concatenation |
+--------------------+--------------------+----------------------------------------+
|[[a -> b], [c -> d]]|[[a -> b], [c -> d]]|[[a -> b], [c -> d], [a -> b], [c -> d]]|
+--------------------+--------------------+----------------------------------------+
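As a side note, the flatten-based UDF from the question should also work once its parameter is declared with Seq instead of Array. A sketch under that assumption (the name flatten_seq is just illustrative):
import org.apache.spark.sql.functions.{array, udf}

// Same flatten idea as in the question, but accepting the Seq types
// that Spark actually passes to Scala UDFs.
def flatten_seq(arr: Seq[Seq[Map[String,String]]]): Seq[Map[String,String]] =
  if (arr == null) null else arr.flatten

val flatten_seq_udf = udf(flatten_seq _)

df.withColumn("concatenation",
    flatten_seq_udf(array($"colArrayMap1", $"colArrayMap2")))
  .show(false)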