ApacheKafka—simpleconsumer模块是否有什么解决方法可以只读取新消息?

vltsax25  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(311)

正如这里提到的简单消费者
https://cwiki.apache.org/confluence/display/kafka/0.8.0+simpleconsumer+example
还要注意,我们正在显式检查正在读取的偏移量是否不小于我们请求的偏移量。这是必需的,因为如果kafka正在压缩消息,那么fetch请求将返回整个压缩块,即使请求的偏移量不是压缩块的开头。因此,我们以前看到的信息可能会再次返回。
最后,我们会跟踪所读信息的数量。如果我们没有读到上一个请求中的任何内容,我们就睡一会儿,这样就不会在没有数据的时候敲打Kafka。
在我的程序中,它先读取一条旧消息,然后进入休眠状态,再读取新记录。
有什么办法让simpleconsumer只读取新消息吗?

rkue9o1l

rkue9o1l1#

从同一页

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) {
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(whichTime, 1));
    kafka.javaapi.OffsetRequest request = new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(),clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);

    if (response.hasError()) {
        System.out.println("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition) );
        return 0;
    }
    long[] offsets = response.offsets(topic, partition);
    return offsets[0];
}

它说的是寻找偏移量来读取
kafka包含两个常量来提供帮助,kafka.api.offsetrequest.earliesttime()在日志中查找数据的开头并从那里开始流式处理,kafka.api.offsetrequest.latesttime()将只流式处理新消息。不要假设偏移量0是起始偏移量,因为消息会随着时间的推移从日志中过时。

相关问题