我目前正在使用spring集成kafka进行实时统计。不过,组名使kafka搜索侦听器以前没有读取的所有值。
@Value("${kafka.consumer.group.id}")
private String consumerGroupId;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(getDefaultProperties());
}
public Map<String, Object> getDefaultProperties() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, consumerGroupId);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
return properties;
}
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
return factory;
}
@Bean
public KafkaMessageListener listener() {
return new KafkaMessageListener();
}
我想开始最新的抵消,而不是被旧的价值观困扰。是否有可能重置组的偏移量?
4条答案
按热度按时间2ledvvac1#
对于Kafka中没有初始偏移的新消费群体,可以设置
AUTO_OFFSET_RESET_CONFIG
:对于现有消费群体,您可以:
更改组id以显示为新的,即。
consumer-group-id-v2
实施ConsumerSeekAware
因此,您可以在初始化过程中寻找所需的偏移量nhaq1z212#
可以使用partitionoffsets注解以精确偏移开始,例如:
ma8fv8wu3#
你可以设置一个
ConsumerRebalanceListener
对于kafka消费者,当您订阅某些主题时,您可以通过KafkaConsumer.endOffsets()
方法,并将其设置为consumer byKafkaConsumer.seek()
方法,如下所示:uurv41yg4#
因为我没有看到任何这样的例子,我要解释一下我是怎么做到的。
你的班级
@KafkaListener
必须实施ConsumerSeekAware
类,该类将允许侦听器在对分区进行属性化时控制偏移量查找(资料来源:https://docs.spring.io/spring-kafka/reference/htmlsingle/#seek )在这里,在重新平衡中,我们使用给定的回调来寻找所有给定主题的最后一个偏移量。多亏了阿尔特姆比兰(https://stackoverflow.com/users/2756547/artem-bilan )为了指引我找到答案。