我是新来Kafka,所以可能是很容易。但从我现在面临的问题看不出任何解决办法。我有一个Kafka的主题 metric_32 我想找到钥匙的最新值 user_ . 这在Kafka是怎么可能的。我试过了 KStream 但当一个新的事件出现在这个主题上时,它就会订阅。但我想查询的是已经出现的键的最后一个值。任何例子都会有帮助。
metric_32
user_
KStream
zqdjd7g91#
如果对于某些主题,您总是对特定键的最后一个值感兴趣,则可以设置 log.cleanup.policy=compact . 这样,每个键最终只会有一条记录。如果您生成5条具有相同id的消息,Kafka中将只保留最后一条消息。这样,如果您有许多具有相同密钥的消息,您可以提高大量磁盘使用率。您可以在此处阅读更多内容:https://dzone.com/articles/kafka-architecture-log-compaction
log.cleanup.policy=compact
p3rjfoxz2#
您可以使用状态存储(如果您使用的是kafka流),然后向其添加一个处理器,每当有新值推送到主题时,该处理器都会更新状态存储。
builder.addGlobalStore(storeBuilder, topic, Consumed.with(keySerde, valueSerde), return new Processor<K,V>() { private KeyValueStore<K,V> store; public void init(ProcessorContext context) { store=(KeyValueStore<K,V>) context.getStateStore("statestorename"); } public void process(K key, V value) { store.put(key,value); } public void close() {} });
然后你可以用
readOnlyStore=streams.store("statestorename", QueryableStoreTypes.keyValueStore()); readOnlyStore.get("key");
2条答案
按热度按时间zqdjd7g91#
如果对于某些主题,您总是对特定键的最后一个值感兴趣,则可以设置
log.cleanup.policy=compact
. 这样,每个键最终只会有一条记录。如果您生成5条具有相同id的消息,Kafka中将只保留最后一条消息。这样,如果您有许多具有相同密钥的消息,您可以提高大量磁盘使用率。您可以在此处阅读更多内容:https://dzone.com/articles/kafka-architecture-log-compactionp3rjfoxz2#
您可以使用状态存储(如果您使用的是kafka流),然后向其添加一个处理器,每当有新值推送到主题时,该处理器都会更新状态存储。
然后你可以用