Error reading HBase inside a Spark RDD foreach operation

eiee3dmh · asked on 2021-06-09 in HBase

I'm using Spark Streaming to consume a Kafka message queue, and inside the foreachRDD operation I try to read HBase based on the messages pulled from Kafka. But I'm getting errors that look like some kind of deadlock. Can anyone help me figure out what's wrong? The code details are below.

// Consume the Kafka topics and process each micro-batch RDD
KafkaUtils.createStream(streamingContext, kafkazk, kafkaGroup,
    kafkaTopic.split(",").map(_.trim -> 1).toMap)
  .foreachRDD(_.processRDD(new CQRequestContext(null,
    new DBRequestContext(prefix, dbName, tableName, null, null, null))))
streamingContext.start()
streamingContext.awaitTermination()

To read HBase in each RDD, I create the HBase connection inside a foreachPartition operation so as to reduce the number of connections.

def processRDD(cQRequestContext: CQRequestContext) = {
  // `rdd` is provided by the enclosing wrapper class (not shown here)
  rdd.foreachPartition(iterator => {
    // One connection and table per partition, to limit the number of HBase connections
    val connection = ConnectionFactory.createConnection(HbaseUtil.getHbaseConfiguration("FdsCQ"))
    val hTable = connection.getTable(TableName.valueOf("cqdev:sampleTestTable"))

    try {
      iterator.foreach(requestTuple => {
        //processAIPRequest(requestTuple._2,cQRequestContext,myTable)

        val p = new Put(Bytes.toBytes("dabao12345"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.RESPONSE_CONTENT), Bytes.toBytes("content"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.PACKAGE_NOS), Bytes.toBytes("response"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.ABUSE_PACKAGES_NO), Bytes.toBytes("response"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.RESPONSE_CODE), Bytes.toBytes("responseCode"))
        p.addColumn(Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.COLUMN_FAMILY), Bytes.toBytes(SellerConstants.CQ_ABUSE_RESULT.RESPONSE_STATUS), Bytes.toBytes("respone"))
        hTable.put(p)
      })
    } finally {
      //scanner.close
      hTable.close()
      connection.close()
    }
  })
}
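HbaseUtil.getHbaseConfiguration isn't shown above; as a minimal sketch, such a helper typically builds a client Configuration with the ZooKeeper quorum set explicitly (the host names and port below are placeholders, not the real values):

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration

object HbaseUtil {
  // Sketch only: builds an HBase client Configuration for the given cluster name.
  // Placeholder values; in practice these come from hbase-site.xml on the classpath
  // or from application config keyed by clusterName.
  def getHbaseConfiguration(clusterName: String): Configuration = {
    val conf = HBaseConfiguration.create() // picks up hbase-site.xml if present
    conf.set("hbase.zookeeper.quorum", "zk-host-1,zk-host-2,zk-host-3")
    conf.set("hbase.zookeeper.property.clientPort", "2181")
    conf
  }
}

Note that in yarn-cluster mode this runs on the executors, so whatever configuration the helper relies on has to be available there as well, not just on the driver.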
Finally I get the error below. After some debugging, it doesn't look like HBase connections are being exhausted: during the test I only put a single message into Kafka, and I get the following stack the very first time I try to create the HBase connection.

java.lang.Object.wait(Native Method)
java.lang.Object.wait(Object.java:503)
org.apache.zookeeper.ClientCnxn.submitRequest(ClientCnxn.java:1342)
org.apache.zookeeper.ZooKeeper.exists(ZooKeeper.java:1036)
org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.exists(RecoverableZooKeeper.java:222)
org.apache.hadoop.hbase.zookeeper.ZKUtil.checkExists(ZKUtil.java:481)
org.apache.hadoop.hbase.zookeeper.ZKClusterId.readClusterIdZNode(ZKClusterId.java:65)
org.apache.hadoop.hbase.client.ZooKeeperRegistry.getClusterId(ZooKeeperRegistry.java:86)
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.retrieveClusterId(ConnectionManager.java:849)
org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.<init>(ConnectionManager.java:670)
sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
java.lang.reflect.Constructor.newInstance(Constructor.java:526)
org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:238)
org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:218)
org.apache.hadoop.hbase.client.ConnectionFactory.createConnection(ConnectionFactory.java:119)
com.XXXXXXXX.func.CQStreamingFunction$$anonfun$processRDD$1.apply(CQStreamingFunction.scala:50)
com.XXXXXXXX.func.CQStreamingFunction$$anonfun$processRDD$1.apply(CQStreamingFunction.scala:49)
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:898)
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1850)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
org.apache.spark.scheduler.Task.run(Task.scala:88)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

Also, the error above only occurs when I run the Spark Streaming job on the YARN cluster in yarn-cluster mode; when I run in local mode on my local machine and connect to the remote HBase, everything works fine.
Environment details: CDH 5.5.2 and Spark 1.5.0.
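Since the job works in local mode but hangs in yarn-cluster mode, one way to narrow it down is to log the ZooKeeper settings the executors actually resolve, right before creating the connection. A minimal diagnostic sketch (reusing the HbaseUtil helper named above; the fallback values in the comment are the HBase client defaults):

rdd.foreachPartition(iterator => {
  val conf = HbaseUtil.getHbaseConfiguration("FdsCQ")
  // If the executor cannot see the intended hbase-site.xml, these fall back to
  // the client defaults ("localhost" / "2181"), which would explain a connect
  // that hangs on the cluster while local mode works.
  println(s"hbase.zookeeper.quorum = ${conf.get("hbase.zookeeper.quorum")}")
  println(s"hbase.zookeeper.property.clientPort = ${conf.get("hbase.zookeeper.property.clientPort")}")
  // ... then create the connection as before
})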

No answers yet.
