为什么我的kafka streams应用程序的使用者组(app id)的偏移量在应用程序重新启动后被重置?

tp5buhyn  于 2021-06-07  发布在  Kafka
关注(0)|答案(3)|浏览(498)

我有一个kafka streams应用程序,每当我重新启动它时,它所使用的主题的偏移量就会被重置。因此,对于所有分区,延迟都会增加,应用程序需要重新处理所有数据。
更新:输出主题正在接收应用程序重新启动后已处理的突发事件,而不是输入主题偏移量正在重置,正如我在上一段中所说的那样。但是,内部主题(ktable suppress state store)偏移正在重置,请参见下面的注解。
我已经确保在重启之前每个分区的延迟都是1(这是针对输出主题的)。属于该用户组id(应用程序id)的所有用户都处于活动状态。重启是立即的,大约需要30秒。
应用程序只使用一次作为处理保证。
我已经读过这个答案,ApacheKafka消费群体的补偿是如何过期的。
我尝试过auto.offset.reset=latest和auto.offset.reset=earlish。
似乎这些主题的补偿没有得到有效的落实(但我对此不确定)。
我假设在重启之后,应用程序应该从该消费群体的最新提交的偏移量中提取。
更新:我假设这是内部主题(ktable suppress state store)
kafka流api是否确保在关闭之前提交所有消耗的偏移量(调用streams.close()后)
我真的很想知道这方面的任何线索。
更新:
这是应用程序执行的代码:

final StreamsBuilder builder = new StreamsBuilder();
final KStream<..., ...> events = builder
        .stream(inputTopicNames, Consumed.with(..., ...)
        .withTimestampExtractor(...);

events
    .filter((k, v) -> ...)
    .flatMapValues(v -> ...)
    .flatMapValues(v -> ...)
    .selectKey((k, v) -> v)
    .groupByKey(Grouped.with(..., ...))
    .windowedBy(
        TimeWindows.of(Duration.ofSeconds(windowSizeInSecs))              
            .advanceBy(Duration.ofSeconds(windowSizeInSecs))
            .grace(Duration.ofSeconds(windowSizeGraceInSecs)))
    .reduce((agg, new) -> {
        ...
        return agg;
    })
    .suppress(Suppressed.untilWindowCloses(
                  Suppressed.BufferConfig.unbounded()))
    .toStream()
    .to(outPutTopicNameOfGroupedData, Produced.with(..., ...));

通过kafka流api创建的ktable suppress state store内部主题,偏移量重置总是发生(在重新启动之后)。
我已经试过一次,而且至少试过一次。
再说一次,我真的很感激你给我的任何线索。
更新:2.2.1版本已经解决了这个问题(https://issues.apache.org/jira/browse/kafka-7895)

epfja78i

epfja78i1#

通过kafka流api创建的ktable suppress state store内部主题,偏移量重置总是发生(在重新启动之后)。
这是当前(版本2.1)的预期行为,因为 suppress() 操作员仅在内存中工作。因此,在重新启动时,必须从changelog主题重新创建抑制缓冲区,然后才能开始处理。
注意,计划让 suppress() 在将来的版本中写入磁盘(参见。https://issues.apache.org/jira/browse/kafka-7224). 这将避免从changelog主题重新创建缓冲区的开销。

ni65a41a

ni65a41a2#

提交频率由参数控制 commit.interval.ms . 检查您的偏移量是否确实已提交。默认情况下,偏移量每100毫秒或30秒提交一次,具体取决于您的处理保证配置。看看这个

4nkexdtk

4nkexdtk3#

我认为@matthias j。sax的回答涵盖了suppress的大部分内部。但有一件事我需要澄清:当你说“重启应用程序”时,你到底做了什么?您是否优雅地关闭了整个应用程序,然后重新启动它?

相关问题