Kafkaktable也流重复更新。
我想处理ktable(用kstream.reduce()创建)changelog流,即ktable中键值的任何更改。但是,即使同一个键值对被多次发送到ktable,它似乎每次都被发送到下游。我只需要在值发生更改时发送键的值更新。
`
groupByKey(Grouped.with(new Serdes.LongSerde(),new Serdes.LongSerde()))
.reduce(new Reducer<Long>() {
@Override
public Long apply(Long t1, Long t2) {
return t2;
}
}).toStream().foreach((key, value) -> //for each update in ID, send update to the stream
{
sendUpdate(key);
});
`
1条答案
按热度按时间0yycz8jy1#
这是我们的默认行为
KTable#toStream()
,它将changelog主题转换为KStream
,因此reduce
每次上游reduce操作符收到消息时更新。您可以使用处理器api来归档您的期望行为,在本例中,我们使用kstream.transformerValues()。
首先注册keyvaluestore以存储您的最新值:
然后我们创建一个
ExtractIfValueChangedTransformer
,如果值已更改,则仅返回新消息的值,如果未更改,则返回null: