在hbase中,resultscanner和inittablemapperjob之间的扫描有什么区别

xvw2m8pv  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(299)

我希望有人能告诉我这两个api调用之间的区别。我在他们俩之间得到了奇怪的结果。对于hbase客户端/hbase服务器版本1.0.1和1.2.0-cdh5.7.2,会出现这种情况。
首先,我的行键的格式是hash\u name\u timestamp,例如100\u servername\u 1234567890。hbase表的ttl为30天,因此压缩后超过30天的内容应该消失。
下面是使用resultscanner的代码。它不使用mapreduce,所以需要很长时间才能完成。我不能这样做,因为这太费时了。但是,出于调试目的,我对这个方法没有任何问题。它列出了指定时间范围内的所有密钥,我认为这些密钥有效,因为返回的密钥的所有时间戳都在过去30天内并且在指定的时间范围内:

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("raw_data"), Bytes.toBytes(fileType));
scan.setCaching(500);
scan.setCacheBlocks(false);
scan.setTimeRange(start, end);

Connection fConnection = ConnectionFactory.createConnection(conf);
Table table = fConnection.getTable(TableName.valueOf(tableName));
ResultScanner scanner = table.getScanner(scan);
for (Result result = scanner.next(); result != null; result = scanner.next()) {
   System.out.println("Found row: " + Bytes.toString(result.getRow()));
}

下面的代码不起作用,但它使用了mapreduce,它比使用resultscanner方式运行得快,因为它将事物划分为1200个Map。问题是,由于ttl过期,我得到了应该消失的行键:

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("raw_data"), Bytes.toBytes(fileType));
scan.setCaching(500);
scan.setCacheBlocks(false);
scan.setTimeRange(start, end);
TableMapReduceUtil.initTableMapperJob(tableName, scan, MTTRMapper.class, Text.class, IntWritable.class, job);

这是我得到的一个错误,它最终杀死了整个mr作业,因为超过25%的Map器失败了。
错误:org.apache.hadoop.hbase.client.retriesHaustedException:尝试后失败=36,异常:wed jun 28 13:46:57 pdt 2017,null,java.net.sockettimeoutexception:calltimeout=120000,callduration=120301:行'65\u app129041.iad1.mydomain.com\u 1476641940'位于region=server\u-based\u data的表'server\u-based\u data'
我将尝试研究hbase客户机和hbase服务器jar的代码,但希望有人能马上知道方法之间的区别以及是什么导致inittablemapperjob调用失败。
编辑:以下是我正在使用的表的说明:

describe 'server_based_data'
Table server_based_data is ENABLED                                              
server_based_data                                                               
COLUMN FAMILIES DESCRIPTION                                                     
{NAME => 'raw_data', DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'ROW', REPLIC
ATION_SCOPE => '0', VERSIONS => '1', COMPRESSION => 'SNAPPY', MIN_VERSIONS => '0
', TTL => '2592000 SECONDS (30 DAYS)', KEEP_DELETED_CELLS => 'FALSE', BLOCKSIZE 
=> '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}                         
1 row(s) in 0.5180 seconds

这是我的Map程序代码:

public void map(ImmutableBytesWritable rowkey, Result columns, Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    startMS = conf.getLong("startTime", 0);
    endMS = conf.getLong("endTime", 1);
    System.out.println(startMS);
    System.out.println(endMS);

    // extract the ci as the key from the rowkey
    String pattern = "\\d*_(\\S*)_(\\d{10})";
    String ciname = null;
    Pattern r = Pattern.compile(pattern);
    String strRowKey = Bytes.toString(rowkey.get());
    // check the time here to see if we count it or not in the counts

    Matcher m = r.matcher(strRowKey);
    long ts = 0;

    if (m.find()) {
        ts = Long.valueOf(m.group(2)).longValue();
        ciname = m.group(1);
        if ((ts >= startMS) && (ts <= endMS)) {
            context.write(new Text(ciname), ONE);           
        }           
    }       
}

我仍然认为inittablemapperjob方法有问题,因为我上面发布的错误显示了一行的时间戳,该行应该已经从表的ttl过期,但是出于某种原因,inittablemapperjob仍然找到它并尝试查找它,但是超时了,而resultscanner由于某种原因没有看到它。

pxy2qtax

pxy2qtax1#

我想提几点建议。
看一下spark on hbase示例,特别是如何使用spark执行扫描。它是用scala编写的,但您可以用java实现。即使在这种情况下,它也比mapreduce代码简洁得多。
如果只需要行键,请添加相应的筛选器,如firstkeyonlyfilter。这将减少从hbase检索到的不必要数据量。
我不知道是什么原因导致了这个奇怪的行为 initTableMapperJob . 但我希望以上建议会有用。

相关问题