array[byte]spark rdd到string spark rdd

ezykj2lf  于 2021-06-09  发布在  Hbase
关注(0)|答案(2)|浏览(418)

我使用cloudera的sparkonhbase模块从hbase获取数据。
我得到的rdd是这样的:

var getRdd = hbaseContext.hbaseRDD("kbdp:detalle_feedback", scan)

基于此,我得到的是一个类型的对象

RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])]

它对应于行键和值列表。它们都由字节数组表示。
如果我将getrdd保存到一个文件中,我看到的是:

([B@f7e2590,[([B@22d418e2,[B@12adaf4b,[B@48cf6e81), ([B@2a5ffc7f,[B@3ba0b95,[B@2b4e651c), ([B@27d0277a,[B@52cfcf01,[B@491f7520), ([B@3042ad61,[B@6984d407,[B@f7c4db0), ([B@29d065c1,[B@30c87759,[B@39138d14), ([B@32933952,[B@5f98506e,[B@8c896ca), ([B@2923ac47,[B@65037e6a,[B@486094f5), ([B@3cd385f2,[B@62fef210,[B@4fc62b36), ([B@5b3f0f24,[B@8fb3349,[B@23e4023a), ([B@4e4e403e,[B@735bce9b,[B@10595d48), ([B@5afb2a5a,[B@1f99a960,[B@213eedd5), ([B@2a704c00,[B@328da9c4,[B@72849cc9), ([B@60518adb,[B@9736144,[B@75f6bc34)])

对于每条记录(行键和列)
但我需要的是得到所有键和值的字符串表示。或者至少是价值观。为了把它保存到一个文件中

key1,(value1,value2...)

或者类似的

key1,value1,value2...

我对spark和scala完全陌生,很难找到什么。
你能帮我一下吗?

7vhp5slm

7vhp5slm1#

首先让我们创建一些示例数据:

scala> val d = List( ("ab" -> List(("qw", "er", "ty")) ), ("cd" -> List(("ac", "bn", "afad")) ) )
d: List[(String, List[(String, String, String)])] = List((ab,List((qw,er,ty))), (cd,List((ac,bn,afad))))

数据是这样的:

scala> d foreach println
(ab,List((qw,er,ty)))
(cd,List((ac,bn,afad)))

将其转换为 Array[Byte] 格式

scala> val arrData = d.map { case (k,v) => k.getBytes() -> v.map { case (a,b,c) => (a.getBytes(), b.getBytes(), c.getBytes()) } }

arrData: List[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = List((Array(97, 98),List((Array(113, 119),Array(101, 114),Array(116, 121)))), (Array(99, 100),List((Array(97, 99),Array(98, 110),Array(97, 102, 97, 100)))))

使用此数据创建rdd

scala> val rdd1 = sc.parallelize(arrData)
rdd1: org.apache.spark.rdd.RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = ParallelCollectionRDD[0] at parallelize at <console>:25

创建转换函数 Array[Byte]String :

scala> def b2s(a: Array[Byte]): String = new String(a)
b2s: (a: Array[Byte])String

执行最终转换:

scala> val rdd2 = rdd1.map { case (k,v) => b2s(k) -> v.map{ case (a,b,c) => (b2s(a), b2s(b), b2s(c)) } }
rdd2: org.apache.spark.rdd.RDD[(String, List[(String, String, String)])] = MapPartitionsRDD[1] at map at <console>:29

scala> rdd2.collect()
res2: Array[(String, List[(String, String, String)])] = Array((ab,List((qw,er,ty))), (cd,List((ac,bn,afad))))
t8e9dugd

t8e9dugd2#

我不知道hbase,但如果 Array[Byte] 是unicode字符串,类似这样的操作应该可以:

rdd: RDD[(Array[Byte], List[(Array[Byte], Array[Byte], Array[Byte])])] = *whatever*
rdd.map(k, l => 
  (new String(k),
  l.map(a => 
    a.map(elem =>
      new String(elem)
    )
  ))
)

抱歉造型不好之类的,我甚至不确定它能不能用。

相关问题