spark流将偏移量抛出超出范围,没有重新启动流的选项

lb3vh1jj  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(334)

我在spark 2.1.1上运行一个流媒体作业,轮询kafka 0.10。我正在使用spark kafkautils类创建一个dstream,在我拥有由于保留策略而过时的数据之前,一切都正常工作。我的问题来了,当我停止我的工作,作出一些改变,如果任何数据已老化的主题,我会得到一个错误,说我的偏移量超出范围。我做了很多研究,包括查看spark源代码,我看到了很多评论,比如本期的评论:spark-19680——基本上是说数据不应该自动丢失——所以auto.offset.reset被spark忽略了。不过,我最大的问题是,我现在能做什么?我的主题不会在spark中进行投票-它在启动时会消亡,只有一个例外。我不知道如何重置补偿,这样我的工作就可以重新开始了。我没有启用检查点,因为我读到这些是不可靠的这种使用。我以前有很多代码来管理偏移量,但是如果有任何提交的偏移量,spark会忽略请求的偏移量,所以我现在管理的偏移量如下:

val stream = KafkaUtils.createDirectStream[String, T](
    ssc,
    PreferConsistent,
    Subscribe[String, T](topics, kafkaParams))

stream.foreachRDD { (rdd, batchTime) =>
    val offsets = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    Log.debug("processing new batch...")

    val values = rdd.map(x => x.value())
    val incomingFrame: Dataset[T] = SparkUtils.sparkSession.createDataset(values)(consumer.encoder()).persist

    consumer.processDataset(incomingFrame, batchTime)
    stream.asInstanceOf[CanCommitOffsets].commitAsync(offsets)
}
ssc.start()
ssc.awaitTermination()

作为一个解决办法,我一直在改变我的组ID,但这是真的跛脚。我知道这是预期的行为,不应该发生,我只需要知道如何让流再次运行。任何帮助都将不胜感激。

6qfn3psc

6qfn3psc1#

尝试

auto.offset.reset=latest

或者

auto.offset.reset=earliest

最早:自动将偏移量重置为最早偏移量
最新:自动将偏移重置为最新偏移
无:如果没有为使用者的组找到以前的偏移量,则向使用者抛出异常
其他:向消费者抛出异常。
影响最小和最大配置的偏移量值的另一个因素是日志保留策略。假设您有一个保留时间配置为1小时的主题。你发了10条信息,一小时后你又发了10条。最大的偏移量仍将保持不变,但最小的偏移量不能为0,因为kafka已经删除了这些消息,因此最小的可用偏移量将为10。

p8h8hvxi

p8h8hvxi2#

这是我写的一段代码,在一个真正的解决方案被引入spark流Kafka之前,我一直都是这样做的。它基本上是根据您设置的offsetresetstrategy重置过期分区的偏移量。只要给它同样的Map参数,你提供给Kafka提尔。在从驱动程序调用kafkautils.create****stream()之前调用此函数。

final OffsetResetStrategy offsetResetStrategy = OffsetResetStrategy.valueOf(_params.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG).toString().toUpperCase(Locale.ROOT));
if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy) || OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
    LOG.info("Going to reset consumer offsets");
    final KafkaConsumer<K,V> consumer = new KafkaConsumer<>(_params);

    LOG.debug("Fetching current state");
    final List<TopicPartition> parts = new LinkedList<>();
    final Map<TopicPartition, OffsetAndMetadata> currentCommited = new HashMap<>();
    for(String topic: this.topics()) {
        List<PartitionInfo> info = consumer.partitionsFor(topic);
        for(PartitionInfo i: info) {
            final TopicPartition p = new TopicPartition(topic, i.partition());
            final OffsetAndMetadata m = consumer.committed(p);
            parts.add(p);
            currentCommited.put(p, m);
        }
    }
    final Map<TopicPartition, Long> begining = consumer.beginningOffsets(parts);
    final Map<TopicPartition, Long> ending = consumer.endOffsets(parts);

    LOG.debug("Finding what offsets need to be adjusted");
    final Map<TopicPartition, OffsetAndMetadata> newCommit = new HashMap<>();
    for(TopicPartition part: parts) {
        final OffsetAndMetadata m = currentCommited.get(part);
        final Long begin = begining.get(part);
        final Long end = ending.get(part);

        if(m == null || m.offset() < begin) {
            LOG.info("Adjusting partition {}-{}; OffsetAndMeta={} Begining={} End={}", part.topic(), part.partition(), m, begin, end);

            final OffsetAndMetadata newMeta;
            if(OffsetResetStrategy.EARLIEST.equals(offsetResetStrategy)) {
                newMeta = new OffsetAndMetadata(begin);
            } else if(OffsetResetStrategy.LATEST.equals(offsetResetStrategy)) {
                newMeta = new OffsetAndMetadata(end);
            } else {
                newMeta = null;
            }

            LOG.info("New offset to be {}", newMeta);
            if(newMeta != null) {
                newCommit.put(part, newMeta);
            }
        }

    }
    consumer.commitSync(newCommit);
    consumer.close();
}
yhqotfr8

yhqotfr83#

auto.offset.reset=latest/earliest 将仅在使用者首次启动时应用。
有星星之火吉拉来解决这个问题,直到那时,我们需要生活与工作周围。https://issues.apache.org/jira/browse/spark-19680

相关问题