我是hadoop和hbase的新手。让我举例说明我的问题。为了简洁起见,数据被缩小了。
假设我们有一个名为item.log的文件,它包含以下信息。
ITEM-1,PRODUCT-1
ITEM-2,PRODUCT-1
ITEM-3,PRODUCT-2
ITEM-4,PRODUCT-2
ITEM-5,PRODUCT-3
ITEM-6,PRODUCT-1
ITEM-7,PRODUCT-1
ITEM-8,PRODUCT-2
ITEM-9,PRODUCT-1
我有一个Map代码如下,
package org.sanjus.hadoop;
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
public class ProductMapReduce {
public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, LongWritable> {
public void map(LongWritable key, Text value, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
String[] columns = value.toString().split(",");
if (columns.length != 2) {
System.out.println("Bad line/value " + value);
return;
}
Text word = new Text(columns[1]);
LongWritable counter = new LongWritable(1L);
output.collect(word, counter);
}
}
public static class Reduce extends MapReduceBase implements Reducer<Text, LongWritable, Text, LongWritable> {
public void reduce(Text key, Iterator<LongWritable> iterator, OutputCollector<Text, LongWritable> output, Reporter reporter) throws IOException {
long sum = 0L;
while (iterator.hasNext()) {
sum += iterator.next().get();
}
output.collect(key, new LongWritable(sum));
}
}
public static void main(String[] args) throws IOException {
JobConf conf = new JobConf(ProductMapReduce.class);
conf.setJobName("Product Analyzer");
conf.setOutputKeyClass(Text.class);
conf.setOutputValueClass(LongWritable.class);
conf.setMapperClass(Map.class);
conf.setCombinerClass(Reduce.class);
conf.setReducerClass(Reduce.class);
conf.setInputFormat(TextInputFormat.class);
conf.setOutputFormat(TextOutputFormat.class);
FileInputFormat.setInputPaths(conf, new Path(args[0]));
FileOutputFormat.setOutputPath(conf, new Path(args[1]));
JobClient.runJob(conf);
}
}
标签1:map reduce后的输出如下:
PRODUCT-1 5
PRODUCT-2 3
PRODUCT-3 1
这里有一个问题:
我在hbase中有一个表,其中包含以下信息。
PRODUCT-1 10$
PRODUCT-2 20$
PRODUCT-3 30$
问题/要求:我希望reduce阶段的输出是“label 1:”和上述hbase表中reduce输出的合并
PRODUCT-1 10$ * 5 = 50$
PRODUCT-2 20$ * 3 = 60$
PRODUCT-3 30$ * 1 = 30$
基本上,key是product-1,hbase表中这个key的值是10$,reducer中同一个key的值是5,两个值相乘。($符号是用来理解的)
注意:我在中找到的示例基于hbase的输入或输出。我的场景是,输入和输出将是hdfs中的一个文件,而我需要用hbase表中的信息处理reducer输出。
2条答案
按热度按时间bmp9r5qi1#
我就是这么做的,
在我的reducer类中,我添加了重载方法'setup'
使用htable.get api,我得到了结果对象。
ulmd4ohb2#
由于hbase支持高读吞吐量,并且您只希望读取reducer中的数据(将使用受控数量的数据):因此可以使用hbase api根据reducer的键从表中读取数据。由于hbase中的读取速度很快(取决于提取的数据大小,大约10毫秒),因此我认为您的性能不会受到影响。只要确保在reducer的configure()方法中初始化configuration&htable。