正如这里提到的简单消费者
https://cwiki.apache.org/confluence/display/kafka/0.8.0+simpleconsumer+example
还要注意,我们正在显式检查正在读取的偏移量是否不小于我们请求的偏移量。这是必需的,因为如果kafka正在压缩消息,那么fetch请求将返回整个压缩块,即使请求的偏移量不是压缩块的开头。因此,我们以前看到的信息可能会再次返回。
最后,我们会跟踪所读信息的数量。如果我们没有读到上一个请求中的任何内容,我们就睡一会儿,这样就不会在没有数据的时候敲打Kafka。
在我的程序中,它先读取一条旧消息,然后进入休眠状态,再读取新记录。
有什么办法让simpleconsumer只读取新消息吗?
1条答案
按热度按时间rkue9o1l1#
从同一页
它说的是寻找偏移量来读取
kafka包含两个常量来提供帮助,kafka.api.offsetrequest.earliesttime()在日志中查找数据的开头并从那里开始流式处理,kafka.api.offsetrequest.latesttime()将只流式处理新消息。不要假设偏移量0是起始偏移量,因为消息会随着时间的推移从日志中过时。