我必须消费来自Kafka主题的tweet,并将其输入hbase。下面是我写的代码,但它不能正常工作。
主代码没有调用“convert”方法,因此没有记录被摄取到hbase表中。有人能帮帮我吗。
tweetskafkaStream.foreachRDD(rdd => {
println("Inside For Each RDD" )
rdd.foreachPartition( record => {
println("Inside For Each Partition" )
val data = record.map(r => (r._1, r._2)).map(convert)
})
})
def convert(t: (String, String)) = {
println("in convert")
//println("first param value ", t._1)
//println("second param value ", t._2)
val hConf = HBaseConfiguration.create()
hConf.set(TableOutputFormat.OUTPUT_TABLE,hbaseTableName)
hConf.set("hbase.zookeeper.quorum", "192.168.XXX.XXX:2181")
hConf.set("hbase.master", "192.168.XXX.XXX:16000")
hConf.set("hbase.rootdir","hdfs://192.168.XXX.XXX:9000/hbase")
val today = Calendar.getInstance.getTime
val printformat = new SimpleDateFormat("yyyyMMddHHmmss")
val id = printformat.format(today)
val p = new Put(Bytes.toBytes(id))
p.add(Bytes.toBytes("data"), Bytes.toBytes("tweet_text"),(t._2).getBytes())
(id, p)
val mytable = new HTable(hConf,hbaseTableName)
mytable.put(p)
}
我不想使用当前日期时间作为键(t.),因此在convert方法中构造它。
谢谢
巴拉
1条答案
按热度按时间fjaof16o1#
我把它改成了foreach,而不是foreachpartition。这很有效。