@kafkalistener消费者承诺一旦满足特定条件。假设主题从偏移量[0]处的生产者“message 0”获取以下数据,偏移量[1]处的生产者“message 1”获取以下数据
它们在消费者处接收并在确认的帮助下提交。acknowledge()
然后,下面的消息进入主题
“消息2”位于偏移量[2]“消息3”位于偏移量[3]
正在运行的使用者接收上述数据。这里条件失败,并且上面的偏移量没有提交。
即使主题中出现了新的数据,那么“消息2”和“消息3”也应该由来自同一消费群体的任何消费者提取,因为他们没有提交。但这并没有发生,消费者收到了一个新的信息。
当我重新启动我的消费者,然后我得到消息2和消息3。这应该发生在消费者运行时。
代码如下-:kafkaconsumerconfig文件
enter code here
@Configuration
@EnableKafka
public class KafkaConsumerConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
factory.getContainerProperties().setSyncCommits(true);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> propsMap = new HashMap<>();
propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"1");
return propsMap;
}
@Bean
public Listener listener() {
return new Listener();
}
}
Listner Class
public class Listener {
public CountDownLatch countDownLatch0 = new CountDownLatch(3);
private Logger LOGGER = LoggerFactory.getLogger(Listener.class);
static int count0 =0;
@KafkaListener(topics = "abcdefghi", group = "group1", containerFactory = "kafkaListenerContainerFactory")
public void listenPartition0(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
@Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) throws InterruptedException {
count0 = count0 + 1;
LOGGER.info("start consumer 0");
LOGGER.info("received message via consumer 0='{}' with partition-offset='{}'", data, partitions + "-" + offsets);
if (count0%2 ==0)
acknowledgment.acknowledge();
LOGGER.info("end of consumer 0");
}
我怎样才能达到我想要的结果?
1条答案
按热度按时间hgb9j2n61#
没错。这个
offset
是一个很容易在消费者示例的内存中保持跟踪的数字。对于相同分区,我们需要为组中新到达的消费者提交补偿。这就是为什么当您重新启动应用程序或组发生重新平衡时,它会按预期工作的原因。要使它按您希望的那样工作,您应该考虑实施
ConsumerSeekAware
在你的听众和电话ConsumerSeekCallback.seek()
对于要从下一个轮询周期开始使用的偏移量。http://docs.spring.io/spring-kafka/docs/2.0.0.m2/reference/html/_reference.html#seek: