在 EMR 作业中使用 AWS Java SDK
Use AWS Java SDK inside EMR Job
我有一个打包在 fatjar 中的 Scalding 作业,运行 在 EMR Hadoop 集群上。最近我在地图中添加了需要 DynamoDB 连接的新功能。但是一旦映射器到达 DynamoDB 初始化,它就会抛出以下异常:
Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:112)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:172)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:166)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
... 9 more
Caused by: cascading.flow.FlowException: internal error during mapper configuration
at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:102)
... 14 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.amazonaws.http.conn.$Proxy7
Serialization trace:
connManager (com.amazonaws.http.impl.client.SdkHttpClient)
httpClient (com.amazonaws.http.AmazonHttpClient)
client (awscala.dynamodbv2.DynamoDBClient)
client (me.chuwy.enrich.hadoop.DuplicateStorage$DynamoDbStorage)
duplicateStorage (me.chuwy.enrich.hadoop.ShredJob)
$outer (me.chuwy.enrich.hadoop.ShredJob$$anonfun)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:25)
at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:19)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:25)
at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:19)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.SerDeState.readClassAndObject(SerDeState.java:61)
at com.twitter.chill.KryoPool.fromBytes(KryoPool.java:94)
at com.twitter.chill.Externalizer.fromBytes(Externalizer.scala:145)
at com.twitter.chill.Externalizer.maybeReadJavaKryo(Externalizer.scala:158)
at com.twitter.chill.Externalizer.readExternal(Externalizer.scala:148)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1839)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at java.util.HashMap.readObject(HashMap.java:1180)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at cascading.flow.hadoop.util.JavaObjectSerializer.deserialize(JavaObjectSerializer.java:101)
at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:312)
at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:293)
at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:81)
... 14 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.http.conn.$Proxy7
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass
我 认为 问题可能在于 EMR AMI 在 /usr/share/aws/aws-java-sdk/
中带有自己的 jar,它们与我包含在 fat jar 中的库冲突,我试图删除它们,但其他 EMR 步骤都失败了。
问题出在 Scalding 序列化中,而不是我最初怀疑的二进制不兼容问题。 DynamodDB 客户端不是可序列化的实体,因为它包含处理程序、线程池等东西。因此它应该直接在作业 and 中初始化,声明为 lazy val.
我有一个打包在 fatjar 中的 Scalding 作业,运行 在 EMR Hadoop 集群上。最近我在地图中添加了需要 DynamoDB 连接的新功能。但是一旦映射器到达 DynamoDB 初始化,它就会抛出以下异常:
Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:112)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:172)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:166)
Caused by: java.lang.reflect.InvocationTargetException
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
... 9 more
Caused by: cascading.flow.FlowException: internal error during mapper configuration
at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:102)
... 14 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.amazonaws.http.conn.$Proxy7
Serialization trace:
connManager (com.amazonaws.http.impl.client.SdkHttpClient)
httpClient (com.amazonaws.http.AmazonHttpClient)
client (awscala.dynamodbv2.DynamoDBClient)
client (me.chuwy.enrich.hadoop.DuplicateStorage$DynamoDbStorage)
duplicateStorage (me.chuwy.enrich.hadoop.ShredJob)
$outer (me.chuwy.enrich.hadoop.ShredJob$$anonfun)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:25)
at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:19)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:25)
at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:19)
at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
at com.twitter.chill.SerDeState.readClassAndObject(SerDeState.java:61)
at com.twitter.chill.KryoPool.fromBytes(KryoPool.java:94)
at com.twitter.chill.Externalizer.fromBytes(Externalizer.scala:145)
at com.twitter.chill.Externalizer.maybeReadJavaKryo(Externalizer.scala:158)
at com.twitter.chill.Externalizer.readExternal(Externalizer.scala:148)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1839)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at java.util.HashMap.readObject(HashMap.java:1180)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at cascading.flow.hadoop.util.JavaObjectSerializer.deserialize(JavaObjectSerializer.java:101)
at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:312)
at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:293)
at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:81)
... 14 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.http.conn.$Proxy7
at java.net.URLClassLoader.run(URLClassLoader.java:366)
at java.net.URLClassLoader.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass
我 认为 问题可能在于 EMR AMI 在 /usr/share/aws/aws-java-sdk/
中带有自己的 jar,它们与我包含在 fat jar 中的库冲突,我试图删除它们,但其他 EMR 步骤都失败了。
问题出在 Scalding 序列化中,而不是我最初怀疑的二进制不兼容问题。 DynamodDB 客户端不是可序列化的实体,因为它包含处理程序、线程池等东西。因此它应该直接在作业 and 中初始化,声明为 lazy val.