HBase map-side join: one of the tables is not getting read? Reading from HBase and writing the joined result back into HBase

I am trying to do a map-side join of two tables located in HBase. My intention is to keep the records of the small table in a HashMap, compare them against the big table, and, once a match is found, write the joined record back into a table in HBase. I wrote similar code for this join using a Mapper and a Reducer and it worked fine, with both tables being scanned in the mapper class. But since a reduce-side join is not efficient at all, I want to join the tables on the mapper side only. In the code below, the commented-out if block is there only to show that the condition always evaluates to false, i.e. the first (small) table is never read. Any hints are appreciated. I am using the HDP sandbox.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class JoinDriver extends Configured implements Tool {

    // NOTE: a static counter is only unique within a single mapper JVM, so
    // concurrent map tasks can generate colliding output row keys.
    static int row_index = 0;

    public static class JoinJobMapper extends TableMapper<ImmutableBytesWritable, Put> {
        private static byte[] big_table_bytarr = Bytes.toBytes("big_table");
        private static byte[] small_table_bytarr = Bytes.toBytes("small_table");

        HashMap<String,String> myHashMap = new HashMap<String, String>();

        byte[] c1_value;
        byte[] c2_value;

        String big_table;
        String small_table;

        String big_table_c1;
        String big_table_c2; 

        String small_table_c1; 
        String small_table_c2; 

        Text mapperKeyS;
        Text mapperValueS; 
        Text mapperKeyB;
        Text mapperValueB; 

        public void map(ImmutableBytesWritable rowKey, Result columns, Context context) {
            // Work out which table the current input split was read from.
            TableSplit currentSplit = (TableSplit) context.getInputSplit();
            byte[] tableName = currentSplit.getTableName();

            try {
                Put put = new Put(Bytes.toBytes(++row_index));

                // put the small table's record into the hashmap - myHashMap
                if (Arrays.equals(tableName, small_table_bytarr)) {

                    c1_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c1"));
                    c2_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c2"));
                    small_table_c1 = new String(c1_value);
                    small_table_c2 = new String(c2_value);

                    mapperKeyS = new Text(small_table_c1);
                    mapperValueS = new Text(small_table_c2);

                    // NOTE: this HashMap lives inside a single mapper instance;
                    // the mappers that process big_table splits never see it.
                    myHashMap.put(small_table_c1, small_table_c2);


                } else if (Arrays.equals(tableName, big_table_bytarr)) {
                    c1_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c1"));
                    c2_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c2"));
                    big_table_c1 = new String(c1_value);
                    big_table_c2 = new String(c2_value);

                    mapperKeyB = new Text(big_table_c1);
                    mapperValueB = new Text(big_table_c2);

            //  if (myHashMap.containsKey(big_table_c1)) {

                    put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c1"), Bytes.toBytes(big_table_c1));
                    context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put );
                    put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c2"), Bytes.toBytes(big_table_c2));
                    context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put );
                    put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c3"),Bytes.toBytes((myHashMap.get(big_table_c1))));
                    context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put );

            //      }

                }

            } catch (Exception e) {
                // TODO : exception handling logic
                e.printStackTrace();
            }
        }

    }

    public int run(String[] args) throws Exception {

        List<Scan> scans = new ArrayList<Scan>();



        Scan scan1 = new Scan();
        scan1.setAttribute("scan.attributes.table.name", Bytes.toBytes("small_table"));
        System.out.println(scan1.getAttribute("scan.attributes.table.name"));
        scans.add(scan1);

        Scan scan2 = new Scan();
        scan2.setAttribute("scan.attributes.table.name", Bytes.toBytes("big_table"));
        System.out.println(scan2.getAttribute("scan.attributes.table.name"));
        scans.add(scan2);

        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf);
        job.setJar("MSJJ.jar");
        job.setJarByClass(JoinDriver.class);

        // A null reducer class plus zero reduce tasks makes the mapper's Puts
        // go straight into the output table "joined_table".
        TableMapReduceUtil.initTableMapperJob(scans, JoinJobMapper.class, ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("joined_table", null, job);
        job.setNumReduceTasks(0);

        return job.waitForCompletion(true) ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        // ToolRunner parses generic options (-D etc.) into the configuration.
        System.exit(ToolRunner.run(new JoinDriver(), args));
    }

}

Reading your problem statement, I believe you have the wrong idea about using multiple HBase tables as input: each input split is processed by its own mapper instance, so a HashMap filled while scanning the small table's split is never visible to the mapper tasks that scan the big table. I suggest you load the small table into a HashMap in the setup method of the mapper class, then run a map-only job over the big table; in the map method you can fetch the corresponding value from the HashMap you loaded earlier. Let me know how it turns out.
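
For illustration, here is a minimal sketch of that approach. The table, column-family, and column names are taken from the question; the HBase 1.x Connection/Table client API is assumed (matching the Put.addColumn calls in your code):

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class MapSideJoinMapper extends TableMapper<ImmutableBytesWritable, Put> {

    private final HashMap<String, String> smallTable = new HashMap<String, String>();

    @Override
    protected void setup(Context context) throws IOException {
        // Runs once per mapper, before any map() call: cache the whole
        // small table so every big_table record can be joined locally.
        Configuration conf = HBaseConfiguration.create(context.getConfiguration());
        Connection connection = ConnectionFactory.createConnection(conf);
        try {
            Table table = connection.getTable(TableName.valueOf("small_table"));
            ResultScanner scanner = table.getScanner(new Scan());
            for (Result row : scanner) {
                smallTable.put(
                        Bytes.toString(row.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c1"))),
                        Bytes.toString(row.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c2"))));
            }
            scanner.close();
            table.close();
        } finally {
            connection.close();
        }
    }

    @Override
    public void map(ImmutableBytesWritable rowKey, Result columns, Context context)
            throws IOException, InterruptedException {
        // The job scans only big_table, so every input row comes from it.
        String c1 = Bytes.toString(columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c1")));
        String c2 = Bytes.toString(columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c2")));

        String match = smallTable.get(c1);   // join condition: equal c1 values
        if (match != null) {
            Put put = new Put(rowKey.get()); // reuse the big-table row key
            put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c1"), Bytes.toBytes(c1));
            put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c2"), Bytes.toBytes(c2));
            put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c3"), Bytes.toBytes(match));
            context.write(rowKey, put);
        }
    }
}

The driver would then set up a single-table mapper job over big_table, for example TableMapReduceUtil.initTableMapperJob("big_table", new Scan(), MapSideJoinMapper.class, ImmutableBytesWritable.class, Put.class, job), keeping initTableReducerJob("joined_table", null, job) and setNumReduceTasks(0) from your code so the mapper's Puts land directly in joined_table.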