任务不可序列化:java.io.NotSerializableException - JsonSchema

Task not serializable: java.io.NotSerializableException - JsonSchema

我正在尝试使用 JsonSchema 来验证 RDD 中的行,以过滤掉无效的行。

这是我的代码:

import com.github.fge.jsonschema.main.{JsonSchema, JsonSchemaFactory}
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.spark.sql.types.StructType

def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault

def stringToJsonSchema(str: String): Try[JsonSchema] = {
  stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))
}

def stringToJson(str: String): Try[JsonNode] = {
  val mapper = new ObjectMapper
  Try({
    val json = mapper.readTree(str)
    json
  })
}

def validateJson(data: JsonNode, jsonSchema: JsonSchema): Boolean = {
  val report = jsonSchema.validateUnchecked(data, true)
  report.isSuccess
}

val schemaSource: String = ...
val jsonSchema: JsonSchema = stringToJsonSchema(schemaSource).get
val df = spark.read
  .textFile("path/to/data.json")
  .filter(str => {
    stringToJson(str)
      .map(validateJson(_, jsonSchema))
      .getOrElse(false)
  })

但是,我收到一个错误,因为 JsonSchema 不可序列化:

Cause: org.apache.spark.SparkException: Task not serializable
[info]   Cause: java.io.NotSerializableException: com.github.fge.jsonschema.main.JsonSchema
[info] Serialization stack:
[info]  - object not serializable (class: com.github.fge.jsonschema.main.JsonSchema, value: com.github.fge.jsonschema.main.JsonSchema@33d22225)

我已阅读这些线程以寻找解决方案:

还有其他一些。

据我了解 - spark 需要序列化我想在 RDD 上执行的所有操作,以便将它们发送到工作节点。但是 JsonSchema 不可序列化,所以它失败了。

我试过这个解决方案:

  def genMapper[A, B](f: A => B): A => B = {
    val locker = com.twitter.chill.MeatLocker(f)
    x => locker.get.apply(x)
  }

  private class JsonSchemaValidator(jsonSchema: JsonSchema) extends (String => Boolean) {
    def apply(str: String): Boolean =
      stringToJson(str)
        .map(validateJson(_, jsonSchema))
        .getOrElse(false)
  }

val validator: String => Boolean = genMapper(new JsonSchemaValidator(jsonSchema))

df.filter(validator)

因为有人说它应该能够序列化任何东西。但是我得到了这个错误:

java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: jdk.internal.misc.InnocuousThread / executeTests 11s
...
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private volatile boolean jdk.internal.misc.InnocuousThread.hasRun accessible: module java.base does not "opens jdk.internal.misc" to unnamed module @7876b3b3

所以我尝试通过添加一些 java 选项来解决它:

ThisBuild / javaOptions ++= Seq(
    "--add-opens", "java.base/jdk.internal.misc=ALL-UNNAMED",
    "--add-opens", "java.base/jdk.internal.ref=ALL-UNNAMED",
    "--add-opens", "java.base/jdk.internal.loader=ALL-UNNAMED",
)

但这仍然会引发错误:

Caused by: java.lang.NoSuchMethodException: jdk.xml.internal.SecuritySupport$$Lambda62/0x0000000800c16840.writeReplace()
    at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)

所以我退后一步,尝试 broadcast:

val brdJsonSchema: Broadcast[JsonSchema] = spark.sparkContext.broadcast(jsonSchema)
df.filter(str => {
    stringToJson(str)
      .map(validateJson(_, brdJsonSchema.value))
      .getOrElse(false)
  })

但这也会导致序列化错误。

也尝试过 MeatLocker:

val brdSerJsonSchema: Broadcast[MeatLocker[JsonSchema]] = spark.sparkContext.broadcast(MeatLocker(jsonSchema))
df.filter(str => {
    stringToJson(str)
      .map(validateJson(_, brdSerJsonSchema.value.get))
      .getOrElse(false)
  })

但是我遇到了与上述 genMapper 解决方案相同的错误,添加 javaOptions 也无济于事。

我确实找到了解决方法,但我不喜欢它,因为它感觉很老套: 我可以在 filter 函数中解析 json 模式的字符串源:

val schemaSource: String = ...
df.filter(str => {
  val jsonSchema = stringToJsonSchema(schemaSource).get
  stringToJson(str)
    .map(validateJson(_, jsonSchema))
    .getOrElse(false)
})

但这意味着我要为 DF 的每一行解析 JsonSchema,这是对 CPU 和内存的浪费。

所以我可以改用缓存包装器,这也感觉很老套:

  class CacheWrapper[T] {
    private lazy val cache = mutable.Map.empty[String, T]

    def getCacheOrElse(cacheKey: String, getter: () => T): T = {
      cache.getOrElse(cacheKey, {
        cache.put(cacheKey, getter())
        cache(cacheKey)
      })
    }
  }

val schemaSource: String = ...
@transient lazy val jsonSchemaCache: CacheWrapper[JsonSchema] = new CacheWrapper[JsonSchema]
df.filter(str => {
  val jsonSchema = jsonSchemaCache.getOrElse(schemaSource.hashCode.toString, () => stringToJsonSchema(schemaSource).get)
  stringToJson(str)
    .map(validateJson(_, jsonSchema))
    .getOrElse(false)
})

这将具有仅解析 JsonSchema 一次的效果(当我们到达第一行时)。所有其他行将从缓存中获取它。

但又一次 - 这感觉很糟糕。

还有其他方法吗?

我想要的是一种将 schemaSource 字符串传递给所有 spark worker 节点的方法,让每个 worker 仅将其解析为 JsonSchema 一次,然后使用该 JsonSchema 对象来过滤 DF。这对我来说听起来应该很容易,但我找不到方法。

好的,同事帮我找到了解决方案。

来源:

代码:

private class JsonSchemaValidator(schemaSource: String) extends (String => Boolean) with Serializable {
    @transient lazy val jsonSchema: JsonSchema = JsonSchemaDFParser.stringToJsonSchema(schemaSource).get

    def apply(str: String): Boolean = 
      stringToJson(str)
        .map(validateJson(_, jsonSchema))
        .getOrElse(false)
  }

val validator: String => Boolean = new JsonSchemaValidator(schemaSource)

df.filter(validator)

@transient 具有在序列化时从对象中排除项目的效果。 lazy 表示在每个执行器上首次访问时将再次构造该字段。

包装 class 必须扩展 Serializable 才能工作。

注意:此解决方案有效。它做我想让它做的事。但是 - 对我来说仍然很奇怪,它必须如此难以做到这一点。我可能遗漏了 Spark 的某些功能,但我需要找到这种非常特殊的语法来完成如此简单的事情,这对我来说就像是一个设计缺陷。