我在spark 2.1.1上运行一个流媒体作业,轮询kafka 0.10。我正在使用spark kafkautils类创建一个dstream,在我拥有由于保留策略而过时的数据之前,一切都正常工作。我的问题来了,当我停止我的工作,作出一些改变,如果任何数据已老化的主题,我会得到一个错误,说我的偏移量超出范围。我做了很多研究,包括查看spark源代码,我看到了很多评论,比如本期的评论:spark-19680——基本上是说数据不应该自动丢失——所以auto.offset.reset被spark忽略了。不过,我最大的问题是,我现在能做什么?我的主题不会在spark中进行投票-它在启动时会消亡,只有一个例外。我不知道如何重置补偿,这样我的工作就可以重新开始了。我没有启用检查点,因为我读到这些是不可靠的这种使用。我以前有很多代码来管理偏移量,但是如果有任何提交的偏移量,spark会忽略请求的偏移量,所以我现在管理的偏移量如下:
val stream = KafkaUtils.createDirectStream[String, T](
ssc,
PreferConsistent,
Subscribe[String, T](topics, kafkaParams))
stream.foreachRDD { (rdd, batchTime) =>
val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
Log.debug("processing new batch...")
val values = rdd.map(x => x.value())
val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist
consumer.processDataset(incomingFrame, batchTime)
stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
ssc.start()
ssc.awaitTermination()
作为一个解决办法,我一直在改变我的组ID,但这是真的跛脚。我知道这是预期的行为,不应该发生,我只需要知道如何让流再次运行。任何帮助都将不胜感激。
3条答案
按热度按时间6qfn3psc1#
尝试
或者
最早:自动将偏移量重置为最早偏移量
最新:自动将偏移重置为最新偏移
无:如果没有为使用者的组找到以前的偏移量,则向使用者抛出异常
其他:向消费者抛出异常。
影响最小和最大配置的偏移量值的另一个因素是日志保留策略。假设您有一个保留时间配置为1小时的主题。你发了10条信息,一小时后你又发了10条。最大的偏移量仍将保持不变,但最小的偏移量不能为0,因为kafka已经删除了这些消息,因此最小的可用偏移量将为10。
p8h8hvxi2#
这是我写的一段代码,在一个真正的解决方案被引入spark流Kafka之前,我一直都是这样做的。它基本上是根据您设置的offsetresetstrategy重置过期分区的偏移量。只要给它同样的Map参数,你提供给Kafka提尔。在从驱动程序调用kafkautils.create****stream()之前调用此函数。
yhqotfr83#
auto.offset.reset=latest/earliest
将仅在使用者首次启动时应用。有星星之火吉拉来解决这个问题,直到那时,我们需要生活与工作周围。https://issues.apache.org/jira/browse/spark-19680