我定义了一个从redis读取数据的接收器。
部分接收器简化代码:
class MyReceiver extends Receiver (StorageLevel.MEMORY_ONLY){
override def onStart() = {
while(!isStopped) {
val res = readMethod()
if (res != null) store(res.toIterator)
// using res.foreach(r => store(r)) the performance is almost the same
}
}
}
我的流式处理工作流:
val ssc = new StreamingContext(spark.sparkContext, new Duration(50))
val myReceiver = new MyReceiver()
val s = ssc.receiverStream(myReceiver)
s.foreachRDD{ r =>
r.persist()
if (!r.isEmpty) {
some short operations about 1s in total
// note this line ######1
}
}
我有一个生产商,它的生产速度比消费者快得多,所以现在redis中有很多记录,我测试了数字10000。我进行了调试,所有记录在redis中之后都可以很快被读取 readMethod()
上面。然而,在每个微博客里我只能得到30张记录(如果商店足够快的话,它应该能拿到全部10000英镑)
对于这个嫌疑犯,我加了一个睡眠10秒的密码 Thread.sleep(10000)
至 ######1
上面。每一个微博客仍然会获得30条记录,每个微博客的处理时间增加10秒。如果我把持续时间增加到200毫秒, val ssc = new StreamingContext(spark.sparkContext, new Duration(200))
,它可以得到大约120张唱片。
所有这些都表明spark流只能在 Duration
? 获取rdd后,在主工作流中, store
方法暂时停止?但如果这是真的,那就太浪费了。我希望它也生成rdd(商店)的同时,主工作流正在运行。
有什么想法吗?
1条答案
按热度按时间zhte4eai1#
我不能仅仅因为没有足够的声誉就发表评论。有没有可能
spark.streaming.receiver.maxRate
是在你的代码里设置的吗?