Spark Streaming broadcast variable - case class

Asked by bjp0bcyl on 2021-06-10 in Hbase

My requirement is to enrich the DStream data with profile information from an HBase table, and I want to use a broadcast variable for it. The full code is attached below.
The HBase read produces the following output.
On the driver node, the HBaseReaderBuilder is:

(org.apache.spark.SparkContext@3c58b102,hbase_customer_profile,Some(data),WrappedArray(gender, age),None,None,List()))

On the worker node:

HBaseReaderBuilder(null,hbase_customer_profile,Some(data),WrappedArray(gender, age),None,None,List()))

As you can see, it has lost the SparkContext. When I execute the statement

val myRdd = bcdocRdd.map(r => Profile(r._1, r._2, r._3))

I get a NullPointerException:

java.lang.NullPointerException
        at it.nerdammer.spark.hbase.HBaseReaderBuilderConversions$class.toSimpleHBaseRDD(HBaseReaderBuilder.scala:83)
        at it.nerdammer.spark.hbase.package$.toSimpleHBaseRDD(package.scala:5)
        at it.nerdammer.spark.hbase.HBaseReaderBuilderConversions$class.toHBaseRDD(HBaseReaderBuilder.scala:67)
        at it.nerdammer.spark.hbase.package$.toHBaseRDD(package.scala:5)
        at testPartition$$anonfun$main$1$$anonfun$apply$1$$anonfun$apply$2.apply(testPartition.scala:34)
        at testPartition$$anonfun$main$1$$anonfun$apply$1$$anonfun$apply$2.apply(testPartition.scala:33)
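My understanding (possibly wrong) is that HBaseReaderBuilder keeps its SparkContext in a @transient field, so the reference exists on the driver but deserializes to null on the executors. A minimal sketch of that effect, using a hypothetical stand-in class rather than the real builder:

import java.io._

// Hypothetical stand-in: like HBaseReaderBuilder, it holds the context
// in a @transient field, which Java serialization skips.
case class Demo(@transient ctx: AnyRef, table: String)

object TransientDemo {
  def main(args: Array[String]): Unit = {
    val original = Demo(new Object, "hbase_customer_profile")

    // Round-trip through Java serialization, as Spark does when it ships
    // a broadcast value or closure to an executor.
    val buf = new ByteArrayOutputStream()
    new ObjectOutputStream(buf).writeObject(original)
    val copy = new ObjectInputStream(new ByteArrayInputStream(buf.toByteArray))
      .readObject().asInstanceOf[Demo]

    println(original) // Demo(java.lang.Object@...,hbase_customer_profile)
    println(copy)     // Demo(null,hbase_customer_profile) -- context is gone
  }
}

The full program is below.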

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils
import it.nerdammer.spark.hbase._

object testPartition {

  def main(args: Array[String]): Unit = {

    val sparkMaster = "spark://x.x.x.x:7077"
    val ipaddress   = "x.x.x.x:2181" // Zookeeper quorum
    val hadoopHome  = "/home/hadoop/software/hadoop-2.6.0"
    val topicname   = "new_events_test_topic"

    val mainConf         = new SparkConf().setMaster(sparkMaster).setAppName("testingPartition")
    val mainSparkContext = new SparkContext(mainConf)

    val ssc          = new StreamingContext(mainSparkContext, Seconds(30))
    val eventsStream = KafkaUtils.createStream(ssc, ipaddress, "receive_rest_events", Map(topicname -> 2))

    // Reading the profile table yields an HBaseReaderBuilder tied to the SparkContext.
    val docRdd = mainSparkContext
      .hbaseTable[(String, Option[String], Option[String])]("hbase_customer_profile")
      .select("gender", "age")
      .inColumnFamily("data")
    println("docRDD from Driver ", docRdd)

    // Broadcast the builder itself -- this is the part I suspect is wrong.
    val broadcastedprof = mainSparkContext.broadcast(docRdd)

    eventsStream.foreachRDD(dstream => {
      dstream.foreachPartition(records => {
        println("Broadcasted docRDD - in Worker ", broadcastedprof.value)
        val bcdocRdd = broadcastedprof.value
        records.foreach(record => {
          // Uncommenting the next two lines produces the NullPointerException above.
          //val myRdd = bcdocRdd.map(r => Profile(r._1, r._2, r._3))
          //myRdd.foreach(println)
          val rows = record._2.split("\r\n")
        })
      })
    })

    ssc.start()
    ssc.awaitTermination()
  }
}
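For reference, this is the alternative I am considering (an untested sketch, assuming the profile table is small enough to collect on the driver): materialize the HBase rows into a plain Map and broadcast that instead of the RDD builder, since only plain data survives the trip to the executors. Profile is my case class.

case class Profile(id: String, gender: Option[String], age: Option[String])

// Materialize the profile table on the driver, then broadcast plain data.
val profiles: Map[String, Profile] = mainSparkContext
  .hbaseTable[(String, Option[String], Option[String])]("hbase_customer_profile")
  .select("gender", "age")
  .inColumnFamily("data")
  .map(r => r._1 -> Profile(r._1, r._2, r._3))
  .collect()
  .toMap

val broadcastedProfiles = mainSparkContext.broadcast(profiles)

eventsStream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    val lookup = broadcastedProfiles.value // a plain Map, safe on executors
    records.foreach { record =>
      record._2.split("\r\n").foreach { row =>
        // enrich each row via lookup.get(customerId) once the id is parsed
      }
    }
  }
}

Would this be the right pattern, or is there a way to keep the lookup distributed (for example, joining against the HBase RDD on the driver) when the table does not fit in memory?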
