我使用下面的示例来使用SpringKafka消费者阅读消息。我的用例要求每次生成一条消息时,监听器都要从头开始读取。
@KafkaListener(
id = "grouplistener",
topicPartitions = {
@TopicPartition(
topic = "mycompactedtopic", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0")
)
}
)
public void onReceiving(
String payload, @Header(KafkaHeaders.OFFSET) Integer offset,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
) {
log.info(
"Processing topic = {}, partition = {}, offset = {}, payload= {}",
topic, partition, offset, payload
);
}
我似乎只能让它在应用程序启动时从一开始就读取,然后它通常只消耗向前的消息。
有没有办法强迫它每次都开始?
5条答案
按热度按时间yqhsw0fo1#
下面我将如何实现它。你需要实施
ConsumerSeekAware
接口并在上执行一些实现onPartitionsAssigned
方法。如果在重新启动应用程序时发送环境变量,也可以按需进行seekToBeging。不过我还没实现呢!pkbketx92#
Kafka2.3.1
wqsoz72f3#
使用具有一个分区的压缩主题来保存配置列表。然后需要由rest端点调用它,它应该显示配置的完整唯一列表
实现这一点的方法应该是使用kafka流和ktable,并在rest层后面设置交互式查询。不是一个标准的消费者,需要倒带自己获得最新的系统状态。
kafka connect框架中已经存在这样一个示例,其中有一个配置主题,您只能访问
GET /connectors/name/config
,并且只有重新启动它或扩展到更多示例时,它才会再次消耗所有消息。schema registry也是一个例子,它存储数据库中所有模式的内部hashmap_schemas
主题,并具有用于读取、插入和删除的RESTAPI本质上,当您为给定的键获得新配置时,您可以用一个全新的键“替换”给定键的旧值,或者以某种方式将旧值与新数据“合并”。
g6baxovj4#
我认为你应该试着写一个consumersekawarelistener,每次你读到消息的时候都要把偏移量设为0。听起来像是疯狂的解决方法,但它可能会有所帮助。希望这能帮到你:-)
irtuqstp5#
@所以这是一个纯java实现。我不确定它是否符合你的需要。因此,基本上我提交了一个0的偏移量(意思是,即使我读了Kafka的主题,我回到偏移量是在开始)。我不确定你是否考虑过这个实现,但请让我知道,如果这是你正在寻找的
省去commitcountobj。这不是你需要的。所以默认情况下,offsetmap会有下一个这样的偏移记录,
offsetmap.put(new topicpartition(record.topic(),record.partition()),new offsetandmetadata(record.offset()+1,“some commit success message”);
但是对于您的用例,我做了一些修改,当消费者没有重新启动时,它工作得很好
offsetmap.put(new topicpartition(record.topic(),record.partition()),new offsetandmetadata(0,“未提交完成”);