Spark Task not serializable with lag Window function
I noticed that after I use a Window function over a DataFrame, Spark returns a "Task not serializable" exception if I call map() with a function. Here is my code:
val hc:org.apache.spark.sql.hive.HiveContext =
new org.apache.spark.sql.hive.HiveContext(sc)
import hc.implicits._
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
def f() : String = "test"
case class P(name: String, surname: String)
val lag_result: org.apache.spark.sql.Column =
lag($"name",1).over(Window.partitionBy($"surname"))
val lista: List[P] = List(P("N1","S1"), P("N2","S2"), P("N2","S2"))
val data_frame: org.apache.spark.sql.DataFrame =
hc.createDataFrame(sc.parallelize(lista))
data_frame.withColumn("lag_result", lag_result).map(x => f)
// This works:
// data_frame.withColumn("lag_result", lag_result).map{ case x =>
//   def f(): String = "test"; f }.collect
Here is the stack trace:
org.apache.spark.SparkException: Task not serializable at
org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
at
org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294)
at
org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2055) at
org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:324) at
org.apache.spark.rdd.RDD$$anonfun$map.apply(RDD.scala:323) at ...
and more Caused by: java.io.NotSerializableException:
org.apache.spark.sql.Column Serialization stack:
- object not serializable (class: org.apache.spark.sql.Column, value: 'lag(name,1,null) windowspecdefinition(surname,UnspecifiedFrame))
- field (class: $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC,
name: lag_result, type: class org.apache.spark.sql.Column) ... and
more
lag returns o.a.s.sql.Column, which is not serializable. The same thing applies to WindowSpec. In interactive mode these objects may be included as part of the closure for map:
scala> import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.expressions.Window
scala> val df = Seq(("foo", 1), ("bar", 2)).toDF("x", "y")
df: org.apache.spark.sql.DataFrame = [x: string, y: int]
scala> val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@307a0097
scala> val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)
scala> def f(x: Any) = x.toString
f: (x: Any)String
scala> df.select(lag_y).map(f _).first
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304)
...
Caused by: java.io.NotSerializableException: org.apache.spark.sql.expressions.WindowSpec
Serialization stack:
- object not serializable (class: org.apache.spark.sql.expressions.WindowSpec, value: org.apache.spark.sql.expressions.WindowSpec@307a0097)
A simple solution is to mark both as transient:
scala> @transient val w = Window.partitionBy("x").orderBy("y")
w: org.apache.spark.sql.expressions.WindowSpec = org.apache.spark.sql.expressions.WindowSpec@7dda1470
scala> @transient val lag_y = lag(col("y"), 1).over(w)
lag_y: org.apache.spark.sql.Column = 'lag(y,1,null) windowspecdefinition(x,y ASC,UnspecifiedFrame)
scala> df.select(lag_y).map(f _).first
res1: String = [null]
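
As an alternative to @transient, a minimal sketch (assuming the data_frame defined in the question and the Spark 1.x DataFrame API used above; the helper name withLag is hypothetical): keep the Column and WindowSpec out of the driver-side scope that the closure captures, by building the window column inside a function and adding it with withColumn before calling map:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

def withLag(df: DataFrame): DataFrame = {
  // The WindowSpec and Column stay local to this method, so they never
  // become fields of the REPL wrapper object that map's closure would
  // otherwise pull in.
  val w = Window.partitionBy("surname")
  df.withColumn("lag_result", lag(col("name"), 1).over(w))
}

// The window column is materialized before map, so the closure only
// references plain Rows.
withLag(data_frame).map(row => row.getAs[String]("name")).collect()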