HBase map 端连接(map-side join):其中一个表未被读取?如何从 HBase 读取数据并将结果正确写回 HBase

jaxagkaj  于 2021-05-29  发布在  Hadoop
关注(0)|答案(1)|浏览(344)

我正在尝试对 HBase 中的两个表进行 map 端连接(map-side join)。我的目标是把小表的记录保存在 HashMap 中,再与大表逐条比较;一旦匹配,就把记录写入 HBase 中的另一张表。我曾用 mapper 和 reducer 为同样的连接操作编写过类似的代码,效果很好,两个表都能在 mapper 类中被扫描到。但由于 reduce 端连接效率很低,我希望只在 mapper 端完成连接。在下面的代码中,被注释掉的 if 块只是用来验证:它的条件总是返回 false,而且第一个表(小表)没有被读取。如有任何帮助,我将不胜感激。我使用的是 HDP 沙盒。

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
//import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapred.TableOutputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper.Context;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import com.sun.tools.javac.util.Log;

public class JoinDriver extends Configured implements Tool {

    static int row_index = 0;

        public static class JoinJobMapper extends TableMapper<ImmutableBytesWritable, Put> {
        private static byte[] big_table_bytarr = Bytes.toBytes("big_table");
        private static byte[] small_table_bytarr = Bytes.toBytes("small_table");

        HashMap<String,String> myHashMap = new HashMap<String, String>();

        byte[] c1_value;
        byte[] c2_value;

        String big_table;
        String small_table;

        String big_table_c1;
        String big_table_c2; 

        String small_table_c1; 
        String small_table_c2; 

        Text mapperKeyS;
        Text mapperValueS; 
        Text mapperKeyB;
        Text mapperValueB; 

        public void map(ImmutableBytesWritable rowKey, Result columns, Context context) {
            TableSplit currentSplit = (TableSplit) context.getInputSplit();
            byte[] tableName = currentSplit.getTableName();

            try {
                Put put = new Put(Bytes.toBytes(++row_index));

                // put small table into hashmap - myhashMap
                if (Arrays.equals(tableName, small_table_bytarr)) {

                    c1_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c1"));
                    c2_value = columns.getValue(Bytes.toBytes("s_cf"), Bytes.toBytes("s_cf_c2"));
                    small_table_c1 = new String(c1_value);
                    small_table_c2 = new String(c2_value);

                    mapperKeyS = new Text(small_table_c1);
                    mapperValueS = new Text(small_table_c2);

                    myHashMap.put(small_table_c1,small_table_c2);

                } else if (Arrays.equals(tableName, big_table_bytarr)) {
                    c1_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c1"));
                    c2_value = columns.getValue(Bytes.toBytes("b_cf"), Bytes.toBytes("b_cf_c2"));
                    big_table_c1 = new String(c1_value);
                    big_table_c2 = new String(c2_value);

                    mapperKeyB = new Text(big_table_c1);
                    mapperValueB = new Text(big_table_c2);

            //  if (set.containsKey(big_table_c1)){

                    put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c1"), Bytes.toBytes(big_table_c1));
                    context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put );
                    put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c2"), Bytes.toBytes(big_table_c2));
                    context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put );
                    put.addColumn(Bytes.toBytes("join"), Bytes.toBytes("join_c3"),Bytes.toBytes((myHashMap.get(big_table_c1))));
                    context.write(new ImmutableBytesWritable(mapperKeyB.getBytes()), put );

            //      }

                }

            } catch (Exception e) {
                // TODO : exception handling logic
                e.printStackTrace();
            }
        }

    }

    public int run(String[] args) throws Exception {

        List<Scan> scans = new ArrayList<Scan>();

        Scan scan1 = new Scan();
        scan1.setAttribute("scan.attributes.table.name", Bytes.toBytes("small_table"));
        System.out.println(scan1.getAttribute("scan.attributes.table.name"));
        scans.add(scan1);

        Scan scan2 = new Scan();
        scan2.setAttribute("scan.attributes.table.name", Bytes.toBytes("big_table"));
        System.out.println(scan2.getAttribute("scan.attributes.table.name"));
        scans.add(scan2);

        Configuration conf = new Configuration();
        Job job = new Job(conf);
        job.setJar("MSJJ.jar");
        job.setJarByClass(JoinDriver.class);

        TableMapReduceUtil.initTableMapperJob(scans, JoinJobMapper.class, ImmutableBytesWritable.class, Put.class, job);
        TableMapReduceUtil.initTableReducerJob("joined_table", null, job);
        job.setNumReduceTasks(0);

        job.waitForCompletion(true);

        return 0;
    }

    public static void main(String[] args) throws Exception {
        JoinDriver runJob = new JoinDriver();
        runJob.run(args);

    }

}
nzkunb0c

nzkunb0c1#

读了您的问题描述,我认为您对使用多个 HBase 表作为输入的方式存在一些误解。我建议您在 mapper 类的 setup 方法中把小表加载到 HashMap 中,然后只对大表运行一个仅含 map 阶段(map-only)的作业,在 map 方法中从先前加载的 HashMap 里取出对应的值。请告诉我效果如何。

相关问题