任务不可序列化:java.io.NotSerializableException - JsonSchema
Task not serializable: java.io.NotSerializableException - JsonSchema
我正在尝试使用 JsonSchema
来验证 RDD 中的行,以过滤掉无效的行。
这是我的代码:
import com.github.fge.jsonschema.main.{JsonSchema, JsonSchemaFactory}
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.spark.sql.types.StructType
def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault
def stringToJsonSchema(str: String): Try[JsonSchema] = {
stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))
}
def stringToJson(str: String): Try[JsonNode] = {
val mapper = new ObjectMapper
Try({
val json = mapper.readTree(str)
json
})
}
def validateJson(data: JsonNode, jsonSchema: JsonSchema): Boolean = {
val report = jsonSchema.validateUnchecked(data, true)
report.isSuccess
}
val schemaSource: String = ...
val jsonSchema: JsonSchema = stringToJsonSchema(schemaSource).get
val df = spark.read
.textFile("path/to/data.json")
.filter(str => {
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
})
但是,我收到一个错误,因为 JsonSchema
不可序列化:
Cause: org.apache.spark.SparkException: Task not serializable
[info] Cause: java.io.NotSerializableException: com.github.fge.jsonschema.main.JsonSchema
[info] Serialization stack:
[info] - object not serializable (class: com.github.fge.jsonschema.main.JsonSchema, value: com.github.fge.jsonschema.main.JsonSchema@33d22225)
我已阅读这些线程以寻找解决方案:
- Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
- Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?
- https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
- https://www.placeiq.com/2017/11/how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/
还有其他一些。
据我了解 - spark 需要序列化我想在 RDD 上执行的所有操作,以便将它们发送到工作节点。但是 JsonSchema
不可序列化,所以它失败了。
我试过这个解决方案:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
private class JsonSchemaValidator(jsonSchema: JsonSchema) extends (String => Boolean) {
def apply(str: String): Boolean =
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
}
val validator: String => Boolean = genMapper(new JsonSchemaValidator(jsonSchema))
df.filter(validator)
因为有人说它应该能够序列化任何东西。但是我得到了这个错误:
java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: jdk.internal.misc.InnocuousThread / executeTests 11s
...
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private volatile boolean jdk.internal.misc.InnocuousThread.hasRun accessible: module java.base does not "opens jdk.internal.misc" to unnamed module @7876b3b3
所以我尝试通过添加一些 java 选项来解决它:
ThisBuild / javaOptions ++= Seq(
"--add-opens", "java.base/jdk.internal.misc=ALL-UNNAMED",
"--add-opens", "java.base/jdk.internal.ref=ALL-UNNAMED",
"--add-opens", "java.base/jdk.internal.loader=ALL-UNNAMED",
)
但这仍然会引发错误:
Caused by: java.lang.NoSuchMethodException: jdk.xml.internal.SecuritySupport$$Lambda62/0x0000000800c16840.writeReplace()
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
所以我退后一步,尝试 broadcast
:
val brdJsonSchema: Broadcast[JsonSchema] = spark.sparkContext.broadcast(jsonSchema)
df.filter(str => {
stringToJson(str)
.map(validateJson(_, brdJsonSchema.value))
.getOrElse(false)
})
但这也会导致序列化错误。
也尝试过 MeatLocker
:
val brdSerJsonSchema: Broadcast[MeatLocker[JsonSchema]] = spark.sparkContext.broadcast(MeatLocker(jsonSchema))
df.filter(str => {
stringToJson(str)
.map(validateJson(_, brdSerJsonSchema.value.get))
.getOrElse(false)
})
但是我遇到了与上述 genMapper
解决方案相同的错误,添加 javaOptions
也无济于事。
我确实找到了解决方法,但我不喜欢它,因为它感觉很老套:
我可以在 filter
函数中解析 json 模式的字符串源:
val schemaSource: String = ...
df.filter(str => {
val jsonSchema = stringToJsonSchema(schemaSource).get
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
})
但这意味着我要为 DF 的每一行解析 JsonSchema
,这是对 CPU 和内存的浪费。
所以我可以改用缓存包装器,这也感觉很老套:
class CacheWrapper[T] {
private lazy val cache = mutable.Map.empty[String, T]
def getCacheOrElse(cacheKey: String, getter: () => T): T = {
cache.getOrElse(cacheKey, {
cache.put(cacheKey, getter())
cache(cacheKey)
})
}
}
val schemaSource: String = ...
@transient lazy val jsonSchemaCache: CacheWrapper[JsonSchema] = new CacheWrapper[JsonSchema]
df.filter(str => {
val jsonSchema = jsonSchemaCache.getOrElse(schemaSource.hashCode.toString, () => stringToJsonSchema(schemaSource).get)
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
})
这将具有仅解析 JsonSchema
一次的效果(当我们到达第一行时)。所有其他行将从缓存中获取它。
但又一次 - 这感觉很糟糕。
还有其他方法吗?
我想要的是一种将 schemaSource
字符串传递给所有 spark worker 节点的方法,让每个 worker 仅将其解析为 JsonSchema
一次,然后使用该 JsonSchema
对象来过滤 DF。这对我来说听起来应该很容易,但我找不到方法。
好的,同事帮我找到了解决方案。
来源:
- https://nathankleyn.com/2017/12/29/using-transient-and-lazy-vals-to-avoid-spark-serialisation-issues/
- https://www.waitingforcode.com/apache-spark/serialization-issues-part-2/read#serializable_factory_wrapper
代码:
private class JsonSchemaValidator(schemaSource: String) extends (String => Boolean) with Serializable {
@transient lazy val jsonSchema: JsonSchema = JsonSchemaDFParser.stringToJsonSchema(schemaSource).get
def apply(str: String): Boolean =
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
}
val validator: String => Boolean = new JsonSchemaValidator(schemaSource)
df.filter(validator)
@transient
具有在序列化时从对象中排除项目的效果。
lazy
表示在每个执行器上首次访问时将再次构造该字段。
包装 class 必须扩展 Serializable 才能工作。
注意:此解决方案有效。它做我想让它做的事。但是 - 对我来说仍然很奇怪,它必须如此难以做到这一点。我可能遗漏了 Spark 的某些功能,但我需要找到这种非常特殊的语法来完成如此简单的事情,这对我来说就像是一个设计缺陷。
我正在尝试使用 JsonSchema
来验证 RDD 中的行,以过滤掉无效的行。
这是我的代码:
import com.github.fge.jsonschema.main.{JsonSchema, JsonSchemaFactory}
import com.fasterxml.jackson.databind.{JsonNode, ObjectMapper}
import org.apache.spark.sql.types.StructType
def getJsonSchemaFactory: JsonSchemaFactory = JsonSchemaFactory.byDefault
def stringToJsonSchema(str: String): Try[JsonSchema] = {
stringToJson(str).map(getJsonSchemaFactory.getJsonSchema(_))
}
def stringToJson(str: String): Try[JsonNode] = {
val mapper = new ObjectMapper
Try({
val json = mapper.readTree(str)
json
})
}
def validateJson(data: JsonNode, jsonSchema: JsonSchema): Boolean = {
val report = jsonSchema.validateUnchecked(data, true)
report.isSuccess
}
val schemaSource: String = ...
val jsonSchema: JsonSchema = stringToJsonSchema(schemaSource).get
val df = spark.read
.textFile("path/to/data.json")
.filter(str => {
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
})
但是,我收到一个错误,因为 JsonSchema
不可序列化:
Cause: org.apache.spark.SparkException: Task not serializable
[info] Cause: java.io.NotSerializableException: com.github.fge.jsonschema.main.JsonSchema
[info] Serialization stack:
[info] - object not serializable (class: com.github.fge.jsonschema.main.JsonSchema, value: com.github.fge.jsonschema.main.JsonSchema@33d22225)
我已阅读这些线程以寻找解决方案:
- Task not serializable: java.io.NotSerializableException when calling function outside closure only on classes not objects
- Spark - Task not serializable: How to work with complex map closures that call outside classes/objects?
- https://github.com/samthebest/dump/blob/master/sams-scala-tutorial/serialization-exceptions-and-memory-leaks-no-ws.md
- https://www.placeiq.com/2017/11/how-to-solve-non-serializable-errors-when-instantiating-objects-in-spark-udfs/
还有其他一些。
据我了解 - spark 需要序列化我想在 RDD 上执行的所有操作,以便将它们发送到工作节点。但是 JsonSchema
不可序列化,所以它失败了。
我试过这个解决方案:
def genMapper[A, B](f: A => B): A => B = {
val locker = com.twitter.chill.MeatLocker(f)
x => locker.get.apply(x)
}
private class JsonSchemaValidator(jsonSchema: JsonSchema) extends (String => Boolean) {
def apply(str: String): Boolean =
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
}
val validator: String => Boolean = genMapper(new JsonSchemaValidator(jsonSchema))
df.filter(validator)
因为有人说它应该能够序列化任何东西。但是我得到了这个错误:
java.lang.IllegalArgumentException: Unable to create serializer "com.esotericsoftware.kryo.serializers.FieldSerializer" for class: jdk.internal.misc.InnocuousThread / executeTests 11s
...
Caused by: java.lang.reflect.InaccessibleObjectException: Unable to make field private volatile boolean jdk.internal.misc.InnocuousThread.hasRun accessible: module java.base does not "opens jdk.internal.misc" to unnamed module @7876b3b3
所以我尝试通过添加一些 java 选项来解决它:
ThisBuild / javaOptions ++= Seq(
"--add-opens", "java.base/jdk.internal.misc=ALL-UNNAMED",
"--add-opens", "java.base/jdk.internal.ref=ALL-UNNAMED",
"--add-opens", "java.base/jdk.internal.loader=ALL-UNNAMED",
)
但这仍然会引发错误:
Caused by: java.lang.NoSuchMethodException: jdk.xml.internal.SecuritySupport$$Lambda62/0x0000000800c16840.writeReplace()
at java.base/java.lang.Class.getDeclaredMethod(Class.java:2475)
所以我退后一步,尝试 broadcast
:
val brdJsonSchema: Broadcast[JsonSchema] = spark.sparkContext.broadcast(jsonSchema)
df.filter(str => {
stringToJson(str)
.map(validateJson(_, brdJsonSchema.value))
.getOrElse(false)
})
但这也会导致序列化错误。
也尝试过 MeatLocker
:
val brdSerJsonSchema: Broadcast[MeatLocker[JsonSchema]] = spark.sparkContext.broadcast(MeatLocker(jsonSchema))
df.filter(str => {
stringToJson(str)
.map(validateJson(_, brdSerJsonSchema.value.get))
.getOrElse(false)
})
但是我遇到了与上述 genMapper
解决方案相同的错误,添加 javaOptions
也无济于事。
我确实找到了解决方法,但我不喜欢它,因为它感觉很老套:
我可以在 filter
函数中解析 json 模式的字符串源:
val schemaSource: String = ...
df.filter(str => {
val jsonSchema = stringToJsonSchema(schemaSource).get
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
})
但这意味着我要为 DF 的每一行解析 JsonSchema
,这是对 CPU 和内存的浪费。
所以我可以改用缓存包装器,这也感觉很老套:
class CacheWrapper[T] {
private lazy val cache = mutable.Map.empty[String, T]
def getCacheOrElse(cacheKey: String, getter: () => T): T = {
cache.getOrElse(cacheKey, {
cache.put(cacheKey, getter())
cache(cacheKey)
})
}
}
val schemaSource: String = ...
@transient lazy val jsonSchemaCache: CacheWrapper[JsonSchema] = new CacheWrapper[JsonSchema]
df.filter(str => {
val jsonSchema = jsonSchemaCache.getOrElse(schemaSource.hashCode.toString, () => stringToJsonSchema(schemaSource).get)
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
})
这将具有仅解析 JsonSchema
一次的效果(当我们到达第一行时)。所有其他行将从缓存中获取它。
但又一次 - 这感觉很糟糕。
还有其他方法吗?
我想要的是一种将 schemaSource
字符串传递给所有 spark worker 节点的方法,让每个 worker 仅将其解析为 JsonSchema
一次,然后使用该 JsonSchema
对象来过滤 DF。这对我来说听起来应该很容易,但我找不到方法。
好的,同事帮我找到了解决方案。
来源:
- https://nathankleyn.com/2017/12/29/using-transient-and-lazy-vals-to-avoid-spark-serialisation-issues/
- https://www.waitingforcode.com/apache-spark/serialization-issues-part-2/read#serializable_factory_wrapper
代码:
private class JsonSchemaValidator(schemaSource: String) extends (String => Boolean) with Serializable {
@transient lazy val jsonSchema: JsonSchema = JsonSchemaDFParser.stringToJsonSchema(schemaSource).get
def apply(str: String): Boolean =
stringToJson(str)
.map(validateJson(_, jsonSchema))
.getOrElse(false)
}
val validator: String => Boolean = new JsonSchemaValidator(schemaSource)
df.filter(validator)
@transient
具有在序列化时从对象中排除项目的效果。
lazy
表示在每个执行器上首次访问时将再次构造该字段。
包装 class 必须扩展 Serializable 才能工作。
注意:此解决方案有效。它做我想让它做的事。但是 - 对我来说仍然很奇怪,它必须如此难以做到这一点。我可能遗漏了 Spark 的某些功能,但我需要找到这种非常特殊的语法来完成如此简单的事情,这对我来说就像是一个设计缺陷。