Extending HBase Put to avoid the original-row check in the add method

wz3gfoph · posted 2021-06-07 in HBase

I need to export HBase data from one cluster and import it into another cluster, with only a small change to the row keys.
As I mentioned above, the requirement is to export a table's HBase data from one cluster and import it into another, changing the row keys according to our matching pattern.
In "org.apache.hadoop.hbase.mapreduce.Import" the parameter "HBASE_IMPORTER_RENAME_CFS" can be used to rename column families.
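For reference, the stock column-family rename would be invoked roughly like this (a sketch mirroring my import command below; "oldCf:newCf" is a placeholder pair):

hbase org.apache.hadoop.hbase.mapreduce.Import -DHBASE_IMPORTER_RENAME_CFS=oldCf:newCf import file:///home/nshsh/export/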
I modified the Import code slightly to support changing row keys. My code is available on Pastebin: https://pastebin.com/ticgebb0
The row key is changed with the code below.

private static Cell convertRowKv(Cell kv, Map<byte[], byte[]> rowkeyReplaceMap) {
    if (rowkeyReplaceMap != null) {
        byte[] oldrowkeyName = CellUtil.cloneRow(kv);
        String oldrowkey = Bytes.toString(oldrowkeyName);
        for (byte[] key : rowkeyReplaceMap.keySet()) {
            if (oldrowkey.contains(Bytes.toString(key))) {
                byte[] newrowkeyName = rowkeyReplaceMap.get(key);
                // Skip the matched prefix of the old row key and keep the remainder
                // (assumes the pattern matches at the start of the row key).
                ByteBuffer buffer = ByteBuffer.wrap(oldrowkeyName);
                buffer.position(key.length);
                ByteBuffer remainder = buffer.slice();
                // New row key = replacement prefix + remainder of the old row key.
                ByteBuffer bb = ByteBuffer.allocate(newrowkeyName.length + remainder.capacity());
                bb.put(newrowkeyName);
                bb.put(remainder);
                byte[] newrowkey = bb.array();
                // Rebuild the cell with the new row; everything else is copied as-is.
                kv = new KeyValue(newrowkey, 0, newrowkey.length,                                 // row
                        kv.getFamilyArray(), kv.getFamilyOffset(), kv.getFamilyLength(),          // CF
                        kv.getQualifierArray(), kv.getQualifierOffset(), kv.getQualifierLength(), // qualifier
                        kv.getTimestamp(),                                                        // timestamp
                        KeyValue.Type.codeToType(kv.getTypeByte()),                               // KV type
                        kv.getValueArray(), kv.getValueOffset(), kv.getValueLength());            // value
                break; // replace using the first matching pattern only
            }
        }
    }
    return kv;
}
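For illustration, here is a minimal sketch of how the replacement map might be built and applied. The class, the sample cell, and the visibility of convertRowKv are my own assumptions; the family/qualifier values are taken from the exception below:

import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.util.Bytes;

public class ConvertRowKvExample {
    public static void main(String[] args) {
        // Equivalent of -DHBASE_IMPORTER_RENAME_ROW=123:123456;
        // byte[] keys need a comparator-backed map, hence the TreeMap.
        Map<byte[], byte[]> rowkeyReplaceMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
        rowkeyReplaceMap.put(Bytes.toBytes("123"), Bytes.toBytes("123456"));

        // A cell whose row starts with the old prefix "123".
        Cell kv = new KeyValue(Bytes.toBytes("123_abcf"), Bytes.toBytes("vfrt"),
                Bytes.toBytes("con"), Bytes.toBytes("value"));

        Cell converted = convertRowKv(kv, rowkeyReplaceMap); // assumes convertRowKv is visible here
        System.out.println(Bytes.toString(CellUtil.cloneRow(converted))); // prints 123456_abcf
    }
}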

The import was run as:

hbase org.apache.hadoop.hbase.mapreduce.ImportWithRowKeyChange -DHBASE_IMPORTER_RENAME_ROW=123:123456 import file:///home/nshsh/export/

The row keys were changed successfully. However, when the cells are put into the HBase table, "org.apache.hadoop.hbase.client.Put.add(Cell)" checks that
"the row of the kv is the same as the Put's row", and since we are changing the row key,
it fails there.
I then commented out the check in the Put class and replaced hbase-client.jar with the patched build. I also tried writing an HBasePut class that extends Put:

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBasePut extends Put {

    public HBasePut(byte[] row) {
        super(row);
    }

    @Override
    public Put add(Cell kv) throws IOException {
        byte[] family = CellUtil.cloneFamily(kv);
        System.err.print(Bytes.toString(family));
        List<Cell> list = getCellList(family);
        // The stock Put.add verifies that the row of the kv matches the Put's row;
        // that check is deliberately omitted here because we are changing the row key.
        /*
        int res = Bytes.compareTo(this.row, 0, row.length,
            kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
        if (res != 0) {
          throw new WrongRowIOException("The row in " + kv.toString() +
            " doesn't match the original one " + Bytes.toStringBinary(this.row));
        }
        */
        list.add(kv);
        familyMap.put(family, list);
        return this;
    }
}
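For context, this is roughly how HBasePut was meant to be used inside the import mapper. This is only a sketch modeled on the stock Import.Importer's writeResult method; the rowkeyReplaceMap field and the surrounding wiring are assumptions:

// Sketch of the mapper-side usage (assumed wiring, based on Import.Importer.writeResult):
private void writeResult(ImmutableBytesWritable key, Result result, Context context)
        throws IOException, InterruptedException {
    Put put = null;
    for (Cell kv : result.rawCells()) {
        Cell converted = convertRowKv(kv, rowkeyReplaceMap); // rewrite the row key
        if (put == null) {
            // Construct the Put with the converted row; HBasePut additionally
            // skips the per-cell row check in add().
            put = new HBasePut(CellUtil.cloneRow(converted));
        }
        put.add(converted);
    }
    if (put != null) {
        context.write(key, put);
    }
}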

In the MapReduce job, the tasks always fail with the following exception:

2020-07-24 13:37:15,105 WARN  [htable-pool1-t1] hbase.HBaseConfiguration: Config option "hbase.regionserver.lease.period" is deprecated. Instead, use "hbase.client.scanner.timeout.period"
2020-07-24 13:37:15,122 INFO  [LocalJobRunner Map Task Executor #0] client.AsyncProcess: , tableName=import
2020-07-24 13:37:15,178 INFO  [htable-pool1-t1] client.AsyncProcess: #2, table=import, attempt=18/35 failed=7ops, last exception: org.apache.hadoop.hbase.client.WrongRowIOException: org.apache.hadoop.hbase.client.WrongRowIOException: The row in \x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00/vfrt:con/1589541180643/Put/vlen=225448/seqid=0 doesn't match the original one 123_abcf
    at org.apache.hadoop.hbase.client.Put.add(Put.java:330)
    at org.apache.hadoop.hbase.protobuf.ProtobufUtil.toPut(ProtobufUtil.java:574)
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.doBatchOp(RSRpcServices.java:744)
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.doNonAtomicRegionMutation(RSRpcServices.java:720)
    at org.apache.hadoop.hbase.regionserver.RSRpcServices.multi(RSRpcServices.java:2168)
    at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$2.callBlockingMethod(ClientProtos.java:33656)
    at org.apache.hadoop.hbase.ipc.RpcServer.call(RpcServer.java:2196)
    at org.apache.hadoop.hbase.ipc.CallRunner.run(CallRunner.java:112)
    at org.apache.hadoop.hbase.ipc.RpcExecutor.consumerLoop(RpcExecutor.java:133)
    at org.apache.hadoop.hbase.ipc.RpcExecutor$1.run(RpcExecutor.java:108)
    at java.lang.Thread.run(Thread.java:745)

I don't understand where the old Put class, with the row check, is still being picked up by the task.
Can someone help me fix this?
