How not to fail Hadoop MapReduce job for one database insert failure?

I am writing a MapReduce job that mines web server logs. The input comes from text files and the output goes into a MySQL database. The problem is that if a single record fails to insert, for whatever reason, such as the data exceeding the column size, the whole job fails and nothing at all gets written to the database. Is there a way to let the good records through anyway? I suppose one option would be to validate the data first, but that couples the client to the database schema too tightly for my taste. I am not posting any code because this is not really a question about a specific piece of code.

Edit:

Reducer:

protected void reduce(SkippableLogRecord rec,
        Iterable<NullWritable> values, Context context) {
    String path = rec.getPath().toString();
    // Truncate the path to the column width (100 chars) before writing.
    path = path.substring(0, Math.min(path.length(), 100));

    try {
        context.write(new DBRecord(rec), NullWritable.get());

        LOGGER.info("Wrote record {}.", path);
    } catch (IOException | InterruptedException e) {
        LOGGER.error("There was a problem when writing out {}.", path, e);
    }
}

Log:

15/03/01 14:35:06 WARN mapred.LocalJobRunner: job_local279539641_0001
java.lang.Exception: java.io.IOException: Data truncation: Data too long for column 'filename' at row 1
    at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:462)
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:529)
Caused by: java.io.IOException: Data truncation: Data too long for column 'filename' at row 1
    at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.close(DBOutputFormat.java:103)
    at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.close(ReduceTask.java:550)
    at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:629)
    at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:319)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
15/03/01 14:35:06 INFO mapred.LocalJobRunner: reduce > reduce
15/03/01 14:35:07 INFO mapreduce.Job: Job job_local279539641_0001 failed with state FAILED due to: NA

Answering my own question, and looking at this SO post, I see that the database writes are done in batches, and on a SQLException the whole transaction is rolled back. That explains my problem. I guess I just need to make the database columns large enough, or do the validation up front. I could also create a custom DBOutputFormat/DBRecordWriter, but unless I insert one record at a time, there will always be the risk of one bad record rolling back the whole batch. The close() method of the stock DBRecordWriter, quoted below, is where the batch is executed and, on failure, rolled back; a rough sketch of a per-record alternative follows it.

public void close(TaskAttemptContext context) throws IOException {
  try {
    LOG.warn("Executing statement:" + statement);

    statement.executeBatch();
    connection.commit();
  } catch (SQLException e) {
    try {
      connection.rollback();
    } catch (SQLException ex) {
      LOG.warn(StringUtils.stringifyException(ex));
    }
    throw new IOException(e.getMessage());
  } finally {
    try {
      statement.close();
      connection.close();
    } catch (SQLException ex) {
      throw new IOException(ex.getMessage());
    }
  }
}
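
If I did go the one-record-at-a-time route, a custom writer might look roughly like the sketch below. This is only a sketch, not my actual code: it bypasses DBOutputFormat and opens its own JDBC connection, the SkippingDBRecordWriter name and the table/column names are placeholders, and it assumes DBRecord exposes the values to bind. Each INSERT is executed and committed individually, so a failing row is logged and skipped instead of rolling back everything else.

import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Sketch only: a writer that issues one INSERT per record and merely logs
// failures, so a single bad row cannot roll back the whole batch.
public class SkippingDBRecordWriter extends RecordWriter<DBRecord, NullWritable> {
    private static final Logger LOGGER =
            LoggerFactory.getLogger(SkippingDBRecordWriter.class);

    private final Connection connection;
    private final PreparedStatement statement;

    public SkippingDBRecordWriter(String jdbcUrl, String user, String password)
            throws SQLException {
        connection = DriverManager.getConnection(jdbcUrl, user, password);
        // Auto-commit each INSERT so a later failure cannot undo earlier rows.
        connection.setAutoCommit(true);
        // Hypothetical table/column names; replace with the real schema.
        statement = connection.prepareStatement(
                "INSERT INTO weblog (filename) VALUES (?)");
    }

    @Override
    public void write(DBRecord record, NullWritable ignored) throws IOException {
        try {
            // Assumes DBRecord exposes the value to bind, e.g. a getPath() accessor.
            statement.setString(1, record.getPath());
            statement.executeUpdate(); // one row at a time, no batch
        } catch (SQLException e) {
            // Log and skip the bad record instead of failing the task.
            LOGGER.error("Skipping record {}.", record, e);
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException {
        try {
            statement.close();
            connection.close();
        } catch (SQLException e) {
            throw new IOException(e);
        }
    }
}

To wire this in, a matching OutputFormat would have to return the writer from getRecordWriter(). Even then, inserting row by row gives up the performance benefit of batching, so it is a trade-off rather than a clean fix.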