当我开始阅读Kafka主题时,如何从当前偏移量读取

zvms9eto  于 2021-06-08  发布在  Kafka
关注(0)|答案(4)|浏览(375)

我使用的是javaapi消费者连接器。每当消费者开始从一个主题开始阅读时,它都会从一个主题的开头开始阅读,而且要花相当长的时间才能赶上最新的事件。我们如何确保消费者从当前偏移量读取数据?

hfyxw5xn

hfyxw5xn1#

为了完成natalia的回答,我想说的是,您可能不关心存储偏移量,您只希望始终使用最新的消息。
要使用大多数使用者实现(包括0.8.x中的“旧”使用者和0.9.x及更高版本中的“新”使用者)实现这种行为,您需要做两件事:
将组id设置为随机值,这样每次消费者启动时都无法从任何位置恢复偏移,这将触发“偏移重置”请求。
OffsetRequestStrategy (或你使用的客户机中的任何名称)来 latest ,这样当您的客户机请求从kafka获得可用的偏移量时,它将获得日志中最后一条(最新)消息的偏移量。

vaj7vani

vaj7vani2#

对于Kafka0.10(可能更早),您可以这样做:

properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
consumer = new KafkaConsumer<>(properties);
consumer.seekToEnd(Collections.emptySet());

这将关闭在代理上存储使用者偏移量(因为您没有使用它),并查找所有分区的最新位置。

nkoocmlb

nkoocmlb3#

最简单的方法是禁用自动提交(即, auto.commit.enable=false ),并使用 auto.offset.reset=latest (或 =largest 对于较旧的Kafka版本)。
Kafka的流程如下:
启动消费者
消费者寻找有效的承诺抵消
如果找到,它将从那里恢复处理
如果找不到,则根据“auto.offset.reset”开始处理
因此,只要消费者组有一个有效的提交偏移量,“auto.offset.reset”就没有任何效果。因此,您也不应该手动提交。
如果已经有一个提交的偏移量,您需要在重新启动使用者之前手动删除它,如果您想从当前偏移量读取数据,而不是处理和旧数据(或者使用新的 group.id 您知道没有提交的补偿。)
作为所有这一切的替代方法,您还可以“seek to end”您的使用者中的每个分区。但这会使您的代码更加复杂,如果您的使用者组根本没有提交,那么就可以避免。

1l5u6lss

1l5u6lss4#

对于Kafka9:
如果为消费者设置组id,kafka将为您存储提交(处理)的偏移量。如果你在Kafka使用新的消费者阅读更多
如果总是要从最新偏移量读取,可以指定offsetresetstrategy.latest

相关问题