CheckPointing 时在 foreachRDD() 中使用的对象的序列化

Serialization of an object used in foreachRDD() when CheckPointing

根据 this question 和我读过的文档,Spark Streaming 的 foreachRDD(someFunction) 将执行 someFunction 本身仅在驱动程序进程中,尽管如果在 RDD 上完成了操作,那么它们将在执行程序上完成——RDD 所在的位置。

以上所有内容也适用于我,尽管我注意到如果我打开检查点,那么似乎 spark 正在尝试序列化 foreachRDD(someFunction) 中的所有内容并发送到某个地方——这对我来说是个问题,因为使用的对象之一是不可序列化的(即 schemaRegistryClient)。我尝试了 Kryo 序列化器,但也没有成功。

如果我关闭检查点,序列化问题就会消失。

有没有办法让 Spark 不序列化 foreachRDD(someFunc) 中使用的内容,同时继续使用检查点?

非常感谢。

这里有几件事很重要:

  1. 您不能在工作程序(在 RDD 内)上执行的代码中使用此客户端。
  2. 您可以使用临时客户端字段创建对象,并在作业重新启动后重新创建它。可以找到如何完成此操作的示例 here.
  3. 同样的原则适用于广播和累加器变量。
  4. 检查点持久化数据、作业元数据和代码逻辑。更改代码后,您的检查点将失效。

Is there a way to let Spark not to serialize what's used in foreachRDD(someFunc) while also keep using checkpointing?

检查点应该与您的问题无关。根本问题是您有一个不可序列化的对象实例需要发送给您的工作人员。

当您有这样的依赖关系时,可以在 Spark 中使用一个通用模式。您创建了一个带有延迟瞬态 属性 的 object,它将在需要时加载到 工作节点 中:

object RegisteryWrapper {
  @transient lazy val schemaClient: SchemaRegisteryClient = new SchemaRegisteryClient()
}

以及需要在里面使用的时候foreachRDD:

someStream.foreachRDD { 
   rdd => rdd.foreachPartition { iterator => 
       val schemaClient = RegisteryWrapper.schemaClient
       iterator.foreach(schemaClient.send(_))
  }
}

问题可能与检查点数据有关,如果您更改了代码中的任何内容,则需要删除旧的检查点元数据。