使用kafka消费池正确吗?

knpiaxh1  于 2021-06-04  发布在  Kafka
关注(0)|答案(1)|浏览(698)

有时,我需要从同一主题的特定偏移量的特定分区读取记录。
我可以为每次创建新的Kafka消费者。但是,我可以创建消费者池并以这种方式使用它:

List<KafkaConsumer> consumers = new ArrayList<>();

// acquire consumer
KafkaConsumer consumer = consumers.get(0);
TopicPartition topicPartition = new TopicPartition("my-topic", 42);
consumer.assign(List.of(topicPartition));
consumer.seek(topicPartition, 13);

ConsumerRecords records = consumer.poll(0);
// process records
// .....

// release consumer
consumer.unsubscribe();

我应该建立消费池吗?或者它是无效的,我应该为每次使用创建新的消费者。

f87krz0w

f87krz0w1#

你只需要一个消费者。只需取消订阅并重新分配给另一个 TopicPartition .

String topic = "my-topic";
int partition = 42;
int offset = 13;
boolean running = true;

while(running) {
    TopicPartition topicPartition = new TopicPartition(topic, partition);
    consumer.assign(List.of(topicPartition));
    consumer.seek(topicPartition, offset);

    ConsumerRecords records = consumer.poll(0);
    // process records
    // .....

    // release consumer
    consumer.unsubscribe();
    // Change topic, partition, offset as needed
}

相关问题