db写入是通过spark流延迟执行的

osh3o9ms  于 2021-06-09  发布在  Hbase
关注(0)|答案(1)|浏览(375)

当spark流正在运行时,hbase PUT不会执行,只有在我关闭spark时才会执行—它会尝试同时执行所有PUT

val inputRdd = FlumeUtils.createStream(ssc, "server", 44444)
  inputRdd.foreachRDD({ rdd =>
    rdd.foreachPartition(partitionOfRecords => {
      val hbaseClient = new HBaseClient(zookeeper)
      partitionOfRecords.foreach({ event =>
         hbaseClient.put(parse(event))
         hbaseClient.flush()
643ylb08

643ylb081#

好的-我找到了答案-显然我的代码是正确的,问题是我没有留下足够的线程来处理数据
从http://spark.apache.org/docs/latest/streaming-programming-guide.html “”“如果您使用的是基于接收器的输入数据流(例如套接字、Kafka、flume等),那么将使用单个线程来运行接收器,而不留下任何线程来处理接收到的数据。因此,在本地运行时,始终使用“local[n]”作为主url,其中n>要运行的接收器数(有关如何设置主url的信息,请参阅spark属性)
使用local[*]修复了这个问题

相关问题