Kafka消费者根据条件手动提交

eivgtgni  于 2021-06-08  发布在  Kafka
关注(0)|答案(1)|浏览(471)

@kafkalistener消费者承诺一旦满足特定条件。假设主题从偏移量[0]处的生产者“message 0”获取以下数据,偏移量[1]处的生产者“message 1”获取以下数据
它们在消费者处接收并在确认的帮助下提交。acknowledge()
然后,下面的消息进入主题
“消息2”位于偏移量[2]“消息3”位于偏移量[3]
正在运行的使用者接收上述数据。这里条件失败,并且上面的偏移量没有提交。
即使主题中出现了新的数据,那么“消息2”和“消息3”也应该由来自同一消费群体的任何消费者提取,因为他们没有提交。但这并没有发生,消费者收到了一个新的信息。
当我重新启动我的消费者,然后我得到消息2和消息3。这应该发生在消费者运行时。
代码如下-:kafkaconsumerconfig文件

enter code here

@Configuration
@EnableKafka
public class KafkaConsumerConfig {
    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.setBatchListener(true);
factory.getContainerProperties().setAckMode(AbstractMessageListenerContainer.AckMode.MANUAL_IMMEDIATE);
        factory.getContainerProperties().setSyncCommits(true);
        return factory;
    }

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,"1");
        return propsMap;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }
}

Listner Class
public class Listener {
    public CountDownLatch countDownLatch0 = new CountDownLatch(3);
    private Logger LOGGER = LoggerFactory.getLogger(Listener.class);
    static int count0 =0;

    @KafkaListener(topics = "abcdefghi", group = "group1", containerFactory = "kafkaListenerContainerFactory")
    public void listenPartition0(String data, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) List<Integer> partitions,
                                 @Header(KafkaHeaders.OFFSET) List<Long> offsets, Acknowledgment acknowledgment) throws InterruptedException {
        count0 = count0 + 1;
        LOGGER.info("start consumer 0");

        LOGGER.info("received message via consumer 0='{}' with partition-offset='{}'", data, partitions + "-" + offsets);
        if (count0%2 ==0)
            acknowledgment.acknowledge();
        LOGGER.info("end of consumer 0");

    }

我怎样才能达到我想要的结果?

hgb9j2n6

hgb9j2n61#

没错。这个 offset 是一个很容易在消费者示例的内存中保持跟踪的数字。对于相同分区,我们需要为组中新到达的消费者提交补偿。这就是为什么当您重新启动应用程序或组发生重新平衡时,它会按预期工作的原因。
要使它按您希望的那样工作,您应该考虑实施 ConsumerSeekAware 在你的听众和电话 ConsumerSeekCallback.seek() 对于要从下一个轮询周期开始使用的偏移量。
http://docs.spring.io/spring-kafka/docs/2.0.0.m2/reference/html/_reference.html#seek:

public class Listener implements ConsumerSeekAware {

    private final ThreadLocal<ConsumerSeekCallback> seekCallBack = new ThreadLocal<>();

    @Override
    public void registerSeekCallback(ConsumerSeekCallback callback) {
        this.seekCallBack.set(callback);
    }

    @KafkaListener()
    public void listen(...) {
        this.seekCallBack.get().seek(topic, partition, 0);
    }

}

相关问题