使用多线程来提高扫描性能似乎不起作用

hm2xizp9  于 2021-07-13  发布在  Hbase
关注(0)|答案(0)|浏览(277)

我有一个服务需要扫描许多前缀在hbase中,我使用 MultiRowRangeFilter 要做到这一点,第一个单线程版本似乎真的很慢(我相信大部分时间都花在扫描操作上)。所以我写了一个多线程版本。但它似乎没有提高任何性能。响应时间几乎与单线程版本相同。我想知道为什么多线程没有效果,还有没有其他方法可以减少响应时间。
单线程版本:

public <T> Stream<Pair<String,T>> batchScanWithSingleThread(String tableName, List<String> prefixes, RowMapper<T> rowMapper){
        Scan scan=new Scan();
        List<MultiRowRangeFilter.RowRange> rowRanges=new ArrayList<>();
        for (String prefix : prefixes) {
            rowRanges.add(new MultiRowRangeFilter.RowRange(prefix,true,Bytes.toString(HBaseUtils.calculateTheClosestNextRowKeyForPrefix(prefix.getBytes())),false));
        }
        scan.setFilter(new MultiRowRangeFilter(rowRanges));
        return hbaseTemplate.getStream(tableName,scan,  
                result -> Pair.with(Bytes.toString(result.getRow()),rowMapper.mapRow(result)));
        //this method do the actual scan operation and convert it to a stream.
    }

多线程版本:
基本上每10个前缀,它就会启动一个新线程来扫描它。等待所有结果返回。

public <T> Stream<Pair<String,T>> batchScanWithMultiThread(String tableName, List<String> prefixes, RowMapper<T> rowMapper){
        List<Future<Stream<Pair<String,T>>>> futures=new ArrayList<>();
        List<MultiRowRangeFilter.RowRange> rowRanges=new ArrayList<>(maxPrefixNumber);
        for (String prefix:prefixes) {
            rowRanges.add(new MultiRowRangeFilter.RowRange(prefix,true,
                    Bytes.toString(HBaseUtils.calculateTheClosestNextRowKeyForPrefix(prefix.getBytes())),false));
            if (rowRanges.size()>=10){
                Scan scan=new Scan();
                scan.setFilter(new MultiRowRangeFilter(rowRanges));
                scan.setCaching(caching);
                futures.add(executorService.submit(
                        ()-> hbaseTemplate.getStream(tableName,scan, result -> Pair.with(Bytes.toString(result.getRow()),rowMapper.mapRow(result)))
                        ));
                rowRanges=new ArrayList<>(10);
            }
        }
        if (!rowRanges.isEmpty()){
            Scan scan=new Scan();
            scan.setCaching(caching);
            scan.setFilter(new MultiRowRangeFilter(rowRanges));
            futures.add(executorService.submit(
                    ()-> hbaseTemplate.getStream(tableName,scan,result -> Pair.with(Bytes.toString(result.getRow()),rowMapper.mapRow(result)))
            ));
        }
        List<Stream<Pair<String, T>>> streams=new ArrayList<>();
        for (Future<Stream<Pair<String, T>>> future : futures) {
            try {
                streams.add(future.get());
            } catch (InterruptedException | ExecutionException e) {
                log.error("concurrent execution exception",e);
            }
        }
        return streams.stream().flatMap(Function.identity());
    }

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题