我正在尝试为ApachePulsar设置一个异步使用者,但是我的问题是只有一条消息被接收到,除非我重新启动SpringBoot应用程序,否则没有其他消息通过。不幸的是,有关使用pulsar的completablefuture的文档不是很好,我对使用它们还很陌生。
我的代码如下:
@EventListener(ApplicationReadyEvent.class)
public void subscribe() throws PulsarClientException {
Consumer consumer = this.pulsarClient.newConsumer(JSONSchema.of(TenantMicroserviceProvisioningRequestSchema.class))
.subscriptionName(this.pulsarSubscriptionName)
.topic(this.pulsarTopicName)
.ackTimeout(240, TimeUnit.SECONDS)
.subscriptionType(SubscriptionType.Exclusive)
.subscribe();
while(true) {
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
asyncMessage.thenAcceptAsync(incomingMessage -> {
TenantMicroserviceProvisioningRequestSchema schema = (TenantMicroserviceProvisioningRequestSchema) incomingMessage.getValue();
LOGGER.info(String.format("***New provisioning request recieved for tenant database [%s] with instance code [%s]", schema.getDatabaseName(), schema.getInstanceCode()));
consumer.acknowledgeAsync(incomingMessage.getMessageId());
});
}
}
在java文档中,它确实说“一旦返回completablefuture并完成接收到的消息,就应该随后调用receiveasync()。否则它会在应用程序中创建接收请求的积压,但我不确定如何做到这一点。我认为这是问题的根源。
文件:脉冲星文件:https://pulsar.apache.org/docs/en/client-libraries-java/#async-ReceiveAsync()接收pulsar java文档:https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/consumer.html
***更新***我添加了while循环,但是当我这样做的时候,我的spring引导内存消耗会浮动到10gb。不确定,但想知道未来是怎么安排的。
1条答案
按热度按时间ua4mk5z41#
为了防止这种过多的内存消耗,您需要向类中添加一个中间数据结构,以限制未完成的数据的数量
CompletableFutures
例如LinkedBlockingQueue
如下图所示