拆分期间的 IOException java.util.concurrent.ExecutionException:java.io.FileNotFoundException 将 HFile 加载到 HBase 时

IOException during splitting java.util.concurrent.ExecutionException: java.io.FileNotFoundException when loading HFile to HBase

我正在尝试使用盐渍 table 方法将数据批量加载到 hbase 中,如本网站所述:https://www.opencore.com/blog/2016/10/efficient-bulk-load-of-hbase-using-spark/。虽然我能够插入数据但在随机时间我得到

ERROR mapreduce.LoadIncrementalHFiles: IOException during splitting java.util.concurrent.ExecutionException: java.io.FileNotFoundException: File does not exist: /user//hfile/transactions/transaction_data/b1c6c47856104db0a1289c2b7234d1d7

我正在使用 hbase-client 1.2.0 和 hbase-server 1.2.0 作为依赖项,并使用 HFileOutputFormat2 class 编写我的 HFile。

我试过使用作者的补丁 HFileOutputFormat2 https://github.com/gbif/maps/commit/ee4e0001486f3e8b37b034c5b05fc8c8d4e76ab9 但这将无法一起写入 HFile。

下面是写入HFile并加载到hbase的代码部分

dates.map { x =>
      val hbase_connection2 = ConnectionFactory.createConnection(hbaseconfig)
      val table2 = hbase_connection2.getTable(TableName.valueOf(hbase_table))
      val regionLoc2 = hbase_connection2.getRegionLocator(table2.getName)
      val admin2 = hbase_connection2.getAdmin

      val transactionsDF = sql(hive_query_2.replace("|columns|", hive_columns) + " " + period_clause.replace("|date|", date_format.format(x)))

      val max_records = transactionsDF.count()

      val max_page = math.ceil(max_records.toDouble/page_limit.toDouble).toInt

      val start_row = 0
      val end_row = page_limit.toInt

      val start_page = if(date_format.format(x).equals(bookmark_date)) {
        bookmarked_page
      }
      else{
        0
      }

      val pages = start_page to (if(max_records < page_limit.toInt){max_page-1}else{max_page})
if(max_records > 0) {
pages.map (page => {
          val sourceDF = transactionsDF
            .withColumn("tanggal", cnvrt_tanggal(transactionsDF.col("ctry_cd"), transactionsDF.col("ori_tanggal"), transactionsDF.col("ori_jam")))
            .withColumn("jam", cnvrt_jam(transactionsDF.col("ctry_cd"), transactionsDF.col("ori_tanggal"), transactionsDF.col("ori_jam")))
            .join(locations, transactionsDF.col("wsid") === locations.col("key"), "left_outer")
            .join(trandescdictionary, lower(transactionsDF.col("source_system")) === lower(trandescdictionary.col("FLAG_TRANS")) && lower(transactionsDF.col("trans_cd")) === lower(trandescdictionary.col("TRAN_CODE")), "left_outer")
            .filter(transactionsDF.col("rowid").between((start_row + (page * page_limit.toInt)).toString, ((end_row + (page * page_limit.toInt)) - 1).toString))
            .withColumn("uuid", timeUUID())
            .withColumn("created_dt", current_timestamp())

          val spp = new SaltPrefixPartitioner(hbase_regions)

          val saltedRDD = sourceDF.rdd.flatMap(r => {
            Seq((salt(r.getString(r.fieldIndex("uuid")), hbase_regions), Seq(r.get(r.fieldIndex(new String(cols(0).toLowerCase))), r.get(r.fieldIndex(new String(cols(1).toLowerCase))), r.get(r.fieldIndex(new String(cols(2).toLowerCase))), r.get(r.fieldIndex(new String(cols(3).toLowerCase))), r.get(r.fieldIndex(new String(cols(4).toLowerCase))), r.get(r.fieldIndex(new String(cols(5).toLowerCase))), r.get(r.fieldIndex(new String(cols(6).toLowerCase))), r.get(r.fieldIndex(new String(cols(7).toLowerCase))), r.get(r.fieldIndex(new String(cols(8).toLowerCase))), r.get(r.fieldIndex(new String(cols(9).toLowerCase))), r.get(r.fieldIndex(new String(cols(10).toLowerCase))), r.get(r.fieldIndex(new String(cols(11).toLowerCase))), r.get(r.fieldIndex(new String(cols(12).toLowerCase))), r.get(r.fieldIndex(new String(cols(13).toLowerCase))), r.get(r.fieldIndex(new String(cols(14).toLowerCase))), r.get(r.fieldIndex(new String(cols(15).toLowerCase))), r.get(r.fieldIndex(new String(cols(16).toLowerCase))), r.get(r.fieldIndex(new String(cols(17).toLowerCase))), r.get(r.fieldIndex(new String(cols(18).toLowerCase))), r.get(r.fieldIndex(new String(cols(19).toLowerCase))), r.get(r.fieldIndex(new String(cols(20).toLowerCase))), r.get(r.fieldIndex(new String(cols(21).toLowerCase))), r.get(r.fieldIndex(new String(cols(22).toLowerCase))))))
          })

          val partitionedRDD = saltedRDD.repartitionAndSortWithinPartitions(spp)

          val cells = partitionedRDD.flatMap(r => {
            val salted_keys = r._1
            val colFamily = hbase_colfamily

            Seq(
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(0).getBytes(), Bytes.toBytes(Option(r._2(0)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(1).getBytes(), Bytes.toBytes(Option(r._2(1)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(2).getBytes(), Bytes.toBytes(Option(r._2(2)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(3).getBytes(), Bytes.toBytes(Option(r._2(3)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(4).getBytes(), Bytes.toBytes(Option(r._2(4)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(5).getBytes(), Bytes.toBytes(Option(r._2(5)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(6).getBytes(), Bytes.toBytes(Option(r._2(6)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(7).getBytes(), Bytes.toBytes(Option(r._2(7)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(8).getBytes(), Bytes.toBytes(Option(r._2(8)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(9).getBytes(), Bytes.toBytes(Option(r._2(9)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(10).getBytes(), Bytes.toBytes(Option(r._2(10)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(11).getBytes(), Bytes.toBytes(Option(r._2(11)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(12).getBytes(), Bytes.toBytes(Option(r._2(12)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(13).getBytes(), Bytes.toBytes(Option(r._2(13)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(14).getBytes(), Bytes.toBytes(Option(r._2(14)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(15).getBytes(), Bytes.toBytes(Option(r._2(15)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(16).getBytes(), Bytes.toBytes(Option(r._2(16)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(17).getBytes(), Bytes.toBytes(Option(r._2(17)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(18).getBytes(), Bytes.toBytes(Option(r._2(18)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(19).getBytes(), Bytes.toBytes(Option(r._2(19)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(20).getBytes(), Bytes.toBytes(Option(r._2(20)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(21).getBytes(), Bytes.toBytes(Option(r._2(21)).getOrElse("").toString))),
              (new ImmutableBytesWritable(Bytes.toBytes(salted_keys)), new KeyValue(Bytes.toBytes(salted_keys), colFamily.getBytes(), cols(22).getBytes(), Bytes.toBytes(Option(r._2(22)).getOrElse("").toString)))
            )
          })

          val job = Job.getInstance(hbaseconfig, "Insert Transaction Data Row " + (start_row + (page * page_limit.toInt)).toString + " to " + ((end_row + (page * page_limit.toInt)) - 1).toString + " for " + x.toString)
          HFileOutputFormat2.configureIncrementalLoad(job, table2, regionLoc2)

          val conf = job.getConfiguration

          if (fs.exists(path)) {
            fs.delete(path, true)

            cells.saveAsNewAPIHadoopFile(
              path.toString,
              classOf[ImmutableBytesWritable],
              classOf[KeyValue],
              classOf[HFileOutputFormat2],
              conf
            )
          }
          else if (!fs.exists(path)) {
            cells.saveAsNewAPIHadoopFile(
              path.toString,
              classOf[ImmutableBytesWritable],
              classOf[KeyValue],
              classOf[HFileOutputFormat2],
              conf
            )
          }

          val bulk_loader = new LoadIncrementalHFiles(conf)
          bulk_loader.doBulkLoad(path, admin2, table2, regionLoc2)

          conf.clear()
          println("Done For " + x.toString + " pages " + (start_row + (page * page_limit.toInt)).toString + " to " + ((end_row + (page * page_limit.toInt)) - 1).toString)

          if (fs.exists(bookmark)) {
            fs.delete(bookmark, true)
            Seq((date_format.format(x), page.toString)).map(r => (r._1, r._2)).toDF("Date", "Page").write.format("com.databricks.spark.csv").option("delimiter", "|").save(bookmark_path)
          }
          else {
            Seq((date_format.format(x), page.toString)).map(r => (r._1, r._2)).toDF("Date", "Page").write.format("com.databricks.spark.csv").option("delimiter", "|").save(bookmark_path)
          }
          0
        })
  }
  hbase_connection2.close()
  0
}

我真的束手无策,因为我无法追踪导致此错误的原因。我希望有人能给我一些想法,说明这个文件拆分错误的原因。

我想你可能会看到这个: https://issues.apache.org/jira/projects/HBASE/issues/HBASE-21183

这是我偶尔看到的,所以我们一直没有解决。请问您多久看一次?

HBASE-3871 似乎您必须对数据进行并行化和重新分区是解决此问题的方法。

see this code which is the orgin for the error

private Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> groupOrSplitPhase(
      AsyncClusterConnection conn, TableName tableName, ExecutorService pool,
      Deque<LoadQueueItem> queue, List<Pair<byte[], byte[]>> startEndKeys) throws IOException {
    // <region start key, LQI> need synchronized only within this scope of this
    // phase because of the puts that happen in futures.
    Multimap<ByteBuffer, LoadQueueItem> rgs = HashMultimap.create();
    final Multimap<ByteBuffer, LoadQueueItem> regionGroups = Multimaps.synchronizedMultimap(rgs);
    Set<String> missingHFiles = new HashSet<>();
    Pair<Multimap<ByteBuffer, LoadQueueItem>, Set<String>> pair =
      new Pair<>(regionGroups, missingHFiles);

    // drain LQIs and figure out bulk load groups
    Set<Future<Pair<List<LoadQueueItem>, String>>> splittingFutures = new HashSet<>();
    while (!queue.isEmpty()) {
      final LoadQueueItem item = queue.remove();

      final Callable<Pair<List<LoadQueueItem>, String>> call =
        new Callable<Pair<List<LoadQueueItem>, String>>() {
          @Override
          public Pair<List<LoadQueueItem>, String> call() throws Exception {
            Pair<List<LoadQueueItem>, String> splits =
              groupOrSplit(conn, tableName, regionGroups, item, startEndKeys);
            return splits;
          }
        };
      splittingFutures.add(pool.submit(call));
    }
    // get all the results. All grouping and splitting must finish before
    // we can attempt the atomic loads.
    for (Future<Pair<List<LoadQueueItem>, String>> lqis : splittingFutures) {
      try {
        Pair<List<LoadQueueItem>, String> splits = lqis.get();
        if (splits != null) {
          if (splits.getFirst() != null) {
            queue.addAll(splits.getFirst());
          } else {
            missingHFiles.add(splits.getSecond());
          }
        }
      } catch (ExecutionException e1) {
        Throwable t = e1.getCause();
        if (t instanceof IOException) {
          LOG.error("IOException during splitting", e1);
          throw (IOException) t; // would have been thrown if not parallelized,
        }
        LOG.error("Unexpected execution exception during splitting", e1);
        throw new IllegalStateException(t);
      } catch (InterruptedException e1) {
        LOG.error("Unexpected interrupted exception during splitting", e1);
        throw (InterruptedIOException) new InterruptedIOException().initCause(e1);
      }
    }
    return pair;
  }