我将hbase 0.94.8与hdfs一起使用。我实现了协处理器来求和值。该表只有两行
hbase(main):043:0> scan 'demo'
ROW        COLUMN+CELL
 row1      column=info:category, timestamp=1375438808010, value=web
 row1      column=info:hits, timestamp=1375438797824, value=123
 row2      column=info:category, timestamp=1375438834518, value=mail
 row2      column=info:hits, timestamp=1375438822093, value=1321
hbase(main):043:0> describe 'demo'
'demo', {METHOD => 'table_att', coprocessor$1 => '|org.apache.hadoop.hbase.coprocessor.AggregateImplementation||'}, {NAME => 'info', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', VERSIONS => '3', COMPRESSION => 'NONE',
MIN_VERSIONS => '0', TTL => '2147483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK => 'true', BLOCKCACHE => 'true'}
1 row(s) in 0.0670 seconds
我的代码如下:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.coprocessor.AggregationClient;
import org.apache.hadoop.hbase.client.coprocessor.LongColumnInterpreter;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.coprocessor.ColumnInterpreter;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
public class webAggregator {
    // private static final byte[] EDRP_FAMILY = Bytes.toBytes("EDRP");
    // private static final byte[] EDRP_QUALIFIER = Bytes.toBytes("advanceKWh");

    /**
     * Sums one column over the whole table via the AggregateImplementation
     * coprocessor, after dumping the table contents for debugging.
     *
     * <p>NOTE(review): {@link LongColumnInterpreter} only understands cells whose
     * value is an 8-byte big-endian long (as written by
     * {@code Bytes.toBytes(long)}). Cells stored as strings such as "123" make
     * the interpreter return null and the sum call fail — which is the error the
     * original question hit.
     *
     * @param conf      HBase configuration; quorum/client-port/timeouts are set here.
     * @param otherArgs {tableName, columnFamily, qualifier}.
     * @throws Throwable when the coprocessor RPC fails (e.g. non-long cell values).
     */
    public static void testSumWithValidRange(Configuration conf,
            String[] otherArgs) throws Throwable {
        byte[] tableName = Bytes.toBytes(otherArgs[0]);
        byte[] family = Bytes.toBytes(otherArgs[1]);
        byte[] qualifier = Bytes.toBytes(otherArgs[2]);

        conf.set("hbase.zookeeper.quorum", "master");
        conf.set("hbase.zookeeper.property.clientPort", "2222");
        conf.setLong("hbase.rpc.timeout", 600000);
        conf.setLong("hbase.client.scanner.caching", 1000);
        // Register the aggregation endpoint on every region (0.94-style config key).
        conf.set(CoprocessorHost.REGION_COPROCESSOR_CONF_KEY,
                "org.apache.hadoop.hbase.coprocessor.AggregateImplementation");

        // Debug dump of the table. Use otherArgs[0] instead of the previously
        // hard-coded "demo" so the method works for any table, and close both
        // the scanner and the table to avoid leaking connections.
        HTable table = new HTable(conf, otherArgs[0]);
        try {
            ResultScanner scanner = table.getScanner(new Scan());
            try {
                for (Result r : scanner) {
                    for (KeyValue kv : r.raw()) {
                        // Bytes.toString decodes as UTF-8, matching how HBase
                        // encodes strings; new String(byte[]) would use the
                        // platform default charset.
                        System.out.print(Bytes.toString(kv.getRow()) + " ");
                        System.out.print(Bytes.toString(kv.getFamily()) + ":");
                        System.out.print(Bytes.toString(kv.getQualifier()) + " ");
                        System.out.print(kv.getTimestamp() + " ");
                        System.out.println(Bytes.toString(kv.getValue()));
                    }
                }
            } finally {
                scanner.close();
            }
        } finally {
            table.close();
        }

        // Server-side sum of family:qualifier across all rows.
        AggregationClient aClient = new AggregationClient(conf);
        Scan scan = new Scan();
        scan.addColumn(family, qualifier);
        final ColumnInterpreter<Long, Long> ci = new LongColumnInterpreter();
        long sum = aClient.sum(tableName, ci, scan);
        System.out.println(sum);
    }

    /**
     * Main entry point.
     *
     * @param args The command line parameters (unused; table/family/qualifier
     *             are currently fixed to demo/info/hits).
     * @throws Exception When running the job fails.
     */
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] otherArgs = { "demo", "info", "hits" };
        try {
            testSumWithValidRange(conf, otherArgs);
        } catch (Throwable e) {
            e.printStackTrace();
        }
    }
}
我的堆栈跟踪如下:
webAggregator.testSumWithValidRange(webAggregator.java:62) 在 webAggregator.main(webAggregator.java:79)
请帮忙。
2条答案
按热度按时间fiei3ece1#
我和你遇到了同样的错误。经过调查，我发现问题是我的列类型是 integer，所以 LongColumnInterpreter.getValue 方法返回 null。
从你的代码和结果来看，我确信你的 info:hits 列是字符串列，而不是 long 列。
只需考虑将hits改为真正的long列,从hbase shell来看,它的值应该是这样的
或者您可以自己编写一个列解释器（ColumnInterpreter）来解析字符串值并求和。
zysjyyx42#
问题是关于数据类型。
B. put.addColumn(Bytes.toBytes("objects"), Bytes.toBytes("info"), Bytes.toBytes("1.0"));
A. column=objects:info, timestamp=1525942759312, value=?\xF0\x00\x00\x00\x00\x00\x00
B. column=objects:info, timestamp=1525941054901, value=1.0