Bulk Load to HBase: ERROR : java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.hbase.Cell
In Java, I have to import some data from a TSV file (about 21*10^6 rows) into an HBase table using MapReduce.
Each line is:
XYZ|XZS YSY|SDS|XDA|JKX|SDS 0.XXXXXXXXX
The HTable has 5 column families: A, B, C, D, E.
The first field of each line is my HBase rowkey.
The second field (five pipe-separated tokens) gives the column qualifiers:
- YSY|SDS|XDA|JKX|SDS -> column family A
- YSY|SDS|XDA|JKX -> column family B
- YSY|SDS|XDA -> column family C
- YSY|SDS -> column family D
- YSY -> column family E
The last field is the value to insert into the cell.
I also have to aggregate, with a sum (Σ), all the values that share the same qualifier (whether it has 1, 2, 3, 4 or 5 tokens); that will be part of my Reducer.
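To make the fan-out concrete, here is a minimal, self-contained sketch (just an illustration with a made-up value, not part of the actual job) that expands one sample line into the five (column family, qualifier) targets following the mapping listed above:

public class FanOutExample {
    public static void main(String[] args) {
        // A sample line in the format described above (the numeric value is made up): rowkey \t qualifiers \t value
        String line = "XYZ|XZS\tYSY|SDS|XDA|JKX|SDS\t0.123456789";
        String[] fields = line.split("\t");
        String rowkey = fields[0];
        String[] tokens = fields[1].split("\\|"); // the pipe must be escaped in the regex
        String value = fields[2];
        // 1 token -> E, 2 tokens -> D, 3 -> C, 4 -> B, 5 -> A, as in the list above
        String[] families = {"E", "D", "C", "B", "A"};
        for (int len = 1; len <= tokens.length; len++) {
            String qualifier = String.join("|", java.util.Arrays.copyOfRange(tokens, 0, len));
            System.out.println(rowkey + " -> " + families[len - 1] + ":" + qualifier + " = " + value);
        }
    }
}

Lines that end up with the same rowkey and qualifier have their values summed in the Reducer.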
Here is my driver:
public class Driver {
    private static final String COLUMN_FAMILY_1 = "A";
    private static final String COLUMN_FAMILY_2 = "B";
    private static final String COLUMN_FAMILY_3 = "C";
    private static final String COLUMN_FAMILY_4 = "D";
    private static final String COLUMN_FAMILY_5 = "E";
    private static final String TABLENAME = "abe:data";
    private static final String DATA_SEPARATOR = "\t";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = HBaseConfiguration.create();
        // Configuration settings for the HBase table
        configuration.set("hbase.table.name", TABLENAME);
        configuration.set("colomn_family_1", COLUMN_FAMILY_1);
        configuration.set("colomn_family_2", COLUMN_FAMILY_2);
        configuration.set("colomn_family_3", COLUMN_FAMILY_3);
        configuration.set("colomn_family_4", COLUMN_FAMILY_4);
        configuration.set("colomn_family_5", COLUMN_FAMILY_5);
        configuration.set("data_separator", DATA_SEPARATOR);

        if (args.length != 2) {
            System.out.println("Usage: ");
            System.out.println("-\t args[0] -> HDFS input path");
            System.err.println("-\t args[1] -> HDFS output path ");
            System.exit(1);
        }
        String inputPath = args[0];
        String outputPath = args[1];
        Path inputHdfsPath = new Path(inputPath);
        Path outputHdfsPath = new Path(outputPath);

        Job job = null;
        try {
            job = Job.getInstance(configuration);
        } catch (IOException e) {
            System.out.println("\n\t--->Exception: Error trying getinstance of job.<---\n");
            e.printStackTrace();
        }
        job.setJobName("Bulk Loading HBase Table: " + "\"" + TABLENAME + "\" with aggregation.");
        job.setJarByClass(Driver.class);

        // MAPPER
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(MappingClass.class);
        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
        job.setMapOutputValueClass(FloatWritable.class);
        try {
            FileInputFormat.addInputPath(job, inputHdfsPath);
        } catch (IllegalArgumentException | IOException e) {
            System.out.println("Error setting inputPath in FileInputFormat");
            e.printStackTrace();
        }
        try {
            FileSystem.get(configuration).delete(outputHdfsPath, true);
        } catch (IllegalArgumentException | IOException e) {
            System.out.println("");
            e.printStackTrace();
        }

        // Setting output FileSystem.Path to save HFiles for the bulk import
        FileOutputFormat.setOutputPath(job, outputHdfsPath);
        FileSystem hdfs;
        // Deleting the output folder if it exists
        try {
            hdfs = FileSystem.get(configuration);
            if (hdfs.exists(outputHdfsPath)) {
                hdfs.delete(outputHdfsPath, true); // Delete existing directory
            }
        } catch (IllegalArgumentException | IOException e) {
            e.printStackTrace();
        }

        // Variables to access HBase
        Connection hbCon = ConnectionFactory.createConnection(configuration);
        Table hTable = hbCon.getTable(TableName.valueOf(TABLENAME));
        RegionLocator regionLocator = hbCon.getRegionLocator(TableName.valueOf(TABLENAME));
        Admin admin = hbCon.getAdmin();
        HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);

        // Wait for HFile creation
        boolean result = job.waitForCompletion(true);

        LoadIncrementalHFiles loadFfiles = null;
        try {
            loadFfiles = new LoadIncrementalHFiles(configuration);
        } catch (Exception e) {
            System.out.println("Error configuring LoadIncrementalHFiles.");
            e.printStackTrace();
        }
        if (result) {
            loadFfiles.doBulkLoad(outputHdfsPath, admin, hTable, regionLocator);
            System.out.println("Bulk Import Completed.");
        } else {
            System.out.println("Error in completing job. No bulkimport.");
        }
    }
}
My Mapper is:
public class MappingClass extends Mapper<LongWritable, Text, ImmutableBytesWritable, FloatWritable> {
    private String separator;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        separator = configuration.get("data_separator");
    }

    @Override
    public void map(LongWritable key, Text line, Context context) {
        String[] values = line.toString().split(separator);
        String rowkey = values[0];
        String[] allQualifiers = values[1].split("\\|");
        String percentage = values[2];
        System.out.println(percentage);

        String toQ1 = new String(allQualifiers[0] + "|" + allQualifiers[1] + "|" + allQualifiers[2] + "|" + allQualifiers[3] + "|" + allQualifiers[4]);
        String toQ2 = new String(allQualifiers[0] + "|" + allQualifiers[1] + "|" + allQualifiers[2] + "|" + allQualifiers[3]);
        String toQ3 = new String(allQualifiers[0] + "|" + allQualifiers[1] + "|" + allQualifiers[2]);
        String toQ4 = new String(allQualifiers[0] + "|" + allQualifiers[1]);
        String toQ5 = new String(allQualifiers[0]);

        ImmutableBytesWritable ibw = new ImmutableBytesWritable();
        FloatWritable valueOut = new FloatWritable(Float.parseFloat(percentage));

        ibw.set(Bytes.toBytes(new String(rowkey + "_" + toQ1)));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        ibw.set(Bytes.toBytes(new String(rowkey + "_" + toQ2)));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        ibw.set(Bytes.toBytes(new String(rowkey + "_" + toQ3)));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        ibw.set(Bytes.toBytes(new String(rowkey + "_" + toQ4)));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
        ibw.set(Bytes.toBytes(new String(rowkey + "_" + toQ5)));
        try {
            context.write(ibw, valueOut);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Here is my Reducer:
public class ReducingClass extends Reducer<ImmutableBytesWritable, FloatWritable, ImmutableBytesWritable, KeyValue> {
    private String columnFamily_1;
    private String columnFamily_2;
    private String columnFamily_3;
    private String columnFamily_4;
    private String columnFamily_5;
    private float sum;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration configuration = context.getConfiguration();
        columnFamily_1 = configuration.get("colomn_family_1");
        columnFamily_2 = configuration.get("colomn_family_2");
        columnFamily_3 = configuration.get("colomn_family_3");
        columnFamily_4 = configuration.get("colomn_family_4");
        columnFamily_5 = configuration.get("colomn_family_5");
    }

    @Override
    public void reduce(ImmutableBytesWritable key, Iterable<FloatWritable> values, Context context) {
        String[] rk_cq = key.toString().split("_");
        String rowkey = rk_cq[0];
        String cq = rk_cq[1];
        String colFamily = this.getFamily(cq);
        sum = 0;
        for (FloatWritable fw : values)
            sum += fw.get();
        ImmutableBytesWritable ibw = new ImmutableBytesWritable(rowkey.getBytes());
        KeyValue kv = new KeyValue(rowkey.getBytes(), colFamily.getBytes(), cq.getBytes(), Float.toString(sum).getBytes());
        try {
            context.write(ibw, kv);
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
        }
    }

    private String getFamily(String cq) {
        String cf = new String();
        switch (cq.split("\\|").length) {
            case 1:
                cf = columnFamily_1;
                break;
            case 2:
                cf = columnFamily_2;
                break;
            case 3:
                cf = columnFamily_3;
                break;
            case 4:
                cf = columnFamily_4;
                break;
            case 5:
                cf = columnFamily_5;
                break;
            default:
                break;
        }
        return cf;
    }
}
Now the error:
17/05/08 20:04:22 INFO mapreduce.Job: map 100% reduce 29%
17/05/08 20:04:22 INFO mapreduce.Job: Task Id : attempt_1485334922305_5537_r_000000_2, Status : FAILED
Error: java.lang.ClassCastException: org.apache.hadoop.io.FloatWritable cannot be cast to org.apache.hadoop.hbase.Cell
at org.apache.hadoop.hbase.mapreduce.HFileOutputFormat2.write(HFileOutputFormat2.java:167)
at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
at org.apache.hadoop.mapreduce.Reducer.reduce(Reducer.java:150)
at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
at org.apache.hadoop.mapred.YarnChild.run(YarnChild.java:168)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1657)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:162)
Thanks for your help.
I fixed it. In the driver I had forgotten:
job.setReducerClass(ReducingClass.class);
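For anyone who hits the same trace: without that line the job runs the base org.apache.hadoop.mapreduce.Reducer (you can see it in the stack trace at Reducer.reduce(Reducer.java:150)), which just forwards the mapper's FloatWritable values to HFileOutputFormat2, whose RecordWriter expects Cell/KeyValue values, hence the ClassCastException. A minimal sketch of the relevant driver lines (only the placement of the call is my own choice, everything else is unchanged from the driver above):

job.setMapperClass(MappingClass.class);
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
job.setMapOutputValueClass(FloatWritable.class);

// The missing line: run ReducingClass so the reduce phase emits KeyValues instead of
// letting the default (identity) reducer pass FloatWritables straight to HFileOutputFormat2.
job.setReducerClass(ReducingClass.class);

// As far as I can tell, configureIncrementalLoad only replaces the reducer when the map
// output value class is KeyValue, Put or Text, so it leaves ReducingClass in place here.
HFileOutputFormat2.configureIncrementalLoad(job, hTable, regionLocator);

With the reducer in place the job writes the HFiles as expected and the doBulkLoad step completes.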