How to write JDBC Sink for Spark Structured Streaming [SparkException: Task not serializable]?
I need a JDBC sink for my Spark Structured Streaming DataFrame. Currently, as far as I know, the DataFrame API lacks a writeStream-to-JDBC implementation (in both PySpark and Scala, current Spark version 2.2.0). The only suggestion I found was to write my own ForeachWriter Scala class, based on this article.
So I modified a simple word-count example from here by adding a custom ForeachWriter class and tried to writeStream to PostgreSQL. The stream of words is generated manually from the console (with NetCat: nc -lk -p 9999) and read by Spark from a socket.
Unfortunately, I get a "Task not serializable" error.
APACHE_SPARK_VERSION=2.1.0
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_112)
My Scala code:
//Spark context available as 'sc' (master = local[*], app id = local-1501242382770).
//Spark session available as 'spark'.
import java.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder
.master("local[*]")
.appName("StructuredNetworkWordCountToJDBC")
.config("spark.jars", "/tmp/data/postgresql-42.1.1.jar")
.getOrCreate()
import spark.implicits._
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
val words = lines.as[String].flatMap(_.split(" "))
val wordCounts = words.groupBy("value").count()
class JDBCSink(url: String, user: String, pwd: String) extends org.apache.spark.sql.ForeachWriter[org.apache.spark.sql.Row] {
  val driver = "org.postgresql.Driver"
  var connection: java.sql.Connection = _
  var statement: java.sql.Statement = _

  def open(partitionId: Long, version: Long): Boolean = {
    Class.forName(driver)
    connection = java.sql.DriverManager.getConnection(url, user, pwd)
    statement = connection.createStatement
    true
  }

  def process(value: org.apache.spark.sql.Row): Unit = {
    statement.executeUpdate("INSERT INTO public.test(col1, col2) " +
      "VALUES ('" + value(0) + "'," + value(1) + ");")
  }

  def close(errorOrNull: Throwable): Unit = {
    connection.close
  }
}
val url="jdbc:postgresql://<mypostgreserver>:<port>/<mydb>"
val user="<user name>"
val pwd="<pass>"
val writer = new JDBCSink(url, user, pwd)
import org.apache.spark.sql.streaming.ProcessingTime
val query = wordCounts
  .writeStream
  .foreach(writer)
  .outputMode("complete")
  .trigger(ProcessingTime("25 seconds"))
  .start()
query.awaitTermination()
Error message:
ERROR StreamExecution: Query [id = ef2e7a4c-0d64-4cad-ad4f-91d349f8575b, runId = a86902e6-d168-49d1-b7e7-084ce503ea68] terminated with error
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2094)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:924)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition.apply(RDD.scala:923)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:923)
at org.apache.spark.sql.execution.streaming.ForeachSink.addBatch(ForeachSink.scala:49)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply$mcV$sp(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch.apply(StreamExecution.scala:503)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:502)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun.apply$mcV$sp(StreamExecution.scala:255)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$$anonfun.apply(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches.apply$mcZ$sp(StreamExecution.scala:244)
at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at org.apache.spark.sql.execution.streaming.StreamExecution$$anon.run(StreamExecution.scala:177)
Caused by: java.io.NotSerializableException: org.apache.spark.sql.execution.streaming.StreamExecution
Serialization stack:
- object not serializable (class: org.apache.spark.sql.execution.streaming.StreamExecution, value: Streaming Query [id = 9b01db99-9120-4047-b779-2e2e0b289f65, runId = e20beefa-146a-4139-96f9-de3d64ce048a] [state = TERMINATED])
- field (class: $line21.$read$$iw$$iw, name: query, type: interface org.apache.spark.sql.streaming.StreamingQuery)
- object (class $line21.$read$$iw$$iw, $line21.$read$$iw$$iw@24747e0f)
- field (class: $line21.$read$$iw, name: $iw, type: class $line21.$read$$iw$$iw)
- object (class $line21.$read$$iw, $line21.$read$$iw@1814ed19)
- field (class: $line21.$read, name: $iw, type: class $line21.$read$$iw)
- object (class $line21.$read, $line21.$read@13e62f5d)
- field (class: $line25.$read$$iw, name: $line21$read, type: class $line21.$read)
- object (class $line25.$read$$iw, $line25.$read$$iw@14240e5c)
- field (class: $line25.$read$$iw$$iw, name: $outer, type: class $line25.$read$$iw)
- object (class $line25.$read$$iw$$iw, $line25.$read$$iw$$iw@11e4c6f5)
- field (class: $line25.$read$$iw$$iw$JDBCSink, name: $outer, type: class $line25.$read$$iw$$iw)
- object (class $line25.$read$$iw$$iw$JDBCSink, $line25.$read$$iw$$iw$JDBCSink@6c096c84)
- field (class: org.apache.spark.sql.execution.streaming.ForeachSink, name: org$apache$spark$sql$execution$streaming$ForeachSink$$writer, type: class org.apache.spark.sql.ForeachWriter)
- object (class org.apache.spark.sql.execution.streaming.ForeachSink, org.apache.spark.sql.execution.streaming.ForeachSink@6feccb75)
- field (class: org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch, name: $outer, type: class org.apache.spark.sql.execution.streaming.ForeachSink)
- object (class org.apache.spark.sql.execution.streaming.ForeachSink$$anonfun$addBatch, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
... 25 more
How can I make it work?
Solution
(Credits to everyone, with special thanks to @zsxwing for the straightforward solution):
- Save the JDBCSink class to a file.
- In spark-shell load that class, e.g. with
scala> :load <path_to_a_JDBCSink.scala_file>
- Finally run
scala> :paste
with the code, without the JDBCSink class definition.
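The file-based sink can be sketched as below. This is a minimal sketch, not the exact accepted code: the SinkLike trait is a hypothetical stand-in for org.apache.spark.sql.ForeachWriter so the sketch compiles without Spark on the classpath (in the real JDBCSink.scala you would extend ForeachWriter[org.apache.spark.sql.Row]), and a PreparedStatement replaces the string-concatenated INSERT of the question.

```scala
import java.sql.{Connection, DriverManager, PreparedStatement}

// Hypothetical stand-in for org.apache.spark.sql.ForeachWriter[T], so that
// this sketch compiles without Spark on the classpath. In the real
// JDBCSink.scala you would extend ForeachWriter[org.apache.spark.sql.Row].
trait SinkLike[T] extends Serializable {
  def open(partitionId: Long, version: Long): Boolean
  def process(value: T): Unit
  def close(errorOrNull: Throwable): Unit
}

// Top-level class: only serializable String fields are captured. The JDBC
// connection and statement are created per partition in open() and marked
// @transient so they are never shipped to the workers.
class JDBCSink(url: String, user: String, pwd: String)
    extends SinkLike[(String, Long)] {

  @transient private var connection: Connection = _
  @transient private var statement: PreparedStatement = _

  def open(partitionId: Long, version: Long): Boolean = {
    connection = DriverManager.getConnection(url, user, pwd)
    // A PreparedStatement avoids the quoting/SQL-injection problems of the
    // string-concatenated INSERT in the original code.
    statement = connection.prepareStatement(
      "INSERT INTO public.test(col1, col2) VALUES (?, ?)")
    true
  }

  def process(value: (String, Long)): Unit = {
    statement.setString(1, value._1)
    statement.setLong(2, value._2)
    statement.executeUpdate()
  }

  def close(errorOrNull: Throwable): Unit = {
    if (connection != null) connection.close()
  }
}
```

Because the class lives at the top level of its own file, serializing an instance only serializes the three connection strings, which is exactly what the foreach sink needs.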
It appears that the offender here was the use of import spark.implicits._ inside the JDBCSink class: JDBCSink must be serializable. By adding this import, you make your JDBCSink reference the non-serializable SparkSession, which then gets serialized along with it (technically, SparkSession extends Serializable, but it is not meant to be deserialized on worker nodes).
The good news: you are not actually using this import, so if you just remove it, this should work.
Simply define JDBCSink in a separate file, rather than defining it as an inner class which may capture the outer reference.
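The capture problem can be reproduced without Spark at all. In the sketch below (illustrative names, not Spark APIs), ReplWrapper stands in for the generated $iw wrapper that the spark-shell puts around every pasted line: an inner class silently gains a $outer field pointing at that wrapper, while a top-level class does not.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

// Stand-in for the non-serializable surroundings (in the REPL this is the
// generated $iw wrapper that holds the SparkSession and the StreamingQuery).
class ReplWrapper {
  // Inner class: the compiler adds a hidden $outer field pointing back at
  // ReplWrapper, so serializing InnerSink tries to serialize ReplWrapper too.
  class InnerSink extends Serializable
}

// Top-level class: no hidden $outer field, serializes on its own.
class StandaloneSink extends Serializable

object SerializationDemo {
  // Returns true iff Java serialization of `o` succeeds.
  def serializes(o: AnyRef): Boolean =
    try {
      new ObjectOutputStream(new ByteArrayOutputStream()).writeObject(o)
      true
    } catch {
      case _: NotSerializableException => false
    }

  def main(args: Array[String]): Unit = {
    val wrapper = new ReplWrapper
    println(serializes(new wrapper.InnerSink)) // false: drags in ReplWrapper
    println(serializes(new StandaloneSink))    // true
  }
}
```

This is the same mechanism behind the `field (class: ..., name: $outer, ...)` entries in the serialization stack of the error above: each $outer hop pulls in another REPL wrapper until a non-serializable object (the StreamingQuery) is reached.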
In case anyone hits this problem in an interactive notebook, this solution also works:
Instead of saving the JDBCSink class to a separate file, you can declare it as a separate package ("Packaged cell") within the same notebook and import that package into the cell where you use it. Well described here: https://docs.databricks.com/user-guide/notebooks/package-cells.html