如何将hbase单元展平,以便使用spark rdd或scala中的Dataframe处理生成的json?

fjaof16o  于 2021-06-09  发布在  Hbase
关注(0)|答案(0)|浏览(242)

是spark、hbase和scala的新手。我在同一列族的hbase单元中有json(存储为字节数组),但它跨越了几千个列限定符。示例(简化):
表名:'事件'
行键:rk1列族:cf1
列限定符:cq1,单元数据(字节):{“id”:1,“event”:“standing”}
列限定符:cq2,单元数据(字节):{“id”:2,“event”:“siting”}
等。
使用scala,我可以通过指定时间范围来读取行

val scan = new Scan()
  val start = 1460542400
  val end = 1462801600
  val hbaseContext = new HBaseContext(sc, conf)
  val getRdd = hbaseContext.hbaseRDD(TableName.valueOf("Events"), scan)

如果我尝试将hbase rdd(getrdd)加载到Dataframe中(将字节数组转换为字符串等之后),它只读取每行的第一个单元格(在上面的示例中,我只会得到“standing”。
此代码只为返回的每一行加载一个单元格

val resultsString = getRdd.map(s=>Bytes.toString(s._2.value()))
  val resultsDf = sqlContext.read.json(resultsString)

为了得到每个单元格,我必须如下迭代。

val jsonRDD = getRdd.map(
    row => {
      val str = new StringBuilder
      str.append("[")
      val it = row._2.listCells().iterator()
      while (it.hasNext) {
        val cell = it.next()
        val cellstring = Bytes.toString(CellUtil.cloneValue(cell))
        str.append(cellstring)
        if (it.hasNext()) {
          str.append(",")
        }
      }
      str.append("]")
      str.toString()
    }
  )
  val hbaseDataSet = sqlContext.read.json(jsonRDD)

我需要添加方括号和逗号,以便正确格式化json,以便Dataframe读取它。
问题:
有没有一种更优雅的方法来构造json,也就是说,某个解析器接收各个json字符串并将它们连接在一起,这样它的json格式就正确了?
有没有更好的能力扁平化hbase细胞,所以我不需要迭代?
对于jsonrdd,计算的闭包应该包含str局部变量,因此在节点上执行此代码的任务不应该缺少“[”、“]”或“,”。i、 一旦我在集群上而不是在本地[*]上运行它,我就不会得到解析器错误
最后,仅仅从json创建一对rdd,还是使用Dataframe来执行计数之类的简单操作更好?有没有什么方法来衡量一个和另一个的效率和表现?
谢谢您

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题