使用spark在hbase中存储数据

7fyelxc5  于 2021-06-10  发布在  Hbase
关注(0)|答案(1)|浏览(428)

我正在尝试使用以下代码在hbase中存储数据:

val x = Seq((1, ("a", "A")), (1, ("a", "AA")), (2, ("d", "D")), (2, ("d", "DD")))
val f = sc.parallelize(x)
val z = f.groupByKey()

z.collectAsMap().foreach(elem => {
    var rowKey = elem._1.toString()
    var p = new Put(rowKey.getBytes())

    elem._2.foreach(innerElem => {
        var col = innerElem._1
        var value = innerElem._2
        p.add("cf".getBytes(), new String(col).getBytes(), new String(value).getBytes())
        table.put(p)
    })
})
table.flushCommits()

我从hbase shell获得以下输出:

ROW        COLUMN+CELL                                                                                   
 1         column=cf:a, timestamp=1487917201238, value=AA                                                                                                 
 2         column=cf:d, timestamp=1487917201226, value=DD

但我想得到:

ROW        COLUMN+CELL 
 1         column=cf:a, timestamp=1487917201238, value=A                                                                                  
 1         column=cf:a, timestamp=148791720123X, value=AA                                                
 2         column=cf:d, timestamp=1487917201226, value=D                                                 
 2         column=cf:d, timestamp=148791720122X, value=DD

我的代码重写了特定列的第一个值,并只存储该列的最后一个值。我要存储特定列的每个值。
当我搬家的时候 var p = new Put(rowKey.getBytes()) 在scala shell中的第二个foreach循环中,我得到了以下内容:

scala>     
     |  z.collectAsMap().foreach(elem => {
     |       val rowKey = elem._1.toString()
     |        
     |       elem._2.foreach(innerElem => {
     | Display all 620 possibilities? (y or n)
     | ew Put(rowKey.getBytes())
     |         var col = innerElem._1
     |         var value = innerElem._2
     |         p.add("cf".getBytes(), new String(col).getBytes(), new String(value).getBytes())
     |         table.put(p)
     |       })
     |     })
<console>:49: error: not found: value ew
       ew Put(rowKey.getBytes())
       ^
<console>:52: error: not found: value p
               p.add("cf".getBytes(), new String(col).getBytes(), new String(value).getBytes())
               ^
<console>:53: error: not found: value p
               table.put(p)
                         ^
f8rj6qna

f8rj6qna1#

你的问题在于 Put 对象。把它看作一个哈希表。只需重写第二个 forloop 迭代。创建单独的 Put 每次迭代中的对象,它会很好。

相关问题