限制Spring Batch中来自Kafka的消息数量

insrf1ej  于 2023-06-29  发布在  Spring
关注(0)|答案(1)|浏览(107)

我将来自KafkaListener的数据存储在ConcurrentLinkedQueue中进行处理。目前,它消耗尽可能多的数据,并完全填满RAM。如何限制队列中的消息数量,以便当达到限制时KafkaListener暂停。

ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<>();

@KafkaListener(
        topics = "topic",
        id = "topic-kafka-listener",
        groupId = "batch-processor",
        containerFactory = "kafkaListenerContainerFactory"
)
public void receive(@NotNull @Payload List<Message> messages) {
    queue.addAll(messages);
}

如何将队列大小限制为100万?
每当队列被轮询并且有空闲空间时,它应该再次开始侦听。

如何将Kafka消耗消息的速率限制为每秒100,000条消息?

acruukt9

acruukt91#

我没有使用注解,而是使用KafkaConsumer对象手动轮询数据。有了它,就有了更多的控制。

Map<String, Object> consumerConfig = Map.of(
        "bootstrap. Servers", "localhost:9092",
        "key.deserializer", StringDeserializer.class,
        "value.deserializer", StringDeserializer.class,
        "group.id", "batch-processor",
        "max.poll.records", 480000
);
KafkaConsumer<String, Message> kafkaConsumer = new KafkaConsumer<>(consumerConfig);

kafkaConsumer.subscribe(List.of("topic"));
public void receive()    {
    ConsumerRecords<String, Message> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));
    consumerRecords.forEach(record -> queue. Add(record. Value()));
}

相关问题