从抽象的Angular 来看,apachekafka将数据存储在主题中。消费者可以读取这些数据。我想有一个(显示器)-消费者的greps数据与一定的年龄。监视器应该向子系统发送一个警告:记录仍然未读,如果达到保留时间,Kafka将丢弃这些记录。直到现在我才找到合适的方法。
mbyulnm01#
你可以用 KafkaConsumer.offsetsForTimes() 将邮件Map到日期。例如,如果用昨天的日期调用它,它返回偏移量x,那么偏移量小于x的任何消息都比昨天旧。然后,您的逻辑可以从您的消费者的当前位置中找出您是否有可能丢弃未处理的记录。请注意,目前有一个kip正在讨论中,用于公开度量以跟踪:https://cwiki.apache.org/confluence/display/kafka/kip-223+-+add+per-topic+min+lead+and+per-partition+lead+metrics+to+kafkaconsumerhttp://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#offsetsfortimes-java.util.map文件-
KafkaConsumer.offsetsForTimes()
1条答案
按热度按时间mbyulnm01#
你可以用
KafkaConsumer.offsetsForTimes()
将邮件Map到日期。例如,如果用昨天的日期调用它,它返回偏移量x,那么偏移量小于x的任何消息都比昨天旧。
然后,您的逻辑可以从您的消费者的当前位置中找出您是否有可能丢弃未处理的记录。
请注意,目前有一个kip正在讨论中,用于公开度量以跟踪:https://cwiki.apache.org/confluence/display/kafka/kip-223+-+add+per-topic+min+lead+and+per-partition+lead+metrics+to+kafkaconsumer
http://kafka.apache.org/10/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#offsetsfortimes-java.util.map文件-