spark流作业中任务之间的长时间一致等待

mklgxw1f  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(485)

我在mesos上有个spark流媒体工作。所有批次的时间都完全相同,而且这个时间比预期的要长得多。这些作业从Kafka中提取数据,处理数据并将其插入Cassandra,然后再将其返回到Kafka的另一个主题中。
每批(如下)有3个作业,其中2个从Kafka拉出,加工并插入Cassandra,另一个从Kafka拉出,加工并推回Kafka。
我检查了spark ui中的批处理,发现它们都需要相同的时间(4s),但向下钻取的时间更多,它们实际上每个处理不到1秒,但它们都有相同时间的间隔(大约4秒)。增加更多的执行者或者更多的处理能力看起来并没有什么不同。 Details of batch: Processing time = 12s & total delay = 1.2 s ??
因此,我深入研究了批处理的每个作业(它们都采用完全相同的时间=4s,即使它们正在进行不同的处理):



他们都用4秒钟的时间来运行他们的一个舞台(读Kafka的那个)。现在我深入到其中一个阶段(它们都非常相似):

为什么要等?整个过程实际上只需要0.5秒,它只是在等待。它在等Kafka吗?
有没有人经历过类似的事情?我可能编码错误或配置不正确?
编辑:
下面是触发此行为的最小代码。这让我觉得这一定是某种设置。

object Test {

  def main(args: Array[String]) {

    val sparkConf = new SparkConf(true)
    val streamingContext = new StreamingContext(sparkConf, Seconds(5))

    val kafkaParams = Map[String, String](
      "bootstrap.servers" -> "####,####,####",
      "group.id" -> "test"
    )

    val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
      streamingContext, kafkaParams, Set("test_topic")
    )

    stream.map(t => "LEN=" + t._2.length).print()

    streamingContext.start()
    streamingContext.awaitTermination()
  }
}

即使所有的执行者都在同一个节点上( spark.executor.cores=2 spark.cores.max=2 ),问题仍然存在,而且它与以前一样正好是4秒:一个mesos执行器
即使主题没有消息(一批0条记录),spark streaming每批也需要4秒。
我唯一能解决这个问题的方法就是设置 cores=1 以及 cores.max=1 所以它只创建一个要执行的任务。
此任务具有局部性 NODE_LOCAL . 看来什么时候 NODE_LOCAL 执行是瞬时的,但当局部性 ANY 连接Kafka需要4秒钟。所有机器都在同一个10gb网络中。你知道为什么会这样吗?

mi7gmzs6

mi7gmzs61#

问题是spark.locality.wait,这个链接给了我这个想法
它的默认值是3秒,对于spark streaming中处理的每个批处理,它占用了整个时间。
我已设置为0秒时,提交与mesos的工作( --conf spark.locality.wait=0 )现在一切都按预期运行。

相关问题