我知道如何配置Kafka阅读最早或最新的消息。如果我需要从以前的偏移量读取,我们如何包含一个附加选项?我需要这样做的原因是,由于先前处理逻辑中的某些错误,需要再次处理先前读取的消息。
nxowjjhe1#
我正试图回答一个类似但不完全相同的问题,所以让我们看看我的信息是否可以帮助你。首先,我从另一个问题/答案开始工作简而言之,您需要提交偏移量,最常见的解决方案是zookeeper。因此,如果您的消费者遇到错误或需要关闭,它可以从停止的地方恢复。我自己正在处理一个非常大的高容量流,我的消费者(为了测试)每次都需要从最后面开始。文件表明,我必须使用Kafka寻求声明我的出发点。一旦我的发现是成功和可靠的,我会在这里更新我的发现。当然这是一个解决了的问题。
5lhxktic2#
在javakafka客户机中,有一些关于kafka消费者的方法可以用来指定下一个消费位置。public void seek(主题分区,长偏移)重写使用者将在下一次轮询(超时)时使用的获取偏移量。如果对同一分区多次调用此api,则下次poll()时将使用最新的偏移量。请注意,如果在使用过程中随意使用此api来重置获取偏移量,则可能会丢失数据这就足够了,还有seektobeing和seektoend。
2条答案
按热度按时间nxowjjhe1#
我正试图回答一个类似但不完全相同的问题,所以让我们看看我的信息是否可以帮助你。
首先,我从另一个问题/答案开始工作
简而言之,您需要提交偏移量,最常见的解决方案是zookeeper。因此,如果您的消费者遇到错误或需要关闭,它可以从停止的地方恢复。
我自己正在处理一个非常大的高容量流,我的消费者(为了测试)每次都需要从最后面开始。文件表明,我必须使用Kafka寻求声明我的出发点。
一旦我的发现是成功和可靠的,我会在这里更新我的发现。当然这是一个解决了的问题。
5lhxktic2#
在javakafka客户机中,有一些关于kafka消费者的方法可以用来指定下一个消费位置。
public void seek(主题分区,长偏移)
重写使用者将在下一次轮询(超时)时使用的获取偏移量。如果对同一分区多次调用此api,则下次poll()时将使用最新的偏移量。请注意,如果在使用过程中随意使用此api来重置获取偏移量,则可能会丢失数据
这就足够了,还有seektobeing和seektoend。