更新hbase中整列的值

mv1qrgav  于 2021-05-29  发布在  Hadoop
关注(0)|答案(2)|浏览(328)

我有一个hbase表,所有行都有一个特定的列值

901877853087813636      column=metadata:collection-id, timestamp=1514594631532, value=1007

现在,如何将表中所有行的这个值从1007更改为1008?
我找到的所有资料都只说明了如何修改某一特定行。
请帮帮我

xzv2uavs

xzv2uavs1#

使用 SingleColumnValueFilter 扫描表,获取该列值为 1007 的所有行,然后对这些行执行批量 Put,写入新值(1008)。例如,扫描时可以使用如下过滤器:

SingleColumnValueFilter singleColumnValueFilter = new SingleColumnValueFilter("metadata".getBytes(),
               "collection-id".getBytes(),CompareOp.EQUAL,
               new BinaryComparator(Bytes.toBytes(1007)));
qq24tv8q

qq24tv8q2#

HBase 只支持基于 rowkey 更新记录。因此,我们必须先扫描出所有需要更新的记录,再自行构造批量 Put 来更新它们,如下所示。
用法:UpdateAllRows [tablename] [columnfamily] [columnqualifier] [oldvalue] [newvalue]

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Command-line utility that replaces one value of a single column in every row of an
 * HBase table.
 *
 * <p>Usage: {@code UpdateAllRows <tableName> <columnFamily> <columnQualifier> <oldValue> <newValue>}
 *
 * <p>The table is scanned for the given column; every row whose cell equals
 * {@code oldValue} (compared as the bytes produced by {@code Bytes.toBytes(String)})
 * receives a {@link Put} with {@code newValue}.
 */
public class UpdateAllRows {

    /** Maximum number of pending Puts kept in memory before they are written to the table. */
    private static final int PUT_BATCH_SIZE = 1000;

    /** Lazily created connection shared by all static methods of this class. */
    private static Connection conn;

    /**
     * Returns the shared connection, creating it on first use (or again after it was closed).
     */
    private static Connection connection() throws IOException {
        if (conn == null || conn.isClosed()) {
            conn = ConnectionFactory.createConnection(HBaseConfiguration.create());
        }
        return conn;
    }

    /** Closes the shared connection, if one was opened. */
    private static void closeConnection() throws IOException {
        if (conn != null) {
            try {
                conn.close();
            } finally {
                conn = null;
            }
        }
    }

    /**
     * Ensures the shared connection exists and returns a new {@link Admin} obtained from it.
     *
     * <p>This method never closes the returned {@code Admin}; the caller is responsible
     * for closing it.
     *
     * @return an {@code Admin} created from the shared connection
     * @throws IOException if the connection or the {@code Admin} cannot be created
     */
    public static Admin getConnection() throws IOException {
        return connection().getAdmin();
    }

    /**
     * Entry point. Expects exactly five arguments:
     * table name, column family, column qualifier, old value, new value.
     */
    public static void main(String[] args) throws Exception {
        if (args.length != 5) {
            System.err.println(
                    "Usage: UpdateAllRows <tableName> <columnFamily> <columnQualifier> <oldValue> <newValue>");
            System.exit(1);
        }
        try {
            updateTable(args[0], args[1], args[2], args[3], args[4]);
        } finally {
            // Release the connection even when the update fails.
            closeConnection();
        }
    }

    /**
     * Rewrites {@code columnFamily:columnQualifier} to {@code newValue} in every row where
     * the cell currently equals {@code oldValue}.
     *
     * <p>Puts are sent in batches of at most {@link #PUT_BATCH_SIZE} so that memory use
     * does not grow with the number of matching rows. The shared connection is opened
     * on demand and is left open when this method returns.
     *
     * @param tableName       name of the table to update
     * @param columnFamily    column family of the cell
     * @param columnQualifier column qualifier of the cell
     * @param oldValue        value to be replaced
     * @param newValue        replacement value
     * @throws Exception if scanning or writing fails
     */
    public static void updateTable(String tableName, String columnFamily, String columnQualifier, String oldValue, String newValue) throws Exception {
        byte[] cfBytes = Bytes.toBytes(columnFamily);
        byte[] cqBytes = Bytes.toBytes(columnQualifier);
        byte[] oldvalBytes = Bytes.toBytes(oldValue);
        byte[] newvalBytes = Bytes.toBytes(newValue);

        // One Table serves both the scan and the writes; try-with-resources closes the
        // scanner first and then the table, also when an exception is thrown.
        try (Table table = connection().getTable(TableName.valueOf(tableName));
             ResultScanner rs = openScanner(table, cfBytes, cqBytes)) {
            List<Put> pending = new ArrayList<Put>(PUT_BATCH_SIZE);
            Result res;
            while ((res = rs.next()) != null) {
                if (!Arrays.equals(res.getValue(cfBytes, cqBytes), oldvalBytes)) {
                    continue;
                }
                Put p = new Put(res.getRow());
                p.addColumn(cfBytes, cqBytes, newvalBytes);
                pending.add(p);
                if (pending.size() >= PUT_BATCH_SIZE) {
                    table.put(pending);
                    // Start a fresh list instead of clearing the one just handed to the client.
                    pending = new ArrayList<Put>(PUT_BATCH_SIZE);
                }
            }
            if (!pending.isEmpty()) {
                table.put(pending);
            }
        }
    }

    /**
     * Opens a scanner over the given table restricted to a single column.
     */
    private static ResultScanner openScanner(Table table, byte[] cfBytes, byte[] cqBytes) throws IOException {
        return table.getScanner(new Scan().addColumn(cfBytes, cqBytes));
    }

    /**
     * Opens a scanner over {@code tableName} restricted to {@code columnFamily:columnQualifier}.
     *
     * <p>NOTE: the {@link Table} opened here is not closed by this method when the scanner
     * is returned, and the caller must close the returned scanner.
     * {@link #updateTable} does not use this method, so it does not leave a table open.
     *
     * @return a scanner the caller must close
     * @throws IOException if the table or the scanner cannot be opened
     */
    public static ResultScanner scan(String tableName, String columnFamily, String columnQualifier) throws IOException {
        Table table = connection().getTable(TableName.valueOf(tableName));
        try {
            return openScanner(table, Bytes.toBytes(columnFamily), Bytes.toBytes(columnQualifier));
        } catch (IOException | RuntimeException e) {
            // No scanner was handed out, so nothing else could ever close this table.
            table.close();
            throw e;
        }
    }
}

相关问题