I am trying to read and filter data from HBase through PySpark. So far I can do full scans and range scans (using start and stop rows), but I have no idea how to use filters, e.g. ValueFilter, ColumnPrefixFilter, etc.

I assumed something like the hbase.filter.FilterBase.filterRowKey class should be used.

While googling I found a similar question asked before, but it was never answered: "Spark: How to use HBase filter e.g QualifierFilter by python-api".

Note: I am on a Cloudera distribution, and this needs to be done through PySpark (I have heard it is easy to do in Scala/Java).
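For context, these are the only scan-related conf keys I could find for TableInputFormat (a list from memory of the TableInputFormat constants, so please verify the exact names against your HBase version). None of them names a filter, which may be why the start/stop keys work below while the filter key does not:

SCAN_CONF_KEYS = [
    "hbase.mapreduce.scan",                  # a whole serialized Scan (Base64)
    "hbase.mapreduce.scan.row.start",        # start row of the range scan
    "hbase.mapreduce.scan.row.stop",         # stop row of the range scan
    "hbase.mapreduce.scan.column.family",    # restrict to one column family
    "hbase.mapreduce.scan.columns",          # space-separated "family:qualifier" list
    "hbase.mapreduce.scan.timestamp",        # single timestamp to read
    "hbase.mapreduce.scan.timerange.start",  # time range lower bound
    "hbase.mapreduce.scan.timerange.end",    # time range upper bound
    "hbase.mapreduce.scan.maxversions",      # max versions per cell
    "hbase.mapreduce.scan.cacheblocks",      # whether to cache blocks
    "hbase.mapreduce.scan.cachedrows",       # scanner caching (rows per RPC)
    "hbase.mapreduce.scan.batchsize",        # max cells per call
]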
Here is the code:
host = "host@xyz.com"
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

conf_read = {"hbase.master.kerberos.principal": "hbase/_HOST@xyz.com",
             "hbase.regionserver.kerberos.principal": "hbase/_HOST@xyz.com",
             "hbase.rpc.protection": "privacy",
             "hadoop.rpc.protection": "privacy",
             "hadoop.security.authentication": "kerberos",
             "hbase.security.authentication": "kerberos",
             "hbase.zookeeper.property.clientPort": "2181",
             "zookeeper.znode.parent": "/hbase",
             "hbase.zookeeper.quorum": host,
             "hbase.mapreduce.inputtable": "namespace:tablename",
             # "hbase.mapreduce.scan.row.start": "row2",  # this works
             # "hbase.mapreduce.scan.row.stop": "row4",   # this works
             "hbase.filter.FilterBase.filterRowKey": "row3"}  # this does not

testdata_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=conf_read)

output = testdata_rdd.collect()
print(output)
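The closest workaround I have seen suggested (an untested sketch; it assumes the HBase client and mapreduce jars are on the driver classpath, which they should be since TableInputFormat already loads) is to build the Scan with its filter on the JVM side through py4j, serialize it with TableMapReduceUtil.convertScanToString, and pass the result under the single hbase.mapreduce.scan key instead of the per-field keys:

# Untested sketch: build a Scan with a RowFilter on the JVM via py4j,
# serialize it, and hand it to TableInputFormat as "hbase.mapreduce.scan".
jvm = spark.sparkContext._jvm  # py4j gateway to the driver JVM

scan = jvm.org.apache.hadoop.hbase.client.Scan()
row_filter = jvm.org.apache.hadoop.hbase.filter.RowFilter(
    jvm.org.apache.hadoop.hbase.filter.CompareFilter.CompareOp.EQUAL,
    jvm.org.apache.hadoop.hbase.filter.BinaryComparator(bytearray("row3", "utf-8")))
scan.setFilter(row_filter)

# convertScanToString Base64-encodes the serialized Scan, which is the
# format TableInputFormat expects under the "hbase.mapreduce.scan" key
scan_str = jvm.org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil.convertScanToString(scan)

conf_read.pop("hbase.filter.FilterBase.filterRowKey", None)
conf_read["hbase.mapreduce.scan"] = scan_str

testdata_rdd = spark.sparkContext.newAPIHadoopRDD(
    "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
    "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
    "org.apache.hadoop.hbase.client.Result",
    keyConverter=keyConv,
    valueConverter=valueConv,
    conf=conf_read)

Is this the supported way to attach a filter from PySpark, or is there a conf key I am missing?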