Spark crashes while reading a JSON file when linked with aws-java-sdk
Let config.json be a small JSON file:
{
"toto": 1
}
I wrote a simple piece of code that reads the JSON file with sc.textFile (textFile is convenient because the file may be on S3, local disk, or HDFS):
import org.apache.spark.{SparkContext, SparkConf}
object testAwsSdk {
  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf().setAppName("test-aws-sdk").setMaster("local[*]")
    val sc = new SparkContext(sparkConf)
    val json = sc.textFile("config.json")
    println(json.collect().mkString("\n"))
  }
}
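For completeness, the same textFile call should also work with an S3 path once the Hadoop S3 filesystem classes and credentials are available. The following is only a hedged sketch, not part of the original question: the bucket name is made up, and the exact scheme (s3n vs s3a) and configuration keys depend on the Hadoop build shipped with your Spark distribution.
import org.apache.spark.{SparkContext, SparkConf}
object testReadFromS3 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test-read-s3").setMaster("local[*]"))
    // Wire the credentials into the classic s3n connector; switch to the fs.s3a.*
    // keys if your Hadoop build uses the newer s3a filesystem instead.
    sc.hadoopConfiguration.set("fs.s3n.awsAccessKeyId", sys.env("AWS_ACCESS_KEY_ID"))
    sc.hadoopConfiguration.set("fs.s3n.awsSecretAccessKey", sys.env("AWS_SECRET_ACCESS_KEY"))
    val json = sc.textFile("s3n://my-bucket/config.json") // hypothetical bucket
    println(json.collect().mkString("\n"))
  }
}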
The SBT file pulls in only the spark-core library:
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)
The program works as expected and writes the content of config.json to standard output.
Now I also want to link against aws-java-sdk, Amazon's SDK for accessing S3:
libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile",
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)
When running the same code, Spark throws the following exception:
Exception in thread "main" com.fasterxml.jackson.databind.JsonMappingException: Could not find creator property with name 'id' (in class org.apache.spark.rdd.RDDOperationScope)
at [Source: {"id":"0","name":"textFile"}; line: 1, column: 1]
at com.fasterxml.jackson.databind.JsonMappingException.from(JsonMappingException.java:148)
at com.fasterxml.jackson.databind.DeserializationContext.mappingException(DeserializationContext.java:843)
at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.addBeanProps(BeanDeserializerFactory.java:533)
at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.buildBeanDeserializer(BeanDeserializerFactory.java:220)
at com.fasterxml.jackson.databind.deser.BeanDeserializerFactory.createBeanDeserializer(BeanDeserializerFactory.java:143)
at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer2(DeserializerCache.java:409)
at com.fasterxml.jackson.databind.deser.DeserializerCache._createDeserializer(DeserializerCache.java:358)
at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCache2(DeserializerCache.java:265)
at com.fasterxml.jackson.databind.deser.DeserializerCache._createAndCacheValueDeserializer(DeserializerCache.java:245)
at com.fasterxml.jackson.databind.deser.DeserializerCache.findValueDeserializer(DeserializerCache.java:143)
at com.fasterxml.jackson.databind.DeserializationContext.findRootValueDeserializer(DeserializationContext.java:439)
at com.fasterxml.jackson.databind.ObjectMapper._findRootDeserializer(ObjectMapper.java:3666)
at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:3558)
at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:2578)
at org.apache.spark.rdd.RDDOperationScope$.fromJson(RDDOperationScope.scala:82)
at org.apache.spark.rdd.RDDOperationScope$$anonfun.apply(RDDOperationScope.scala:133)
at org.apache.spark.rdd.RDDOperationScope$$anonfun.apply(RDDOperationScope.scala:133)
at scala.Option.map(Option.scala:145)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:133)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
at org.apache.spark.SparkContext.hadoopFile(SparkContext.scala:1012)
at org.apache.spark.SparkContext$$anonfun$textFile.apply(SparkContext.scala:827)
at org.apache.spark.SparkContext$$anonfun$textFile.apply(SparkContext.scala:825)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108)
at org.apache.spark.SparkContext.withScope(SparkContext.scala:709)
at org.apache.spark.SparkContext.textFile(SparkContext.scala:825)
at testAwsSdk$.main(testAwsSdk.scala:11)
at testAwsSdk.main(testAwsSdk.scala)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:497)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140)
Reading the stack trace, it seems that when aws-java-sdk is linked, sc.textFile detects that the file is a JSON file and tries to parse it with Jackson, assuming some format which, of course, it cannot find. I need to link against aws-java-sdk, so my questions are:
1- Why does adding aws-java-sdk change the behavior of spark-core?
2- Is there a workaround (the file can be on HDFS, S3, or local)?
Spoke to Amazon support. It is a dependency issue with the Jackson library. In SBT, override jackson:
libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile",
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)

dependencyOverrides ++= Set(
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.4.4"
)
Their answer:
We have done this on a Mac, on an EC2 (Red Hat AMI) instance, and on EMR (Amazon Linux): three different environments. The root cause of the problem is that sbt builds a dependency graph and then resolves version conflicts by evicting the older version and picking the latest version of the conflicting library. In this case, Spark depends on the 2.4 version of the Jackson library while the AWS SDK needs 2.5. So there is a version conflict, and sbt evicts Spark's dependency version (the older one) and picks the version from the AWS SDK (the latest).
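To double-check which jackson-databind version sbt's conflict resolution actually kept on the classpath, a quick runtime probe can help. This snippet is only an illustration and not part of the Amazon reply; the object name is made up, and it relies on ObjectMapper reporting its own artifact version.
import com.fasterxml.jackson.databind.ObjectMapper
object checkJacksonVersion {
  def main(args: Array[String]): Unit = {
    // ObjectMapper implements Jackson's Versioned interface, so version()
    // reports the jackson-databind artifact that ended up on the classpath.
    println(new ObjectMapper().version())
  }
}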
To add to the answer above: if you do not want to use a fixed version of Jackson (maybe you will upgrade Spark in the future) but still want to discard the version coming from AWS, you can do the following:
libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile" excludeAll (
    ExclusionRule("com.fasterxml.jackson.core", "jackson-databind")
  ),
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)
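As a variant (my own assumption, not part of the original answer): if you would rather drop every Jackson artifact that aws-java-sdk pulls in transitively, not just jackson-databind, you can exclude the whole organization and let Spark supply its own Jackson.
libraryDependencies ++= Seq(
  "com.amazonaws" % "aws-java-sdk" % "1.10.30" % "compile" excludeAll (
    // Excluding by organization drops all com.fasterxml.jackson.core artifacts
    // coming from the AWS SDK; the ones provided by spark-core are used instead.
    ExclusionRule("com.fasterxml.jackson.core")
  ),
  "org.apache.spark" %% "spark-core" % "1.5.1" % "compile"
)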