spark流媒体存储方法只能在持续时间窗口中工作,而不能在定制接收器的foreachrdd工作流中工作

qacovj5a  于 2021-06-09  发布在  Redis
关注(0)|答案(1)|浏览(276)

我定义了一个从redis读取数据的接收器。
部分接收器简化代码:

class MyReceiver extends Receiver (StorageLevel.MEMORY_ONLY){
  override def onStart() = {
    while(!isStopped) {
      val res = readMethod()
      if (res != null) store(res.toIterator) 
      // using res.foreach(r => store(r)) the performance is almost the same
    }
  }
}

我的流式处理工作流:

val ssc = new StreamingContext(spark.sparkContext, new Duration(50))
val myReceiver = new MyReceiver()
val s = ssc.receiverStream(myReceiver)
s.foreachRDD{ r => 
  r.persist()
  if (!r.isEmpty) {
    some short operations about 1s in total
    // note this line ######1
  }
}

我有一个生产商,它的生产速度比消费者快得多,所以现在redis中有很多记录,我测试了数字10000。我进行了调试,所有记录在redis中之后都可以很快被读取 readMethod() 上面。然而,在每个微博客里我只能得到30张记录(如果商店足够快的话,它应该能拿到全部10000英镑)
对于这个嫌疑犯,我加了一个睡眠10秒的密码 Thread.sleep(10000)######1 上面。每一个微博客仍然会获得30条记录,每个微博客的处理时间增加10秒。如果我把持续时间增加到200毫秒, val ssc = new StreamingContext(spark.sparkContext, new Duration(200)) ,它可以得到大约120张唱片。
所有这些都表明spark流只能在 Duration ? 获取rdd后,在主工作流中, store 方法暂时停止?但如果这是真的,那就太浪费了。我希望它也生成rdd(商店)的同时,主工作流正在运行。
有什么想法吗?

zhte4eai

zhte4eai1#

我不能仅仅因为没有足够的声誉就发表评论。有没有可能 spark.streaming.receiver.maxRate 是在你的代码里设置的吗?

相关问题