在kafka streams应用程序中,对于不同的输入主题,是否仍然可以使用不同的auto.offset.reset策略?

wyyhbhjk  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(483)

用例是:我有一个kafka streams应用程序,它从一个输入主题消费,并输出到一个中间主题,然后在同一个流中另一个拓扑从这个中间主题消费。
每当更新应用程序id时,这两个主题都会从最早的时间开始。我想将中间主题的auto.offset.reset更改为latest,而将输入主题的auto.offset.reset更改为earlime。

jtjikinw

jtjikinw1#

对。您可以通过以下方式为每个主题设置重置策略:

// Processor API
topology.addSource(AutoOffsetReset offsetReset, String name, String... topics); 

// DSL
builder.stream(String topic, Consumed.with(AutoOffsetReset offsetReset));
builder.table(String topic, Consumed.with(AutoOffsetReset offsetReset));

所有这些方法都有一些允许设置它的重载。

相关问题