如何在ApacheKafka主题中查询记录的最后一个值

5w9g7ksd  于 2021-06-04  发布在  Kafka
关注(0)|答案(2)|浏览(591)

我是新来Kafka,所以可能是很容易。但从我现在面临的问题看不出任何解决办法。我有一个Kafka的主题 metric_32 我想找到钥匙的最新值 user_ . 这在Kafka是怎么可能的。
我试过了 KStream 但当一个新的事件出现在这个主题上时,它就会订阅。但我想查询的是已经出现的键的最后一个值。任何例子都会有帮助。

zqdjd7g9

zqdjd7g91#

如果对于某些主题,您总是对特定键的最后一个值感兴趣,则可以设置 log.cleanup.policy=compact . 这样,每个键最终只会有一条记录。如果您生成5条具有相同id的消息,Kafka中将只保留最后一条消息。这样,如果您有许多具有相同密钥的消息,您可以提高大量磁盘使用率。您可以在此处阅读更多内容:https://dzone.com/articles/kafka-architecture-log-compaction

p3rjfoxz

p3rjfoxz2#

您可以使用状态存储(如果您使用的是kafka流),然后向其添加一个处理器,每当有新值推送到主题时,该处理器都会更新状态存储。

builder.addGlobalStore(storeBuilder, topic, Consumed.with(keySerde, valueSerde), return new Processor<K,V>() {
    private KeyValueStore<K,V> store;

    public void init(ProcessorContext context) {
        store=(KeyValueStore<K,V>) context.getStateStore("statestorename");
    }

    public void process(K key, V value) {
        store.put(key,value);
    }

    public void close() {}
});

然后你可以用

readOnlyStore=streams.store("statestorename", QueryableStoreTypes.keyValueStore());
readOnlyStore.get("key");

相关问题