Kafkaktable也流重复更新

ie3xauqp  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(334)

Kafkaktable也流重复更新。
我想处理ktable(用kstream.reduce()创建)changelog流,即ktable中键值的任何更改。但是,即使同一个键值对被多次发送到ktable,它似乎每次都被发送到下游。我只需要在值发生更改时发送键的值更新。
`

groupByKey(Grouped.with(new Serdes.LongSerde(),new Serdes.LongSerde())) 
                .reduce(new Reducer<Long>() {   
                    @Override
                    public Long apply(Long t1, Long t2) {
                        return t2;
                    }
                }).toStream().foreach((key, value) -> //for each update in ID, send update to the stream
        {

            sendUpdate(key); 
        });

`

0yycz8jy

0yycz8jy1#

这是我们的默认行为 KTable#toStream() ,它将changelog主题转换为 KStream ,因此 reduce 每次上游reduce操作符收到消息时更新。
您可以使用处理器api来归档您的期望行为,在本例中,我们使用kstream.transformerValues()。
首先注册keyvaluestore以存储您的最新值:

//you don't need to add number_store, if your KTable already materialized to number_store
streamsBuilder
        .addStateStore(Stores.keyValueStoreBuilder(
                Stores.persistentKeyValueStore("number_store"), Serdes.Long(), Serdes.Long()));

numberKStream
        .transformValues(ExtractIfValueChangedTransformer::new, "number_store")
        .filter((key, value) -> value != null)
        .foreach((key, value) -> sendUpdate(key));

然后我们创建一个 ExtractIfValueChangedTransformer ,如果值已更改,则仅返回新消息的值,如果未更改,则返回null:

public class ExtractIfValueChangedTransformer implements ValueTransformerWithKey<Long, Long, Long> {

    KeyValueStore<Long, Long> kvStore;

    @Override
    public void init(ProcessorContext context) {
        kvStore = (KeyValueStore<Long, Long>) context.getStateStore("number_store");
    }

    @Override
    public Long transform(Long key, Long newValue) {
        Long oldValue = kvStore.get(key);
        kvStore.put(key, newValue);
        if (oldValue == null) return newValue;
        return oldValue.equals(newValue) ? null : newValue;
    }

    @Override
    public void close() {}
}

相关问题