我有一个流拓扑,它从一个主题消费,运行一个聚合,并构建一个KTable,该KTable被物化到rocksDB中。
我有另一个应用程序,它每天消耗来自同一主题的所有事件,并为满足某些特定条件的事件(即不再需要的事件)发送墓碑消息。聚合处理这个问题并从状态存储中删除,但我正在考虑监视状态存储的大小或更改日志主题-任何真正告诉我ktable大小的东西。
我已经公开了JMX指标,但似乎没有任何东西能给予我所需要的。我可以看到“放入”rocksDB的总数,但看不到键的总数。我的应用程序是spring Boot ,我希望通过prometheus公开指标。
有人解决了这个问题或任何想法,这将有助于?
2条答案
按热度按时间laximzn51#
您可以通过使用
KeyValueStore#approximateNumEntries()
访问KTable的底层状态存储来获得每个分区中的近似计数,然后将此计数导出到prometheus(每个分区有一个计数)。要访问底层状态存储,您可以使用低级处理器API通过每个StreamTask中的每个ProcessorContext(对应于一个分区)访问
KeyValueStore
。只需将KStream#transformValues()
添加到您的拓扑中:并在ExtractCountTransformer中提取计数给普罗米修斯:
omqzjyyz2#
如果您公开了JMX指标,那么您可以获得许多Kafka指标,您正在寻找的指标是
kafka_stream_state_estimate_num_keys
。