推特订阅-Kafka主题并吸收到hbase中

sdnqo3pr  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(261)

我必须消费来自Kafka主题的tweet,并将其输入hbase。下面是我写的代码,但它不能正常工作。
主代码没有调用“convert”方法,因此没有记录被摄取到hbase表中。有人能帮帮我吗。

tweetskafkaStream.foreachRDD(rdd => {
  println("Inside For Each RDD" )
  rdd.foreachPartition( record => {
    println("Inside For Each Partition" )
    val data = record.map(r => (r._1, r._2)).map(convert)
    })
  })

def convert(t: (String, String)) = {
    println("in convert")
    //println("first param value ", t._1)
    //println("second param value ", t._2)

  val hConf = HBaseConfiguration.create()
  hConf.set(TableOutputFormat.OUTPUT_TABLE,hbaseTableName)
  hConf.set("hbase.zookeeper.quorum", "192.168.XXX.XXX:2181")
  hConf.set("hbase.master", "192.168.XXX.XXX:16000")
  hConf.set("hbase.rootdir","hdfs://192.168.XXX.XXX:9000/hbase")
  val today = Calendar.getInstance.getTime
  val printformat =  new SimpleDateFormat("yyyyMMddHHmmss")

  val id =  printformat.format(today)
  val p = new Put(Bytes.toBytes(id))

  p.add(Bytes.toBytes("data"), Bytes.toBytes("tweet_text"),(t._2).getBytes())
  (id, p)

  val mytable = new HTable(hConf,hbaseTableName)
  mytable.put(p)
}

我不想使用当前日期时间作为键(t.),因此在convert方法中构造它。
谢谢
巴拉

fjaof16o

fjaof16o1#

我把它改成了foreach,而不是foreachpartition。这很有效。

相关问题