HBase MapReduce part not running

3htmauhk · posted 2021-06-08 in HBase

I am running HBase 1.2.4 in standalone mode on an Ubuntu 17.04 machine. I am trying to write a MapReduce job in Java that extracts metadata (i.e., the column family identifiers followed by the column qualifiers) and counts the records that share the same schema. I found a number of examples and copied some code from http://www.informit.com/articles/article.aspx?p=2262143&seqnum=2 ; this article, http://sujee.net/2011/04/10/hbase-map-reduce-example/ , deals with a similar problem and also seemed helpful. My custom code compiles and runs, but my tests show that the reducer never runs, and in the end I get no results at all. I am attaching the code, which contains some comments pointing out the critical places, together with some of my trials.

I hope someone can give me the necessary hints.

Code

import java.io.*;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.fs.Path;
//
import java.util.ArrayList;
import java.util.List;
// 
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

/**
 * Determines the number of identically structured records in a table.
 **/

public class MetaSummary {

        static class Mapper1 extends TableMapper<Text, IntWritable> {
        // if possible, pass the table name in later as a parameter (String tableName)
        private int numRecords = 0;
        private static final IntWritable one = new IntWritable(1);
        // inserted by MU
        private String l_row ="";
        private String l_family;
        private String l_qualifier;
        private String l_out ="";
        private byte[] l_bytearray;
        private Text l_text;
        private String l_mapout ="";

        @Override
        public void map(ImmutableBytesWritable row, Result values, Context context) throws IOException {
            // For each record, build the structure column-family:column as one
            // concatenated string; this yields the output key of the mapper.
            // The associated value is always 1.
            // The values for identical keys are then summed up in the reduce phase.
            // Task: assign to the user key the string that describes the structure.
            //
            // the user key is composed of the column family identifiers along with the respective column names
                 l_out="";
                 for(KeyValue kv : values.raw()){
                    l_family = new String(kv.getFamily());
                    l_qualifier = new String(kv.getQualifier());
                    l_out = l_out+l_family+":";
                    if (l_qualifier == null){ l_qualifier = "<null>"; }
                    if (l_qualifier.equals("")){ l_qualifier = "<empty>"; }
                    l_out = l_out +l_qualifier + " ";
                 }
                 l_out = l_out.trim();
                 l_mapout = l_mapout+ l_out + " ";
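                 // note: l_mapout is an instance field that keeps growing across map() calls,
                 // so every row emits a different, ever-longer key; emitting l_out alone
                 // would let records with identical schemas group together in the reduce phase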
                 l_text = new Text(l_mapout);
                 // following code for test reasons only, to check if this part was running
                 try (PrintStream out = new PrintStream(new FileOutputStream("mapout.txt"))) {
                     out.print(l_mapout);
                 }

               try {
                 //context.write(l_userkey, one);  // former trials
                 // context.write(l_out, one);
                 context.write(l_text, one);
                } 
              catch (InterruptedException e) {
                throw new IOException(e);
            }
           }
          }

    static class Reducer1 extends TableReducer<ImmutableBytesWritable, IntWritable, ImmutableBytesWritable>
    //public static class Reducer1 extends TableReducer<Text, IntWritable, ImmutableBytesWritable>
    {
        //public void reduce(String key, Iterable<IntWritable> values, Context context)
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
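            // note: TableReducer<ImmutableBytesWritable, IntWritable, ...> declares an
            // ImmutableBytesWritable input key, so this reduce(Text, ...) is an overload,
            // not an override; the framework calls the inherited identity reduce() instead.
            // An @Override annotation would have made the compiler flag this.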
            int sum = 0;
            // following code for test reasons only, to check if this part was running
            try (PrintStream out = new PrintStream(new FileOutputStream("red1Anfang.txt"))) {
                out.print("in Reducer1.reduce before for ...");
            }
            for (IntWritable val : values) {
                sum += val.get();
                System.out.println(sum);
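            // note: the for block is still open here, so the Put and the context.write
            // below execute once per value (with partial sums) instead of once per key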
            Put put = new Put(key.getBytes());
            // Put put = new Put(Bytes.toBytes(key.toString()));   // former trials
            //      addColumn(byte[] family, byte[] qualifier, byte[] value) 
            put.addColumn(Bytes.toBytes("details"), Bytes.toBytes("total"), Bytes.toBytes(sum));
            context.write(new ImmutableBytesWritable(key.getBytes()), put);
        }
    }
    }

   // Reducer1 did not yield any results, so the next trial was to output into the file system,
   // which should be done by Reducer2,
   // which does not yield any results either

    static class Reducer2 extends Reducer<Text, IntWritable, Text, IntWritable>
    {                
        /*public Reducer2() {
        }*/

        public void reduce(Text key, Iterable<IntWritable> values, Context context) 
           throws IOException, InterruptedException {
            // following code for test reasons only, to check if this part was running
            try (PrintStream out = new PrintStream(new FileOutputStream("red2Anfang.txt"))) {
                out.print("at the start of Reducer2.reduce");
            }
            // following code for test reasons only, to check if this part was running
            try (PrintStream out = new PrintStream(new FileOutputStream("redlaeuft.txt"))) {
                out.print("reduce is running");
            }
            String sumstr="";
            int sum = 0;
            // following code for test reasons only, to check if this part was running
            try (PrintStream out = new PrintStream(new FileOutputStream("redoutvorfor.txt"))) {
                out.print("in Reducer2.reduce before the loop");
            }
            for (IntWritable val : values) {
                sum += val.get();
            // the following lines for test reasons only
            sumstr = Integer.toString(sum);
            try (PrintStream out = new PrintStream(new FileOutputStream("redout.txt"))) {
                out.print(key.getBytes() + " " + sumstr);
            }
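            // note: concatenating key.getBytes() prints the array reference, not the key
            // text; key.toString() would print the actual key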
            // Write out the key and the sum   --- which of the following should work?
            // context.write( new ImmutableBytesWritable(key.getBytes()), new IntWritable( sum ) );
            //context.write( key, new IntWritable( sum ) );
            // Even the simplest output does not work
            context.write (new Text("abc"), new IntWritable(1));
        }
      }
    }

    public static void main(String[] args) throws Exception {

//        HBaseConfiguration conf = new HBaseConfiguration();  // trial 1
        Configuration conf = HBaseConfiguration.create();
        Path output=new Path("Output");
//        Job job = new Job(conf, "HBase_MetaSummary");   // trial 1

        Job job = Job.getInstance(conf, "HBase_MetaSummary");
        job.setJarByClass(MetaSummary.class);

        Scan scan = new Scan();

        TableMapReduceUtil.initTableMapperJob("videodaten", scan, Mapper1.class, ImmutableBytesWritable.class,
                IntWritable.class, job);
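        // note: Mapper1 writes Text keys, but the call above declares ImmutableBytesWritable
        // as the map output key class; Hadoop then rejects every map output with
        // "Type mismatch in key from map". Passing Text.class there should fix it.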

//       job.setMapperClass(Mapper1.class);   // does not change anything
        job.setReducerClass(Reducer2.class);
        // the following commented-out lines should have caused the reduce results to be written to an HBase table
        // precondition: the table was created beforehand :: create 'meta_summary', {NAME => 'details', VERSIONS => 1}
        //TableMapReduceUtil.initTableReducerJob("meta_summary", Reducer1.class, job);
        // instead I am trying to write into a text file, which should work just as well
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass( TextOutputFormat.class );
        job.setNumReduceTasks( 1 );
        FileOutputFormat.setOutputPath(job, output);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
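
For reference, the map output key/value classes declared to the job must match the types that map() actually writes. A minimal sketch of a consistent driver, assuming the Text/IntWritable types used above and reusing Mapper1, Reducer2, the table name and the output path from the question (the two Scan caching settings are the usual recommendation for scan-based jobs, not something from the original code):

        Configuration conf = HBaseConfiguration.create();
        Job job = Job.getInstance(conf, "HBase_MetaSummary");
        job.setJarByClass(MetaSummary.class);

        Scan scan = new Scan();
        scan.setCaching(500);        // fetch more rows per RPC, usual advice for MR scans
        scan.setCacheBlocks(false);  // do not fill the block cache from a full table scan

        TableMapReduceUtil.initTableMapperJob(
                "videodaten", scan, Mapper1.class,
                Text.class,         // matches the Text key written in map()
                IntWritable.class,  // matches the IntWritable value written in map()
                job);

        job.setReducerClass(Reducer2.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(1);
        FileOutputFormat.setOutputPath(job, new Path("Output"));

        System.exit(job.waitForCompletion(true) ? 0 : 1);

With the key types aligned, and with @Override added to the reduce methods so the compiler catches signature mismatches like the one in Reducer1, the reducer at least has a chance to be invoked.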
