kafka消费者已处理/重复记录| java | spring kafka

ylamdve6  于 2021-06-27  发布在  Java
关注(0)|答案(0)|浏览(273)

我是kafka新手,在springboot中编写了一个cron作业来验证sql和kafka主题中的一些记录。这项工作需要每天早上进行一次。我已经设置了工作运行后,每15分钟为我的测试和它的预期工作。但是,当我每2小时更新cron以运行作业时,我就从主题中获得了消费者已经读取/处理和复制的记录。我正在使用commitasync手动提交偏移量。例如,我已经发送了3个记录的主题,但消费者得到了超过5k的记录大多是重复的。
以下是消费者代码及其属性。

public Map<String, Object> getKafkaConsumerProps() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "10000");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "cronKafkaConsumer");
    return props;
}

public List<CostKafkaModel> consumeCosts() {
    KafkaConsumer<String, CostKafkaModel> consumer = new KafkaConsumer<>(
            getKafkaConsumerProps(), new StringDeserializer(),
            new JsonDeserializer<>(CostKafkaModel.class));

    List<CostKafkaModel> kafkaModelList = new ArrayList<>();
    try {
        consumer.subscribe(Arrays.asList("deltaCosts"));
        ConsumerRecords<String, CostKafkaModel> records = consumer
                .poll(1000);
        for (ConsumerRecord<String, CostKafkaModel> record : records) {
            kafkaModelList.add(record.value());
        }

    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        consumer.commitSync();
        consumer.close();
    }
    return kafkaModelList;
}

任何帮助都将受到感谢。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题