Kafka 如何使用Spring Cloud Stream使用主题中的最新记录?

bttbmeg0  于 2023-01-01  发布在  Apache
关注(0)|答案(1)|浏览(181)

我有一个Kafka Streams处理器,它使用三个主题并尝试合并(连接操作)它们的键。连接成功后,它会进行一些聚合,然后将结果推送到目标主题。应用程序第一次运行后,它会尝试使用这些主题中的所有数据。其中两个主题使用类似查找表的数据,这意味着我需要从一开始就使用所有的数据。但是其中一个主题是我的主主题。所以我需要从最新的主题开始使用。但是我的应用程序从一开始就使用所有Kafka主题。所以我想从一开始使用两个主题,从最新的主题使用一个主题。我用的是Spring的云流,Kafka的流绑定器。这里是我的配置和一些代码片段;
应用程序名称:

spring.cloud.stream.function.definition: processName;
spring.cloud.stream.kafka.streams.binder.functions.processName.applicationId: myappId
spring.cloud.stream.bindings.processName-in-0.destination: mainTopic
spring.cloud.stream.bindings.processName-in-0.consumer.group: mainTopic-cg
spring.cloud.stream.bindings.processName-in-0.consumer.startOffset: latest
spring.cloud.stream.bindings.processName-in-1.destination: secondTopic
spring.cloud.stream.bindings.processName-in-1.consumer.group: secondTopic-cg
spring.cloud.stream.bindings.processName-in-1.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-in-2.destination: thirdTopic
spring.cloud.stream.bindings.processName-in-2.consumer.group: thirdTopic-cg
spring.cloud.stream.bindings.processName-in-2.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-out-0.destination: otputTopics

spring.cloud.stream.kafka.streams.binder.replication-factor: 1
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 10000
spring.cloud.stream.kafka.streams.binder.configuration.state.dir: state-store

流处理器:

public Function<KStream<String, MainTopic>,
        Function<KTable<String, SecondTopic>,
                Function<KTable<String, ThirdTopic>,
                        KStream<String, OutputTopic>>>> processName(){
    return mainTopicKStream -> (
            secondTopicTable -> (
                    thirdTopicKTable -> (
                            aggregateOperations.AggregateByAmount(
                                    joinOperations.JoinSecondThirdTopic(mainTopicKStream ,secondTopicTable ,thirdTopicKTable )
                                            .filter((k,v) -> v.IsOk() != 4) 
                                            .groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.OutputTopic()))
                                    , TimeWindows.of(Duration.ofMinutes(1)).advanceBy(Duration.ofMinutes(1))
                            ).toStream()
                    )
            ));
}
hi3rlvi2

hi3rlvi21#

有几点。当你有一个使用Spring Cloud Stream绑定器的Kafka Streams应用程序时,你不需要在绑定上设置group信息,只需要你的applicationId设置就足够了。因此,我建议从您的配置中删除这3个group属性。另一件事是,当使用Kafka流绑定器时,任何用户特定的绑定属性都需要在spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer...下设置。这在文档的这一节中提到。请相应地更改您的startOffset配置。另外,请查看文档的同一部分,了解在Kafka Streams绑定器中使用startOffset的语义说明。基本上,start offset属性仅在您第一次启动应用程序时有效。默认情况下,当没有提交的偏移量时,它为earliest。但是可以使用属性重写为latest。可以将传入的KTable具体化为状态存储,从而可以访问所有查找数据。

相关问题