Kafka Streams K表大小监控

fkvaft9z  于 2023-02-18  发布在  Apache
关注(0)|答案(2)|浏览(99)

我有一个流拓扑,它从一个主题消费,运行一个聚合,并构建一个KTable,该KTable被物化到rocksDB中。
我有另一个应用程序,它每天消耗来自同一主题的所有事件,并为满足某些特定条件的事件(即不再需要的事件)发送墓碑消息。聚合处理这个问题并从状态存储中删除,但我正在考虑监视状态存储的大小或更改日志主题-任何真正告诉我ktable大小的东西。
我已经公开了JMX指标,但似乎没有任何东西能给予我所需要的。我可以看到“放入”rocksDB的总数,但看不到键的总数。我的应用程序是spring Boot ,我希望通过prometheus公开指标。
有人解决了这个问题或任何想法,这将有助于?

laximzn5

laximzn51#

您可以通过使用KeyValueStore#approximateNumEntries()访问KTable的底层状态存储来获得每个分区中的近似计数,然后将此计数导出到prometheus(每个分区有一个计数)。
要访问底层状态存储,您可以使用低级处理器API通过每个StreamTask中的每个ProcessorContext(对应于一个分区)访问KeyValueStore。只需将KStream#transformValues()添加到您的拓扑中:

kStream
        ...
        .transformValues(ExtractCountTransformer::new, "your_ktable_name")
        ...

并在ExtractCountTransformer中提取计数给普罗米修斯:

@Log4j2
public class ExtractCountTransformer implements ValueTransformerWithKey<String, String, String> {

    private KeyValueStore<String, String> yourKTableKvStore;
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
        yourKTableKvStore = (KeyValueStore<String, String>) context.getStateStore("your_ktable_name");
    }

    @Override
    public String transform(String readOnlyKey, String value) {
        //extract count to prometheus
        log.debug("partition {} - approx count {}", context.partition(), yourKTableKvStore.approximateNumEntries());
        yourKTableKvStore.approximateNumEntries();
        return value;
    }

    @Override
    public void close() {

    }
}
omqzjyyz

omqzjyyz2#

如果您公开了JMX指标,那么您可以获得许多Kafka指标,您正在寻找的指标是kafka_stream_state_estimate_num_keys

相关问题