我有个问题,我们用的是Kafka和斯帕克。
val ssc = new StreamingContext(conf, Seconds(10))
val messages = KafkaUtils.createDirectStream(ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[K, V](config.topics, scala.collection.Map[String, Object](kafkaParams.toSeq: _*), offsetRange)))
messages.foreachRDD {(rdd, time) => ...}
它运行良好,但有时新的批次在前一批次后大约10分钟后开始启动。时间由日志消息来衡量。
为什么会发生这种情况?
1条答案
按热度按时间t2a7ltrp1#
我已经找到原因了,是因为问题。apache.org/jia/BROWSE/kafka-12890