在没有提交操作的情况下使用javakafka使用者正确吗?

au9on6nz  于 2021-06-06  发布在  Kafka
关注(0)|答案(1)|浏览(317)

我需要从开始偏移量到结束偏移量读取一组记录。我为此专门使用Kafka消费者。我对至少一次语义(以防给定的应用程序示例宕机,新的应用程序示例从该起始偏移量重新读取记录)没有问题。
那么,我可以使用这样的代码吗?

private static KafkaConsumer<Long, String> createConsumer() {

    final Properties props = new Properties();

    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    return new KafkaConsumer<>(props);
}

public void process() {

    KafkaConsumer consumer = createConsumer();
    TopicPartition topicPartition = new TopicPartition("topic", 2);
    consumer.assign(List.of(topicPartition));

    long startOffset = 42;
    long endOffset = 100;

    consumer.seek(topicPartition, startOffset);

    boolean isRunning = true;
    while (isRunning) {
        final ConsumerRecords<Long, String> consumerRecords = consumer.poll(1000);

        for (ConsumerRecord<Long, String> record : consumerRecords) {
            if (record.offset() >= endOffset) {
                isRunning = false;
                break;
            }
        }
    }

    consumer.close();
}

所以:
我没有 commit() 我禁用 auto-commit 我没有 group-id 代码正确吗?或者它有一些隐藏的问题?

bttbmeg0

bttbmeg01#

是的,这是正确的用法,你不应该遇到任何问题。这不是Kafka消费者的典型用法,但这是允许的。
从官方 KafkaConsumer javadoc(我的亮点):
https://kafka.apache.org/21/javadoc/org/apache/kafka/clients/consumer/kafkaconsumer.html
控制消费者的位置
在大多数用例中,使用者只需从头到尾消费记录,定期提交其位置(自动或手动)。不过,kafka允许使用者手动控制其位置,在分区中随意向前或向后移动。这意味着消费者可以重新消费旧的记录,或者跳到最近的记录,而不必实际消费中间记录。在一些情况下,手动控制消费者的位置可能很有用。
...
kafka允许使用seek(topicpartition,long)指定新位置。还提供了用于查找服务器维护的最早和最新偏移量的特殊方法(分别为seekToBegining(collection)和seektoend(collection))。

相关问题