HBase vs Google Bigtable: scanning a large number of rows

vcudknz3 · posted 2021-06-08 in HBase

I'm trying to scan between a start row and an end row on Bigtable. There are roughly 100k rows in between, and I'd like to receive them in batches, which I can do in HBase with setCaching(500).
On Bigtable, setCaching appears to be ignored, and the client tries to fetch the entire result set in a single RPC. How can I get behavior similar to HBase's?
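
For reference, the HBase-side pattern I mean looks roughly like this (a minimal sketch; the table name is illustrative):

Scan scan = new Scan("prefix".getBytes(), Bytes.add("prefix".getBytes(), new byte[] {(byte) 0xff}));
// In HBase, setCaching(500) makes each scanner RPC return up to 500 rows,
// so the ~100k rows arrive in batches rather than all at once.
scan.setCaching(500);
try (Table table = connection.getTable(TableName.valueOf("my-table"));
     ResultScanner scanner = table.getScanner(scan)) {
  for (Result row : scanner) {
    // process row
  }
}
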
I'm using the Java driver bigtable-hbase-1.1, version 1.0.0-pre3. Bigtable configuration:

Configuration conf = new Configuration();
// Disable client-side throttling of the buffered mutator.
conf.set("google.bigtable.buffered.mutator.throttling.enable", "false");
// Generous timeouts (1,500,000 ms = 25 minutes) for long-running scans.
conf.set("google.bigtable.rpc.timeout.ms", "1500000");
conf.set("google.bigtable.grpc.read.partial.row.timeout.ms", "1500000");
conf.set("google.bigtable.long.rpc.timeout.ms", "1500000");
// Do not retry on DEADLINE_EXCEEDED.
conf.set("google.bigtable.grpc.retry.deadlineexceeded.enable", "false");
// Bulk-write tuning.
conf.set("google.bigtable.buffered.mutator.max.inflight.rpcs", "500");
conf.set("google.bigtable.bulk.max.row.key.count", "500");

Configuration conff = BigtableConfiguration.configure(conf, projectID, instanceID);
connection = BigtableConfiguration.connect(conff);

Scan configuration:

byte[] start = "prefix".getBytes();
byte[] end = Bytes.add("prefix".getBytes(), new byte[] {(byte) 0xff});
Scan scan = new Scan(start, end);

The expected output is about 100k rows.

cyvaqqii · Answer #1

You don't need to worry about batching when reading rows. Bigtable responses are streamed and backpressure-aware, and we also rely on gRPC to buffer chunks of the stream. Here is an intro to gRPC streaming: https://grpc.io/docs/guides/concepts.html#server-streaming-rpc
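
As a rough illustration (a sketch against the generated Bigtable v2 gRPC stub; request construction is omitted and the variable names are hypothetical), a server-streaming call is consumed as an iterator, and gRPC's flow control paces the server to the client's reads:

// Hypothetical setup: 'stub' is a BigtableGrpc blocking stub, 'request' a ReadRowsRequest.
Iterator<ReadRowsResponse> responses = stub.readRows(request);
while (responses.hasNext()) {
  // Each response holds a bounded chunk of the result set; process it here.
  ReadRowsResponse chunk = responses.next();
}
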
Try the sample code below and let me know whether it works (i.e. no deadline-exceeded errors). If the sample works, modify it to scan your own data and make sure it still works. If anything goes wrong, let me know.
pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.google.cloud.example</groupId>
  <artifactId>row-write-read-example</artifactId>
  <version>1.0-SNAPSHOT</version>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.12</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.google.cloud.bigtable</groupId>
      <artifactId>bigtable-hbase-1.x</artifactId>
      <version>1.0.0-pre3</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <artifactId>maven-compiler-plugin</artifactId>
        <version>3.6.2</version>
        <configuration>
          <source>1.8</source>
          <target>1.8</target>
        </configuration>
      </plugin>
    </plugins>
  </build>
</project>

Java:

import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import java.io.IOException;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.BufferedMutator;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;

public class WriteReadTest {
  private static final String PROJECT_ID = "<YOUR_PROJECT_ID>";
  private static final String INSTANCE_ID = "<YOUR_INSTANCE_ID>";
  private static final String TABLE_ID = "<YOUR_NONEXISTENT_TABLE>";
  private static final String FAMILY = "cf";

  private static final TableName TABLE_NAME = TableName.valueOf(TABLE_ID);

  public static void main(String[] args) throws IOException {
    try(Connection connection = BigtableConfiguration.connect(PROJECT_ID, INSTANCE_ID);
        Admin admin = connection.getAdmin()) {

      // Setup
      admin.createTable(
          new HTableDescriptor(TABLE_NAME)
              .addFamily(new HColumnDescriptor(FAMILY))
      );

      try {
        // Write the rows
        populateTable(connection, 2_000_000);

        // Read the rows
        readFullTable(connection);
      } finally {
        admin.disableTable(TABLE_NAME);
        admin.deleteTable(TABLE_NAME);
      }

    }
  }

  private static void populateTable(Connection connection, int rowCount) throws IOException {
    long startTime = System.currentTimeMillis();
    int buckets = 100;
    int maxWidth = Integer.toString(buckets).length();

    try(BufferedMutator bufferedMutator = connection.getBufferedMutator(TABLE_NAME)) {
      for (int i = 0; i < rowCount; i++) {
        String prefix = String.format("%0" + maxWidth + "d", i % buckets);
        String key = prefix + "-" + String.format("%010d", i);
        String value = "value-" + key;

        Put put = new Put(key.getBytes())
            .addColumn(
                FAMILY.getBytes(),
                HConstants.EMPTY_BYTE_ARRAY,
                value.getBytes()
            );

        bufferedMutator.mutate(put);
      }
    }

    long endTime = System.currentTimeMillis();
    System.out.printf("Populated table in %d secs, writing %d rows\n", (endTime - startTime) / 1000, rowCount);
  }

  private static void readFullTable(Connection connection) throws IOException {
    long startTime = System.currentTimeMillis();

    int count = 0;
    try(Table table = connection.getTable(TABLE_NAME);
        ResultScanner scanner = table.getScanner(new Scan("0".getBytes(), "z".getBytes()))) {

      for(Result row = scanner.next(); row != null; row = scanner.next()) {
        count++;
      }
    }

    long endTime = System.currentTimeMillis();

    System.out.printf("Scanned table in %d secs, reading %d rows\n", (endTime - startTime) / 1000, count);
  }
}
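
If you still want explicit client-side batches, the HBase API's ResultScanner.next(int) returns up to that many rows per call and works on top of bigtable-hbase as well. A minimal variant of readFullTable above (a sketch under the same assumptions):

private static void readInBatches(Connection connection) throws IOException {
  try (Table table = connection.getTable(TABLE_NAME);
      ResultScanner scanner = table.getScanner(new Scan("0".getBytes(), "z".getBytes()))) {
    // next(500) returns at most 500 rows; an empty array means the scan is finished.
    for (Result[] batch = scanner.next(500); batch.length > 0; batch = scanner.next(500)) {
      // process the batch here
    }
  }
}
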
