我有一个服务需要扫描许多前缀在hbase中,我使用 MultiRowRangeFilter
要做到这一点,第一个单线程版本似乎真的很慢(我相信大部分时间都花在扫描操作上)。所以我写了一个多线程版本。但它似乎没有提高任何性能。响应时间几乎与单线程版本相同。我想知道为什么多线程没有效果,还有没有其他方法可以减少响应时间。
单线程版本:
public <T> Stream<Pair<String,T>> batchScanWithSingleThread(String tableName, List<String> prefixes, RowMapper<T> rowMapper){
Scan scan=new Scan();
List<MultiRowRangeFilter.RowRange> rowRanges=new ArrayList<>();
for (String prefix : prefixes) {
rowRanges.add(new MultiRowRangeFilter.RowRange(prefix,true,Bytes.toString(HBaseUtils.calculateTheClosestNextRowKeyForPrefix(prefix.getBytes())),false));
}
scan.setFilter(new MultiRowRangeFilter(rowRanges));
return hbaseTemplate.getStream(tableName,scan,
result -> Pair.with(Bytes.toString(result.getRow()),rowMapper.mapRow(result)));
//this method do the actual scan operation and convert it to a stream.
}
多线程版本:
基本上每10个前缀,它就会启动一个新线程来扫描它。等待所有结果返回。
public <T> Stream<Pair<String,T>> batchScanWithMultiThread(String tableName, List<String> prefixes, RowMapper<T> rowMapper){
List<Future<Stream<Pair<String,T>>>> futures=new ArrayList<>();
List<MultiRowRangeFilter.RowRange> rowRanges=new ArrayList<>(maxPrefixNumber);
for (String prefix:prefixes) {
rowRanges.add(new MultiRowRangeFilter.RowRange(prefix,true,
Bytes.toString(HBaseUtils.calculateTheClosestNextRowKeyForPrefix(prefix.getBytes())),false));
if (rowRanges.size()>=10){
Scan scan=new Scan();
scan.setFilter(new MultiRowRangeFilter(rowRanges));
scan.setCaching(caching);
futures.add(executorService.submit(
()-> hbaseTemplate.getStream(tableName,scan, result -> Pair.with(Bytes.toString(result.getRow()),rowMapper.mapRow(result)))
));
rowRanges=new ArrayList<>(10);
}
}
if (!rowRanges.isEmpty()){
Scan scan=new Scan();
scan.setCaching(caching);
scan.setFilter(new MultiRowRangeFilter(rowRanges));
futures.add(executorService.submit(
()-> hbaseTemplate.getStream(tableName,scan,result -> Pair.with(Bytes.toString(result.getRow()),rowMapper.mapRow(result)))
));
}
List<Stream<Pair<String, T>>> streams=new ArrayList<>();
for (Future<Stream<Pair<String, T>>> future : futures) {
try {
streams.add(future.get());
} catch (InterruptedException | ExecutionException e) {
log.error("concurrent execution exception",e);
}
}
return streams.stream().flatMap(Function.identity());
}
暂无答案!
目前还没有任何答案,快来回答吧!