如何将 ojai 配置从驱动程序传递给 spark 中的执行程序?
How to pass ojai configuration from driver to executors in spark?
我想知道如何将 OJAI 连接从 spark 驱动程序传递到它的执行程序。这是我的代码:
val connection = DriverManager.getConnection("ojai:mapr:")
val store = connection.getStore("/tables/table1")
val someStream = messagesDStream.mapPartitions {
iterator => {
val list = iterator
.map(record => record.value())
.toList
.asJava
//TODO serializacja, deserializacja, interface serializable w javie
val query = connection
.newQuery()
.where(connection.newCondition()
.in("_id", list)
.build())
.build()}
我得到的错误:
Caused by: java.io.NotSerializableException: com.mapr.ojai.store.impl.OjaiConnection
Serialization stack:
- object not serializable (class: com.mapr.ojai.store.impl.OjaiConnection, value: com.mapr.ojai.store.impl.OjaiConnection@2a367e93)
- field (class: com.example.App$$anonfun, name: connection, type: interface org.ojai.store.Connection)
- object (class com.example.App$$anonfun, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
...
只要与 OJAI 的连接在 mapPartitions 函数内部,一切都很好。我知道我需要将配置从驱动程序传递给执行程序才能使代码正常工作,但我不知道该怎么做。 Tschüs!
您 运行 陷入了 spark 最臭名昭著的错误 - 任务不可序列化。
从本质上讲,这意味着您尝试序列化的 类 或对象之一 - 通过网络从驱动程序发送到执行程序 - 无法以这种方式处理:这里是 ojai 连接器。
您不能将连接本身从驱动程序传递给执行程序 - 您可以做的是在伴随对象中声明连接,同时避免为来自流的每批 RDD 不断重新创建连接作为
@transient lazy val connection = ...
并参考 mapPartitions 中的内容。这将确保每个执行程序都有一个到数据库的连接,该连接将在多个批次中持续存在,因为以这种方式标记的字段不会在驱动程序上创建然后序列化,而是在每个执行程序上创建。
我想知道如何将 OJAI 连接从 spark 驱动程序传递到它的执行程序。这是我的代码:
val connection = DriverManager.getConnection("ojai:mapr:")
val store = connection.getStore("/tables/table1")
val someStream = messagesDStream.mapPartitions {
iterator => {
val list = iterator
.map(record => record.value())
.toList
.asJava
//TODO serializacja, deserializacja, interface serializable w javie
val query = connection
.newQuery()
.where(connection.newCondition()
.in("_id", list)
.build())
.build()}
我得到的错误:
Caused by: java.io.NotSerializableException: com.mapr.ojai.store.impl.OjaiConnection
Serialization stack:
- object not serializable (class: com.mapr.ojai.store.impl.OjaiConnection, value: com.mapr.ojai.store.impl.OjaiConnection@2a367e93)
- field (class: com.example.App$$anonfun, name: connection, type: interface org.ojai.store.Connection)
- object (class com.example.App$$anonfun, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:342)
...
只要与 OJAI 的连接在 mapPartitions 函数内部,一切都很好。我知道我需要将配置从驱动程序传递给执行程序才能使代码正常工作,但我不知道该怎么做。 Tschüs!
您 运行 陷入了 spark 最臭名昭著的错误 - 任务不可序列化。 从本质上讲,这意味着您尝试序列化的 类 或对象之一 - 通过网络从驱动程序发送到执行程序 - 无法以这种方式处理:这里是 ojai 连接器。
您不能将连接本身从驱动程序传递给执行程序 - 您可以做的是在伴随对象中声明连接,同时避免为来自流的每批 RDD 不断重新创建连接作为
@transient lazy val connection = ...
并参考 mapPartitions 中的内容。这将确保每个执行程序都有一个到数据库的连接,该连接将在多个批次中持续存在,因为以这种方式标记的字段不会在驱动程序上创建然后序列化,而是在每个执行程序上创建。