在 EMR 作业中使用 AWS Java SDK

Use AWS Java SDK inside EMR Job

我有一个打包在 fatjar 中的 Scalding 作业,运行 在 EMR Hadoop 集群上。最近我在地图中添加了需要 DynamoDB 连接的新功能。但是一旦映射器到达 DynamoDB 初始化,它就会抛出以下异常:

Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:112)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:78)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:136)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:450)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:344)
    at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:172)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:166)
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:109)
    ... 9 more
Caused by: cascading.flow.FlowException: internal error during mapper configuration
    at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:102)
    ... 14 more
Caused by: com.esotericsoftware.kryo.KryoException: Unable to find class: com.amazonaws.http.conn.$Proxy7
Serialization trace:
connManager (com.amazonaws.http.impl.client.SdkHttpClient)
httpClient (com.amazonaws.http.AmazonHttpClient)
client (awscala.dynamodbv2.DynamoDBClient)
client (me.chuwy.enrich.hadoop.DuplicateStorage$DynamoDbStorage)
duplicateStorage (me.chuwy.enrich.hadoop.ShredJob)
$outer (me.chuwy.enrich.hadoop.ShredJob$$anonfun)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readName(DefaultClassResolver.java:138)
    at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:115)
    at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:610)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:599)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:25)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:19)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:648)
    at com.esotericsoftware.kryo.serializers.FieldSerializer$ObjectField.read(FieldSerializer.java:605)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:221)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:25)
    at com.twitter.chill.SomeSerializer.read(SomeSerializer.scala:19)
    at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:729)
    at com.twitter.chill.SerDeState.readClassAndObject(SerDeState.java:61)
    at com.twitter.chill.KryoPool.fromBytes(KryoPool.java:94)
    at com.twitter.chill.Externalizer.fromBytes(Externalizer.scala:145)
    at com.twitter.chill.Externalizer.maybeReadJavaKryo(Externalizer.scala:158)
    at com.twitter.chill.Externalizer.readExternal(Externalizer.scala:148)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1839)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at java.util.HashMap.readObject(HashMap.java:1180)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1897)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1997)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1921)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
    at cascading.flow.hadoop.util.JavaObjectSerializer.deserialize(JavaObjectSerializer.java:101)
    at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:312)
    at cascading.flow.hadoop.util.HadoopUtil.deserializeBase64(HadoopUtil.java:293)
    at cascading.flow.hadoop.FlowMapper.configure(FlowMapper.java:81)
    ... 14 more
Caused by: java.lang.ClassNotFoundException: com.amazonaws.http.conn.$Proxy7
    at java.net.URLClassLoader.run(URLClassLoader.java:366)
    at java.net.URLClassLoader.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass

认为 问题可能在于 EMR AMI 在 /usr/share/aws/aws-java-sdk/ 中带有自己的 jar,它们与我包含在 fat jar 中的库冲突,我试图删除它们,但其他 EMR 步骤都失败了。

问题出在 Scalding 序列化中,而不是我最初怀疑的二进制不兼容问题。 DynamodDB 客户端不是可序列化的实体,因为它包含处理程序、线程池等东西。因此它应该直接在作业 and 中初始化,声明为 lazy val.