我有一个Kafka Streams处理器,它使用三个主题并尝试合并(连接操作)它们的键。连接成功后,它会进行一些聚合,然后将结果推送到目标主题。应用程序第一次运行后,它会尝试使用这些主题中的所有数据。其中两个主题使用类似查找表的数据,这意味着我需要从一开始就使用所有的数据。但是其中一个主题是我的主主题。所以我需要从最新的主题开始使用。但是我的应用程序从一开始就使用所有Kafka主题。所以我想从一开始使用两个主题,从最新的主题使用一个主题。我用的是Spring的云流,Kafka的流绑定器。这里是我的配置和一些代码片段;
应用程序名称:
spring.cloud.stream.function.definition: processName;
spring.cloud.stream.kafka.streams.binder.functions.processName.applicationId: myappId
spring.cloud.stream.bindings.processName-in-0.destination: mainTopic
spring.cloud.stream.bindings.processName-in-0.consumer.group: mainTopic-cg
spring.cloud.stream.bindings.processName-in-0.consumer.startOffset: latest
spring.cloud.stream.bindings.processName-in-1.destination: secondTopic
spring.cloud.stream.bindings.processName-in-1.consumer.group: secondTopic-cg
spring.cloud.stream.bindings.processName-in-1.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-in-2.destination: thirdTopic
spring.cloud.stream.bindings.processName-in-2.consumer.group: thirdTopic-cg
spring.cloud.stream.bindings.processName-in-2.consumer.startOffset: earliest
spring.cloud.stream.bindings.processName-out-0.destination: otputTopics
spring.cloud.stream.kafka.streams.binder.replication-factor: 1
spring.cloud.stream.kafka.streams.binder.configuration.commit.interval.ms: 10000
spring.cloud.stream.kafka.streams.binder.configuration.state.dir: state-store
流处理器:
public Function<KStream<String, MainTopic>,
Function<KTable<String, SecondTopic>,
Function<KTable<String, ThirdTopic>,
KStream<String, OutputTopic>>>> processName(){
return mainTopicKStream -> (
secondTopicTable -> (
thirdTopicKTable -> (
aggregateOperations.AggregateByAmount(
joinOperations.JoinSecondThirdTopic(mainTopicKStream ,secondTopicTable ,thirdTopicKTable )
.filter((k,v) -> v.IsOk() != 4)
.groupByKey(Grouped.with(AppSerdes.String(), AppSerdes.OutputTopic()))
, TimeWindows.of(Duration.ofMinutes(1)).advanceBy(Duration.ofMinutes(1))
).toStream()
)
));
}
1条答案
按热度按时间hi3rlvi21#
有几点。当你有一个使用Spring Cloud Stream绑定器的Kafka Streams应用程序时,你不需要在绑定上设置
group
信息,只需要你的applicationId
设置就足够了。因此,我建议从您的配置中删除这3个group
属性。另一件事是,当使用Kafka流绑定器时,任何用户特定的绑定属性都需要在spring.cloud.stream.kafka.streams.bindings.<binding-name>.consumer...
下设置。这在文档的这一节中提到。请相应地更改您的startOffset
配置。另外,请查看文档的同一部分,了解在Kafka Streams绑定器中使用startOffset
的语义说明。基本上,start offset
属性仅在您第一次启动应用程序时有效。默认情况下,当没有提交的偏移量时,它为earliest
。但是可以使用属性重写为latest
。可以将传入的KTable
具体化为状态存储,从而可以访问所有查找数据。