我是一个新的spark用户,我想将流数据保存到多个hbase表中。当我想把我的数据保存在一个文件中时,我没有遇到任何问题,但是我无法处理多个文件。
我曾尝试创建多个htable,但后来发现这个类只用于与单个hbase表通信。
有什么办法吗?
这就是我尝试创建多个htable的地方(当然不起作用,但这是我的想法)
//HBASE Tables
val tableFull = "table1"
val tableCategoricalFiltered = "table2"
// Add local HBase conf
val conf1 = HBaseConfiguration.create()
val conf2 = HBaseConfiguration.create()
conf1.set(TableInputFormat.INPUT_TABLE, tableFull)
conf2.set(TableInputFormat.INPUT_TABLE, tableCategoricalFiltered)
//Opening Tables
val tableInputFeatures = new HTable(conf1, tableFull)
val tableCategoricalFilteredFeatures = new HTable(conf2, tableCategoricalFiltered)
这里是我尝试使用它们的地方(尽管有一个htable工作)
events.foreachRDD { event =>
var j = 0
event.foreach { feature =>
if ( j <= 49 ) {
println("Feature " + j + " : " + featuresDic(j))
println(feature)
val p_full = new Put(new String("stream " + row_full).getBytes())
p_full.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
tableInputFeatures.put(p_full)
if ( j != 26 || j != 27 || j != 28 || j != 29 ) {
val p_cat = new Put(new String("stream " + row_categorical).getBytes())
p_cat.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
tableCategoricalFilteredFeatures.put(p_cat)
}else{
j = 0
row_full = row_full + 1
println("Feature " + j + " : " + featuresDic(j))
println(feature)
val p_full = new Put(new String("stream " + row_full).getBytes())
p_full.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
tableInputFeatures.put(p_full)
val p_cat = new Put(new String("stream " + row_categorical).getBytes())
p_cat.add(featuresDic(j).getBytes(), "1".getBytes(), new String(feature).getBytes())
tableCategoricalFilteredFeatures.put(p_cat)
}
j = j + 1
}
}
1条答案
按热度按时间pw136qt21#
有一种方法我确认效果很好,使用hbase rdd库。https://github.com/unicredit/hbase-rdd
它很容易使用。请参考https://github.com/unicredit/hbase-rdd#writing-到hbase查看用法。
您可以尝试使用multitableoutputformat,因为我已经确认它可以很好地与传统的mapreduce配合使用。我还没用spark的。