Bulk Load to HBase: ERROR : java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.hbase.Cell

In Java, I have to import some data from a TSV file (about 21*10^6 lines) into an HBase table using MapReduce.
Each line is:
XYZ|XZS YSY|SDS|XDA|JKX|SDS 0.XXXXXXXXX
The HTable has 5 column families: A, B, C, D, E.

The first part of each line is my HBase rowkey.

The second part, a group of five pipe-separated tokens, gives the 5 column qualifiers:

  1. YSY|SDS|XDA|JKX|SDS -> for column family A
  2. YSY|SDS|XDA|JKX -> for column family B
  3. YSY|SDS|XDA -> for column family C
  4. YSY|SDS -> for column family D
  5. YSY -> for column family E

The last part is the value to insert into the cell. I also have to aggregate, with a sum Σ, all the values that share the same qualifier (1 or 2 or 3 or 4 or 5); this will be part of my Reducer.
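
To make the layout concrete, here is a minimal, self-contained sketch (a throwaway class, just for illustration) of how one line is split into the rowkey, the five nested qualifiers and the value, i.e. into the five (rowkey_qualifier, value) pairs that the Mapper below emits. The value 0.123456789 is only a placeholder for the masked 0.XXXXXXXXX.

    // Illustration only: not part of the MapReduce job.
    public class LineExpansionSketch {
        public static void main(String[] args) {
            // One sample line; 0.123456789 stands in for the real (masked) value.
            String line = "XYZ|XZS\tYSY|SDS|XDA|JKX|SDS\t0.123456789";

            String[] fields = line.split("\t");        // rowkey, qualifiers, value
            String rowkey = fields[0];                 // "XYZ|XZS"
            String[] tokens = fields[1].split("\\|");  // the five qualifier tokens
            float value = Float.parseFloat(fields[2]);

            // Build the five nested qualifiers: first 5 tokens, then 4, 3, 2, 1.
            for (int len = tokens.length; len >= 1; len--) {
                StringBuilder qualifier = new StringBuilder(tokens[0]);
                for (int i = 1; i < len; i++) {
                    qualifier.append('|').append(tokens[i]);
                }
                // Same composite key the Mapper writes, together with the value.
                System.out.println(rowkey + "_" + qualifier + "\t" + value);
            }
        }
    }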

This is my Driver:

public class Driver {

    private static final String COLUMN_FAMILY_1 = "A";
    private static final String COLUMN_FAMILY_2 = "B";
    private static final String COLUMN_FAMILY_3 = "C";
    private static final String COLUMN_FAMILY_4 = "D";
    private static final String COLUMN_FAMILY_5 = "E";
    private static final String TABLENAME = "abe:data";
    private static final String DATA_SEPARATOR = "\t";


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = HBaseConfiguration.create();

        //Configuration Settings about hbase table
        configuration.set("hbase.table.name", TABLENAME);
        configuration.set("colomn_family_1", COLUMN_FAMILY_1);
        configuration.set("colomn_family_2", COLUMN_FAMILY_2);
        configuration.set("colomn_family_3", COLUMN_FAMILY_3);
        configuration.set("colomn_family_4", COLUMN_FAMILY_4);
        configuration.set("colomn_family_5", COLUMN_FAMILY_5);
        configuration.set("data_separator", DATA_SEPARATOR);


        if (args.length != 2){
            System.out.println("Usage: ");
            System.out.println("-\t args[0] -> HDFS input path");
            System.out.println("-\t args[1] -> HDFS output path");
            System.exit(1);
        }

        String inputPath = args[0];
        String outputPath = args[1];
        Path inputHdfsPath = new Path(inputPath);
        Path outputHdfsPath = new Path(outputPath);

        Job job = null;

        try {
            job = Job.getInstance(configuration);
        } catch (IOException e) {
            System.out.println("\n\t--->Exception: Error trying getinstance of job.<---\n");
            e.printStackTrace();
        }

        job.setJobName("Bulk Loading HBase Table: "+ "\""+ TABLENAME+"\" with aggregation.");
        job.setJarByClass(Driver.class);

        //MAPPER
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MappingClass.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);

        try {

            FileInputFormat.addInputPath(job, inputHdfsPath);

        } catch (IllegalArgumentException | IOException e) {
            System.out.println("Error setting inputPath in FileInputFormat");
            e.printStackTrace();
        } 

        try {

            FileSystem.get(configuration).delete(outputHdfsPath, true);

        } catch (IllegalArgumentException | IOException e) {
            System.out.println("");
            e.printStackTrace();
        }

        //Set the HDFS output path where the HFiles for the bulk import will be written
        FileOutputFormat.setOutputPath(job, outputHdfsPath);        
        FileSystem hdfs;


        //Deleting output folder if exists 
        try {

            hdfs = FileSystem.get(configuration);
            if(hdfs.exists(outputHdfsPath)){
                hdfs.delete(outputHdfsPath, true); //Delete existing Directory
            }

        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }


        //Variables to access HBase
        Connection hbCon = ConnectionFactory.createConnection(configuration);
        Table hTable = hbCon.getTable(TableName.valueOf(TABLENAME));
        RegionLocator regionLocator = hbCon.getRegionLocator(TableName.valueOf(TABLENAME));
        Admin admin = hbCon.getAdmin();
        HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);

        // Wait for HFile creation
        boolean result =  job.waitForCompletion(true);
        LoadIncrementalHFiles loadFfiles = null;

        try {
            loadFfiles = new LoadIncrementalHFiles(configuration);
        } catch (Exception e) {
            System.out.println("Error configuring LoadIncrementalHFiles.");
            e.printStackTrace();
        }

        if (result){
            loadFfiles.doBulkLoad(outputHdfsPath, admin, hTable, regionLocator);
            System.out.println("Bulk Import Completed.");
        }
        else {
            System.out.println("Error in completing job. No bulkimport.");
        }

    }

}

My Mapper is:

    public class MappingClass extends Mapper<LongWritable,Text,ImmutableBytesWritable,FloatWritable>{
        private String separator;


        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();
            separator = configuration.get("data_separator");
        }

        @Override
        public void map(LongWritable key,Text line,Context context){

            String[] values = line.toString().split(separator);
            String rowkey = values[0];
            String[] allQualifiers = values[1].split("\\|");
            String percentage = values[2];
            System.out.println(percentage);

            String toQ1 = new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]+"|"+allQualifiers[3]+"|"+allQualifiers[4]);
            String toQ2= new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]+"|"+allQualifiers[3]);
            String toQ3 = new String(allQualifiers[0]+"|"+allQualifiers[1]+"|"+allQualifiers[2]);
            String toQ4 = new String(allQualifiers[0]+"|"+allQualifiers[1]);
            String toQ5 = new String(allQualifiers[0]);


            ImmutableBytesWritable ibw = new ImmutableBytesWritable();
            FloatWritable valueOut = new FloatWritable(Float.parseFloat(percentage));

            ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ1)));

            try {
                context.write(ibw, valueOut);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }


            ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ2)));

            try {
                context.write(ibw, valueOut);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }


            ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ3)));

            try {
                context.write(ibw, valueOut);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }


            ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ4)));

            try {
                context.write(ibw, valueOut);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }


            ibw.set(Bytes.toBytes(new String(rowkey+"_"+toQ5)));

            try {
                context.write(ibw, valueOut);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }

        }

    }

This is my Reducer:

 public class ReducingClass extends Reducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable, KeyValue> {
        private String columnFamily_1;
        private String columnFamily_2;
        private String columnFamily_3;
        private String columnFamily_4;
        private String columnFamily_5;
        private float sum;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            Configuration configuration = context.getConfiguration();

            columnFamily_1 = configuration.get("colomn_family_1");
            columnFamily_2 = configuration.get("colomn_family_2");
            columnFamily_3 = configuration.get("colomn_family_3");
            columnFamily_4 = configuration.get("colomn_family_4");
            columnFamily_5 = configuration.get("colomn_family_5");
        }
        @Override
        public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context){
            // ImmutableBytesWritable.toString() returns a hex dump, so decode the bytes instead
            String[] rk_cq = Bytes.toString(key.get(), key.getOffset(), key.getLength()).split("_");
            String rowkey = rk_cq[0];
            String cq = rk_cq[1];
            String colFamily = this.getFamily(cq);          
            sum = 0;

            for(FloatWritable fw : values)
                sum += fw.get();

            ImmutableBytesWritable ibw = new ImmutableBytesWritable(rowkey.getBytes());
            KeyValue kv = new KeyValue(rowkey.getBytes(), colFamily.getBytes(), cq.getBytes(), Float.toString(sum).getBytes());


            try {
                context.write(ibw, kv);
            } catch (IOException | InterruptedException e) {
                e.printStackTrace();
            }

        }

        private String getFamily(String cq){
            String cf = new String();

            switch (cq.split("\\|").length) {
            case 1:
                cf = columnFamily_1;
                break;

            case 2:
                cf = columnFamily_2;
                break;

            case 3:
                cf = columnFamily_3;
                break;

            case 4:
                cf = columnFamily_4;
                break;

            case 5:
                cf = columnFamily_5;
                break;

            default:
                break;
            }

            return cf;
        }

    }

And now the error:

17/05/08 20:04:22 INFO mapreduce.Job: map 100% reduce 29%
17/05/08 20:04:22 INFO mapreduce.Job: Task Id : attempt_1485334922305_5537_r_000000_2, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.hbase.Cell
 at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.write(HFileOutputFormat2.java:167)
 at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
 at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
 at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
 at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
 at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
 at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
 at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
 at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:168)
 at java.security.AccessController.doPrivileged(Native Method)
 at javax.security.auth.Subject.doAs(Subject.java:422)
 at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
 at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)

Thanks for your help.

I fixed it. In the Driver I had forgotten:

job.setReducerClass(ReducingClass.class);
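
For the record, why this fixes the ClassCastException: when no reducer class is set, Hadoop runs the default identity Reducer (the Reducer.reduce(Reducer.java:150) frame in the stack trace), which passes the map output values, FloatWritable, straight through to HFileOutputFormat2, and that output format can only write Cell/KeyValue values. HFileOutputFormat2.configureIncrementalLoad() only picks a reducer automatically (KeyValueSortReducer, PutSortReducer or TextSortReducer) when the map output value class is KeyValue, Put or Text; with FloatWritable it just logs a warning, at least in the HBase 1.x versions I have seen. A minimal sketch of the relevant driver lines, reusing the job, hTable and regionLocator variables from the Driver above (only the setReducerClass call is new):

    //MAPPER
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(MappingClass.class);
    job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(FloatWritable.class);

    //REDUCER: without this line the identity Reducer forwards FloatWritable
    //values to HFileOutputFormat2, which expects Cells -> ClassCastException.
    job.setReducerClass(ReducingClass.class);

    //configureIncrementalLoad wires up the output format, the output key/value
    //classes and the total order partitioner, but it does not choose a reducer
    //for a FloatWritable map output value.
    HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);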