对于 200 GB 的数据集,Last Reducer 是最近 24 小时的 运行

Last Reducer is running from last 24 hour for 200 gb of data set

您好,我有一个 mapreduce 应用程序可以将数据批量加载到 HBase 中。 我总共有 142 个文本文件,总大小为 200gb。 我的映射器在 5 分钟内完成,除了最后一个之外,所有减速器都卡在 100%。 从过去的 24 小时开始,它花费了很长时间和 运行。 我有一个专栏家庭。 我的行键如下所示。

48433197315|1972-03-31T00:00:00Z|4 48433197315|1972-03-31T00:00:00Z|38 48433197315|1972-03-31T00:00:00Z|41 48433197215|19 31T00:00:00Z|23 48433197315|1972-03-31T00:00:00Z|7 48433336118|1972-03-31T00:00:00Z|17 48433197319|1972-03-31T00:00:00Z|63194782-394 47734 03-31T00:00:00Z|58 48433197319|1972-03-31T00:00:00Z|61 48433197319|1972-03-31T00:00:00Z|73 48433197319|1972-03-31T00:00:0814Z|937 1972-03-31T00:00:00Z|7

我是这样创建我的 table 的。

  private static Configuration getHbaseConfiguration() {
    try {
        if (hbaseConf == null) {
        System.out.println(
            "UserId= " + USERID + " \t keytab file =" + KEYTAB_FILE + " \t conf =" + KRB5_CONF_FILE);
        HBaseConfiguration.create();
        hbaseConf = HBaseConfiguration.create();
        hbaseConf.set("mapreduce.job.queuename", "root.fricadev");
        hbaseConf.set("mapreduce.child.java.opts", "-Xmx6553m");
        hbaseConf.set("mapreduce.map.memory.mb", "8192");
        hbaseConf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 1024);
        System.setProperty("java.security.krb5.conf", KRB5_CONF_FILE);
        UserGroupInformation.loginUserFromKeytab(USERID, KEYTAB_FILE);
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
    return hbaseConf;
    }

    /**
     * HBase bulk import example Data preparation MapReduce job driver
     * 
     * args[0]: HDFS input path args[1]: HDFS output path
     * 
     * @throws Exception
     * 
     */
    public static void main(String[] args) throws Exception {

    if (hbaseConf == null)
        hbaseConf = getHbaseConfiguration();
    String outputPath = args[2];
    hbaseConf.set("data.seperator", DATA_SEPERATOR);
    hbaseConf.set("hbase.table.name", args[0]);
    hbaseConf.setInt(MAX_FILES_PER_REGION_PER_FAMILY, 1024);

    Job job = new Job(hbaseConf);
    job.setJarByClass(HBaseBulkLoadDriver.class);
    job.setJobName("Bulk Loading HBase Table::" + args[0]);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapperClass(HBaseBulkLoadMapperUnzipped.class);

    // job.getConfiguration().set("mapreduce.job.acl-view-job",
    // "bigdata-app-fricadev-sdw-u6034690");
    if (HbaseBulkLoadMapperConstants.FUNDAMENTAL_ANALYTIC.equals(args[0])) {
        HTableDescriptor descriptor = new HTableDescriptor(Bytes.toBytes(args[0]));
        descriptor.addFamily(new HColumnDescriptor(COLUMN_FAMILY));
        HBaseAdmin admin = new HBaseAdmin(hbaseConf);
        byte[] startKey = new byte[16];
        Arrays.fill(startKey, (byte) 0);
        byte[] endKey = new byte[16];
        Arrays.fill(endKey, (byte) 255);
        admin.createTable(descriptor, startKey, endKey, REGIONS_COUNT);
        admin.close();
        // HColumnDescriptor hcd = new
        // HColumnDescriptor(COLUMN_FAMILY).setMaxVersions(1);
        // createPreSplitLoadTestTable(hbaseConf, descriptor, hcd);
    }

    job.getConfiguration().setBoolean("mapreduce.compress.map.output", true);
    job.getConfiguration().setBoolean("mapreduce.map.output.compress", true);
    job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", true);

    job.getConfiguration().setClass("mapreduce.map.output.compression.codec",
        org.apache.hadoop.io.compress.GzipCodec.class, org.apache.hadoop.io.compress.CompressionCodec.class);
    job.getConfiguration().set("hfile.compression", Compression.Algorithm.LZO.getName());

    // Connection connection =
    // ConnectionFactory.createConnection(hbaseConf);
    // Table table = connection.getTable(TableName.valueOf(args[0]));
    FileInputFormat.setInputPaths(job, args[1]);
    FileOutputFormat.setOutputPath(job, new Path(outputPath));

    job.setMapOutputValueClass(Put.class);
    HFileOutputFormat.configureIncrementalLoad(job, new HTable(hbaseConf, args[0]));

    System.exit(job.waitForCompletion(true) ? 0 : -1);

    System.out.println("job is successfull..........");

    // LoadIncrementalHFiles loader = new LoadIncrementalHFiles(hbaseConf);

    // loader.doBulkLoad(new Path(outputPath), (HTable) table);

    HBaseBulkLoad.doBulkLoad(outputPath, args[0]);

    }

    /**
     * Enum of counters.
     * It used for collect statistics
     */
    public static enum Counters {
        /**
         * Counts data format errors.
         */
        WRONG_DATA_FORMAT_COUNTER
}
}

我的代码中没有 reducer only mapper 。 我的映射器代码是这样的。

public class FundamentalAnalyticLoader implements TableLoader {

    private ImmutableBytesWritable hbaseTableName;
    private Text value;
    private Mapper<LongWritable, Text, ImmutableBytesWritable, Put>.Context context;
    private String strFileLocationAndDate;

    @SuppressWarnings("unchecked")
    public FundamentalAnalyticLoader(ImmutableBytesWritable hbaseTableName, Text value, Context context,
        String strFileLocationAndDate) {

    //System.out.println("Constructing Fundalmental Analytic Load");

    this.hbaseTableName = hbaseTableName;
    this.value = value;
    this.context = context;
    this.strFileLocationAndDate = strFileLocationAndDate;
    }

    @SuppressWarnings("deprecation")
    public void load() {
    if (!HbaseBulkLoadMapperConstants.FF_ACTION.contains(value.toString())) {

        String[] values = value.toString().split(HbaseBulkLoadMapperConstants.DATA_SEPERATOR);
        String[] strArrFileLocationAndDate = strFileLocationAndDate
            .split(HbaseBulkLoadMapperConstants.FIELD_SEPERATOR);

        if (17 == values.length) {
        String strKey = values[5].trim() + "|" + values[0].trim() + "|" + values[3].trim() + "|"
            + values[4].trim() + "|" + values[14].trim() + "|" + strArrFileLocationAndDate[0].trim() + "|"
            + strArrFileLocationAndDate[2].trim();

        //String strRowKey=StringUtils.leftPad(Integer.toString(Math.abs(strKey.hashCode() % 470)), 3, "0") + "|" + strKey;
        byte[] hashedRowKey = HbaseBulkImportUtil.getHash(strKey);
        Put put = new Put((hashedRowKey));


        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID),
            Bytes.toBytes(values[0].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID_OBJECT_TYPE_ID),
            Bytes.toBytes(values[1].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FUNDAMENTAL_SERIES_ID_OBJECT_TYPE),
            Bytes.toBytes(values[2]));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_END_DATE),
            Bytes.toBytes(values[3].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_TYPE),
            Bytes.toBytes(values[4].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.LINE_ITEM_ID), Bytes.toBytes(values[5].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_ITEM_INSTANCE_KEY),
            Bytes.toBytes(values[6].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_VALUE), Bytes.toBytes(values[7].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_CONCEPT_CODE),
            Bytes.toBytes(values[8].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_VALUE_CURRENCY_ID),
            Bytes.toBytes(values[9].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_IS_ESTIMATED),
            Bytes.toBytes(values[10].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_AUDITABILITY_EQUATION),
            Bytes.toBytes(values[11].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FINANCIAL_PERIOD_TYPE_ID),
            Bytes.toBytes(values[12].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_CONCEPT_ID),
            Bytes.toBytes(values[13].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.ANALYTIC_LINE_ITEM_IS_YEAR_TO_DATE),
            Bytes.toBytes(values[14].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.IS_ANNUAL), Bytes.toBytes(values[15].trim()));

        // put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
        // Bytes.toBytes(HbaseBulkLoadMapperConstants.TAXONOMY_ID),
        // Bytes.toBytes(values[16].trim()));
        //
        // put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
        // Bytes.toBytes(HbaseBulkLoadMapperConstants.INSTRUMENT_ID),
        // Bytes.toBytes(values[17].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FF_ACTION),
            Bytes.toBytes(values[16].substring(0, values[16].length() - 3)));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FILE_PARTITION),
            Bytes.toBytes(strArrFileLocationAndDate[0].trim()));

        put.add(Bytes.toBytes(HbaseBulkLoadMapperConstants.COLUMN_FAMILY),
            Bytes.toBytes(HbaseBulkLoadMapperConstants.FILE_PARTITION_DATE),
            Bytes.toBytes(strArrFileLocationAndDate[2].trim()));

        try {
            context.write(hbaseTableName, put);
        } catch (IOException e) {
            context.getCounter(Counters.WRONG_DATA_FORMAT_COUNTER).increment(1);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }

        } else {

        System.out.println("Values length is less 15 and value is " + value.toString());
        }

    }
    }

非常感谢任何有助于提高速度的帮助。 计数器图像 这里`

大多数情况下,如果作业在最后一分钟或最后一秒钟挂起,那么问题可能是特定节点或资源存在并发问题等。

小检查清单可以是: 1. 用较小的数据集再试一次。这将排除代码的基本功能。 2. 由于大部分工作已经完成,mapper 和reducer 可能是好的。您可以尝试几次相同数量的作业 运行。日志可以帮助您确定同一节点是否存在重复运行的问题。 3. 验证输出是否按预期生成。 4. 您还可以减少尝试添加到 HBase 的列数。这将减轻相同体积的负载。

作业被挂起可能是由于各种问题造成的。但故障排除主要包括以上步骤 - 验证原因是否与数据相关,资源相关,特定节点相关,内存相关等

我怀疑所有记录都进入了一个区域。 当您创建空 table 时,HBase 在偶数范围内拆分键地址 space。但是因为所有实际密钥共享相同的前缀,所以它们都进入单个区域。这意味着单个 region/reduce 任务可以完成所有工作,而所有其他 regions/reduce 任务不会做任何有用的事情。您可以通过查看 Hadoop 计数器来验证这个假设:与其他 reduce 任务相比,reduce 任务减慢了多少字节 read/wrote。

如果这是问题所在,那么您需要手动准备拆分键并使用 createTable(HTableDescriptor desc, byte[][] splitKeys 创建 table。拆分键应平均划分您的实际数据集以获得最佳性能。

示例#1。如果你的键是普通的英文单词,那么很容易将 table 按第一个字符拆分成 26 个区域(拆分键是 'a', 'b', ..., 'z').或者将其按前两个字符拆分为 26*26 个区域:('aa', 'ab', ..., 'zz')。区域不一定是均匀的,但这总比只有一个区域好。

示例#2。如果您的密钥是 4 字节散列,那么很容易将 table 按第一个字节 (0x00, 0x01, ..., 0xff) 分成 256 个区域或按前两个字节分成 2^16 个区域。

在您的具体情况下,我看到两个选项:

  1. 在数据集中搜索最小键(按排序顺序)和最大键。并将它们用作 startKeyendKeyAdmin.createTable()。仅当密钥在 startKeyendKey 之间均匀分布时,这才会有效。

  2. 使用 hash(key) 为您的密钥添加前缀并使用示例 #2 中的方法。这应该运行良好,但您将无法进行语义查询,例如 (KEY >= ${first} and KEY <= ${last}).