我在mesos上有个spark流媒体工作。所有批次的时间都完全相同,而且这个时间比预期的要长得多。这些作业从Kafka中提取数据,处理数据并将其插入Cassandra,然后再将其返回到Kafka的另一个主题中。
每批(如下)有3个作业,其中2个从Kafka拉出,加工并插入Cassandra,另一个从Kafka拉出,加工并推回Kafka。
我检查了spark ui中的批处理,发现它们都需要相同的时间(4s),但向下钻取的时间更多,它们实际上每个处理不到1秒,但它们都有相同时间的间隔(大约4秒)。增加更多的执行者或者更多的处理能力看起来并没有什么不同。 Details of batch: Processing time = 12s & total delay = 1.2 s
??
因此,我深入研究了批处理的每个作业(它们都采用完全相同的时间=4s,即使它们正在进行不同的处理):
他们都用4秒钟的时间来运行他们的一个舞台(读Kafka的那个)。现在我深入到其中一个阶段(它们都非常相似):
为什么要等?整个过程实际上只需要0.5秒,它只是在等待。它在等Kafka吗?
有没有人经历过类似的事情?我可能编码错误或配置不正确?
编辑:
下面是触发此行为的最小代码。这让我觉得这一定是某种设置。
object Test {
def main(args: Array[String]) {
val sparkConf = new SparkConf(true)
val streamingContext = new StreamingContext(sparkConf, Seconds(5))
val kafkaParams = Map[String, String](
"bootstrap.servers" -> "####,####,####",
"group.id" -> "test"
)
val stream = KafkaUtils.createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](
streamingContext, kafkaParams, Set("test_topic")
)
stream.map(t => "LEN=" + t._2.length).print()
streamingContext.start()
streamingContext.awaitTermination()
}
}
即使所有的执行者都在同一个节点上( spark.executor.cores=2 spark.cores.max=2
),问题仍然存在,而且它与以前一样正好是4秒:一个mesos执行器
即使主题没有消息(一批0条记录),spark streaming每批也需要4秒。
我唯一能解决这个问题的方法就是设置 cores=1
以及 cores.max=1
所以它只创建一个要执行的任务。
此任务具有局部性 NODE_LOCAL
. 看来什么时候 NODE_LOCAL
执行是瞬时的,但当局部性 ANY
连接Kafka需要4秒钟。所有机器都在同一个10gb网络中。你知道为什么会这样吗?
1条答案
按热度按时间mi7gmzs61#
问题是spark.locality.wait,这个链接给了我这个想法
它的默认值是3秒,对于spark streaming中处理的每个批处理,它占用了整个时间。
我已设置为0秒时,提交与mesos的工作(
--conf spark.locality.wait=0
)现在一切都按预期运行。