如何从Kafka的旧偏移点获取数据?

xkftehaa  于 2021-06-07  发布在  Kafka
关注(0)|答案(7)|浏览(301)

我用zookeeper从Kafka那里获取数据。这里我总是从最后一个偏移点得到数据。有没有办法指定获取旧数据的偏移时间?
有一个选项autooffset.reset。它接受最小的或最大的。有人能解释一下什么是最小的和最大的吗。autooffset.reset是否有助于从旧偏移点而不是最新偏移点获取数据?

bbmckpt7

bbmckpt71#

kafka协议文档是处理请求/响应/偏移量/消息的一个很好的来源:https://cwiki.apache.org/confluence/display/kafka/a+guide+to+the+kafka+protocol 您可以使用简单的使用者示例来演示以下代码的状态:

FetchRequest req = new FetchRequestBuilder()

        .clientId(clientName)

        .addFetch(a_topic, a_partition, readOffset, 100000) 

        .build();

FetchResponse fetchResponse = simpleConsumer.fetch(req);

将readoffset设置为起始初始偏移量。但是您需要检查最大偏移量,因为上面将根据addfetch方法的最后一个参数fetchsize提供有限的偏移量计数。

l3zydbqr

l3zydbqr2#

使用者总是属于一个组,对于每个分区,zookeeper跟踪分区中该使用者组的进度。
要从头开始获取,您可以删除hussain提到的与进度相关的所有数据

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");

您还可以指定所需的分区偏移量,如core/src/main/scala/kafka/tools/updateoffsetsinzk.scala中所指定的

ZkUtils.updatePersistentPath(zkClient, topicDirs.consumerOffsetDir + "/" + partition, offset.toString)

但是,偏移量不是时间索引的,但是您知道每个分区都是一个序列。
如果您的消息包含一个时间戳(请注意,这个时间戳与kafka收到您的消息的那一刻无关),您可以尝试使用索引器,通过将偏移量增加n来尝试分步检索一个条目,并将元组(主题x,第2部分,偏移量100,时间戳)存储在某处。
当您想从指定的时刻检索条目时,可以对粗略索引应用二进制搜索,直到找到所需的条目并从中提取。

mbzjlibv

mbzjlibv3#

你试过这个吗?
bin/kafka-console-consumer.sh—引导服务器localhost:9092 --topic 测试--从头开始
它将打印出给定主题的所有消息,本例中的“test”。
来自此链接的更多详细信息https://kafka.apache.org/quickstart

ubof19bj

ubof19bj4#

参考Kafka配置文件:http://kafka.apache.org/08/configuration.html 用于查询偏移量参数的最小值和最大值。
顺便说一句,在探索Kafka时,我想知道如何为消费者重播所有消息。我的意思是,如果一个消费者团体调查了所有的信息,它想重新获得这些信息。
实现的方法是从zookeeper中删除数据。使用kafka.utils.zkutils类删除zookeeper上的节点。其用法如下:

ZkUtils.maybeDeletePath(${zkhost:zkport}", "/consumers/${group.id}");
wljmcqd8

wljmcqd85#

在kafka文档中,他们说“kafka.api.offsetrequest.earliesttime()在日志中找到数据的开头并从那里开始流式处理,kafka.api.offsetrequest.latesttime()将只流式处理新消息。不要假定偏移量0是起始偏移量,因为消息会随着时间的推移从日志中过时。“
在此处使用SimpleConsumer示例:https://cwiki.apache.org/confluence/display/kafka/0.8.0+simpleconsumer+example
类似的问题:kafka高级使用者使用javaapi从主题获取所有消息(相当于--从头开始)
这也许会有帮助

ljo96ir5

ljo96ir56#

使用kafkaconsumer可以使用seek、seektobegining和seektoend在流中移动。
https://kafka.apache.org/0100/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html#seektobeginning(java.util.collection)集合
另外,如果没有提供分区,它将为当前分配的所有分区寻找第一个偏移量。

n53p2ov0

n53p2ov07#

现在呢
Kafkafaq给出了这个问题的答案。
如何使用offsetrequest准确地获取特定时间戳的消息偏移量?
kafka允许按时间查询消息的偏移量,它是在段粒度上这样做的。timestamp参数是unix时间戳,查询offset by timestamp返回消息的最新可能偏移量,该偏移量不晚于给定的时间戳。时间戳有两个特殊值-最新和最早。对于unix时间戳的任何其他值,kafka将获得不晚于给定时间戳创建的日志段的起始偏移量。因此,由于偏移量请求仅以段粒度提供服务,对于较大的段大小,偏移量获取请求返回的结果不太准确。
为了获得更准确的结果,可以根据时间(log.roll.ms)而不是大小(log.segment.bytes)配置日志段大小。但是,应该小心,因为这样做可能会由于频繁的日志段滚动而增加文件处理程序的数量。
未来计划
Kafka将在消息格式中添加时间戳。参考
https://cwiki.apache.org/confluence/display/kafka/kafka+enriched+message+metadata

相关问题