当spark流正在运行时,hbase PUT不会执行,只有在我关闭spark时才会执行—它会尝试同时执行所有PUT
val inputRdd = FlumeUtils.createStream(ssc, "server", 44444)
inputRdd.foreachRDD({ rdd =>
rdd.foreachPartition(partitionOfRecords => {
val hbaseClient = new HBaseClient(zookeeper)
partitionOfRecords.foreach({ event =>
hbaseClient.put(parse(event))
hbaseClient.flush()
1条答案
按热度按时间643ylb081#
好的-我找到了答案-显然我的代码是正确的,问题是我没有留下足够的线程来处理数据
从http://spark.apache.org/docs/latest/streaming-programming-guide.html “”“如果您使用的是基于接收器的输入数据流(例如套接字、Kafka、flume等),那么将使用单个线程来运行接收器,而不留下任何线程来处理接收到的数据。因此,在本地运行时,始终使用“local[n]”作为主url,其中n>要运行的接收器数(有关如何设置主url的信息,请参阅spark属性)
使用local[*]修复了这个问题