基于hbase行的一部分创建rdd

zazmityj  于 2021-06-02  发布在  Hadoop
关注(0)|答案(1)|浏览(307)

我在努力创造 RDD 基于 HBase 表格:

val targetRDD = sparkContext.newAPIHadoopRDD(hBaseConfig,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result])
  .map {
    case (key, row) => parse(key, row)
  }
``` `parse` 为每个表行调用,而不考虑对数据的进一步操作。
是否可以仅检索具有与某些条件匹配的特定键(即,键在某个特定范围内)的行,以便仅对其进行操作?
yftpprvb

yftpprvb1#

hbase是一个键/值存储,其中的行按键排序,这意味着:
它可以有效地按键检索单行或按键范围检索行序列
在某些条件下检索随机行是无效的
所有检索操作归结为两个类:get和scan。这不难猜测它们是做什么的,除非指定stoprow/startrow,否则scan将遍历所有行。您也可以在扫描时设置过滤器,但它仍然需要迭代所有行,过滤器只是可以降低网络压力,因为hbase可能需要返回更少的行。
示例中的tableinputformat使用其内部的扫描来访问hbase:

public void setConf(Configuration configuration) {
    this.conf = configuration;

    Scan scan = null;

    if (conf.get(SCAN) != null) {
      try {
        scan = TableMapReduceUtil.convertStringToScan(conf.get(SCAN));
      } catch (IOException e) {
        LOG.error("An error occurred.", e);
      }
    } else {
      try {
        scan = createScanFromConfiguration(conf);
      } catch (Exception e) {
          LOG.error(StringUtils.stringifyException(e));
      }
    }

    setScan(scan);
  }

此外,tableinputformat内部的createscanfromconfiguration方法可以提示您如何设置筛选器和键范围:

public static Scan createScanFromConfiguration(Configuration conf) throws IOException {
    Scan scan = new Scan();

    if (conf.get(SCAN_ROW_START) != null) {
      scan.setStartRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_START)));
    }

    if (conf.get(SCAN_ROW_STOP) != null) {
      scan.setStopRow(Bytes.toBytesBinary(conf.get(SCAN_ROW_STOP)));
    }

    if (conf.get(SCAN_COLUMNS) != null) {
      addColumns(scan, conf.get(SCAN_COLUMNS));
    }

    if (conf.get(SCAN_COLUMN_FAMILY) != null) {
      scan.addFamily(Bytes.toBytes(conf.get(SCAN_COLUMN_FAMILY)));
    }

    if (conf.get(SCAN_TIMESTAMP) != null) {
      scan.setTimeStamp(Long.parseLong(conf.get(SCAN_TIMESTAMP)));
    }

    if (conf.get(SCAN_TIMERANGE_START) != null && conf.get(SCAN_TIMERANGE_END) != null) {
      scan.setTimeRange(
          Long.parseLong(conf.get(SCAN_TIMERANGE_START)),
          Long.parseLong(conf.get(SCAN_TIMERANGE_END)));
    }

    if (conf.get(SCAN_MAXVERSIONS) != null) {
      scan.setMaxVersions(Integer.parseInt(conf.get(SCAN_MAXVERSIONS)));
    }

    if (conf.get(SCAN_CACHEDROWS) != null) {
      scan.setCaching(Integer.parseInt(conf.get(SCAN_CACHEDROWS)));
    }

    if (conf.get(SCAN_BATCHSIZE) != null) {
      scan.setBatch(Integer.parseInt(conf.get(SCAN_BATCHSIZE)));
    }

    // false by default, full table scans generate too much BC churn
    scan.setCacheBlocks((conf.getBoolean(SCAN_CACHEBLOCKS, false)));

    return scan;
  }

这个stackoverflow答案提供了一个如何设置扫描的例子 hbaseConfig 注意,尽管您不必设置scan,但您可以只设置特定的属性,如scan\u row\u start和other from createScanFromConfiguration 我在上面提到过。

相关问题