Accumulo MapReduce 作业因 java.io.EOFException 而失败,使用 AccumuloRowInputFormat

Accumulo MapReduce job fails with java.io.EOFException, using AccumuloRowInputFormat

我所有的映射器都失败了,除了下面的例外。为了简洁起见,我只展示了最后一次失败。

为什么会发生这种情况,我该如何解决?

16/09/21 17:01:57 INFO mapred.JobClient: Task Id : attempt_201609151451_0044_m_000002_2, Status : FAILED
java.io.EOFException
    at java.io.DataInputStream.readFully(DataInputStream.java:197)
    at java.io.DataInputStream.readUTF(DataInputStream.java:609)
    at java.io.DataInputStream.readUTF(DataInputStream.java:564)
    at org.apache.accumulo.core.client.mapreduce.RangeInputSplit.readFields(RangeInputSplit.java:154)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:71)
    at org.apache.hadoop.io.serializer.WritableSerialization$WritableDeserializer.deserialize(WritableSerialization.java:42)
    at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:356)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:640)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:330)
    at org.apache.hadoop.mapred.Child.run(Child.java:268)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
    at org.ap
16/09/21 17:02:00 INFO mapred.JobClient: Job complete: job_201609151451_0044
16/09/21 17:02:00 INFO mapred.JobClient: Counters: 8
16/09/21 17:02:00 INFO mapred.JobClient:   Job Counters
16/09/21 17:02:00 INFO mapred.JobClient:     Failed map tasks=1
16/09/21 17:02:00 INFO mapred.JobClient:     Launched map tasks=48
16/09/21 17:02:00 INFO mapred.JobClient:     Data-local map tasks=13
16/09/21 17:02:00 INFO mapred.JobClient:     Rack-local map tasks=35
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps in occupied slots (ms)=343982
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces in occupied slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all maps waiting after reserving slots (ms)=0
16/09/21 17:02:00 INFO mapred.JobClient:     Total time spent by all reduces waiting after reserving slots (ms)=0

我正在使用 Accumulo table 作为我的输入数据。我的设置如下:

@Override
public int run(String[] args) throws Exception {
    Configuration conf = getConf();
    String idMapFileContent = readResourceFile(TYPE_ID_MAP_FILENAME);
    conf.set(TYPE_ID_MAP_KEY, idMapFileContent);

    Job job = Job.getInstance(conf, this.getClass().getSimpleName());
    job.setJarByClass(this.getClass());
    job.setMapperClass(DanglingLinksFinderMapper.class);
    job.setReducerClass(DanglingLinksFinderReducer.class);
    this.setupRowInputFormat(job);

    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);
    job.setOutputFormatClass(TextOutputFormat.class);
    Path out = new Path(args[0]);
    LOGGER.info("Writing to output directory: " + out.toUri());
    FileOutputFormat.setOutputPath(job, out);

    int exitCode = job.waitForCompletion(true) ? 0 : 1;
}

private Job setupRowInputFormat(Job job)
        throws IOException, AccumuloSecurityException
{
    job.setInputFormatClass(AccumuloRowInputFormat.class);
    Configuration conf = job.getConfiguration();

    AccumuloConnectInfo connectInfo = new AccumuloConnectInfo(conf);
    LOGGER.info(connectInfo.toString());

    AccumuloRowInputFormat.setZooKeeperInstance(job, connectInfo.getInstanceNames(), connectInfo.getZookeeperInstanceNames());
    AccumuloRowInputFormat.setConnectorInfo(job, connectInfo.getUserName(), connectInfo.getPassword());
    AccumuloRowInputFormat.setScanAuthorizations(job, new Authorizations());
    AccumuloRowInputFormat.setInputTableName(job, TABLE_NAME);
    return job;
}

我正在使用 Hadoop 2.6.0、Accumulo 1.5.0 和 Java 1.7。

前几天我有这个工作并且没有(据我所知)改变任何东西。所以我想这可能与我 运行 它所在的服务器上的配置或数据状态有关?该作业在我本地计算机上的 Docker 容器中的测试 table 运行 中运行良好,但在我的远程测试服务器上失败。

我可以登录 accumulo shell 并扫描我正在使用的 table。那里一切看起来都很好。我还在测试服务器上尝试了 运行 压缩,效果很好但没有解决问题。

我猜您用于启动 MapReduce 作业的 Accumulo jar 与您包含的作业本身通过 DistributedCache 或 libjars CLI 使用的版本不匹配 (Mappers/Reducers)选项。

因为您没有指定范围,AccumuloInputFormat 将自动获取您的 table 的所有 Tablet 边界,并创建与 table 中的 Tablets 相同数量的 RangeInputSplit 对象。这种拆分创建是在本地 JVM(提交作业时创建的 JVM)中完成的。这些 RangeInputSplit 对象被序列化并传递到 YARN。

您提供的错误是当 Mapper 获取这些序列化的 RangeInputSplit 对象之一并尝试反序列化它时。不知何故,这是失败的,因为没有足够的序列化数据来反序列化 Mapper 中的 Accumulo 运行 版本期望读取的内容。

这可能只是您的 Accumulo 版本中的一个序列化错误(请分享),但我不记得听说过这样的错误。我猜想本地类路径上的 Accumulo 版本和 Mapper 的类路径上存在差异。