我将来自KafkaListener的数据存储在ConcurrentLinkedQueue中进行处理。目前,它消耗尽可能多的数据,并完全填满RAM。如何限制队列中的消息数量,以便当达到限制时KafkaListener暂停。
ConcurrentLinkedQueue<Message> queue = new ConcurrentLinkedQueue<>();
@KafkaListener(
topics = "topic",
id = "topic-kafka-listener",
groupId = "batch-processor",
containerFactory = "kafkaListenerContainerFactory"
)
public void receive(@NotNull @Payload List<Message> messages) {
queue.addAll(messages);
}
如何将队列大小限制为100万?
每当队列被轮询并且有空闲空间时,它应该再次开始侦听。
或
如何将Kafka消耗消息的速率限制为每秒100,000条消息?
1条答案
按热度按时间acruukt91#
我没有使用注解,而是使用
KafkaConsumer
对象手动轮询数据。有了它,就有了更多的控制。