使用spark访问hbase时,需要在hbaseconfiguration中指定正确的扫描范围,然后使用hbaseconfiguration创建rdd。好像是这样的:
val conf = HBaseConfiguration.create()
conf.set(TableInputFormat.INPUT_TABLE, inputTable)
conf.set(TableInputFormat.SCAN_ROW_START, start_row_string)
conf.set(TableInputFormat.SCAN_ROW_STOP, end_row_string)
val hBaseRDD = sc.newAPIHadoopRDD(
conf,
classOf[TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result]
)
在那之后我可以用任何方式操纵hbaserdd。但代码中的起始行字符串必须是字符串,这会导致问题。在hbase中,我的行键是由以int开头的字节数组创建的。也就是说:
val row_key = byte array of Int ++ arbitrary byte array
在我创建了row_key byte数组并将row key转换成一个字符串以传递到上面提到的hbaseconfiguration之后,我发现我错了:
val row_key_string = new String(row_key)
因为row\u key\u string.getbytes与row\u key.getbytes不相等,所以hbase没有获得正确的起始行密钥,并且hbase返回了错误的数据。例子:
val arr = Array(0,0,15,-77) //which is the array byte of 4018
val str = new String(arr)
arr.getBytes //return Array(0, 0, 15, -17, -65, -67)
arr.getBytes("UTF-16BE") //return Array(0,0,15,-77)
arr.getbytes(“utf-16be”)返回正确答案。由于方法getbytes由spark调用,因此我无法指定getbytes的字符集。
如果我不能解决这个问题。我必须放弃新的hadooprd。我可以在每个执行器中建立一个连接,并使用scan,它采用字节数组来指定hbase客户端提供的起始行键。但是它很难看。
1条答案
按热度按时间6mzjoqzu1#
我已经用tableinputformat.scan解决了我的问题。它是base64字符串。将任意字节数组转换为字符串是错误的,因为它的行为不受控制。