apache pulsar异步使用者设置(可完成的未来)

c9x0cxw0  于 2021-07-14  发布在  Java
关注(0)|答案(1)|浏览(421)

我正在尝试为ApachePulsar设置一个异步使用者,但是我的问题是只有一条消息被接收到,除非我重新启动SpringBoot应用程序,否则没有其他消息通过。不幸的是,有关使用pulsar的completablefuture的文档不是很好,我对使用它们还很陌生。
我的代码如下:

@EventListener(ApplicationReadyEvent.class)
public void subscribe() throws PulsarClientException {

Consumer consumer = this.pulsarClient.newConsumer(JSONSchema.of(TenantMicroserviceProvisioningRequestSchema.class))
            .subscriptionName(this.pulsarSubscriptionName)
            .topic(this.pulsarTopicName)
            .ackTimeout(240, TimeUnit.SECONDS)
            .subscriptionType(SubscriptionType.Exclusive)
            .subscribe();

while(true) {
    CompletableFuture<Message> asyncMessage = consumer.receiveAsync();

asyncMessage.thenAcceptAsync(incomingMessage -> {
        TenantMicroserviceProvisioningRequestSchema schema = (TenantMicroserviceProvisioningRequestSchema) incomingMessage.getValue();

       LOGGER.info(String.format("***New provisioning request recieved for tenant database [%s] with instance code [%s]", schema.getDatabaseName(), schema.getInstanceCode()));

       consumer.acknowledgeAsync(incomingMessage.getMessageId());

    });
 }
}

在java文档中,它确实说“一旦返回completablefuture并完成接收到的消息,就应该随后调用receiveasync()。否则它会在应用程序中创建接收请求的积压,但我不确定如何做到这一点。我认为这是问题的根源。
文件:脉冲星文件:https://pulsar.apache.org/docs/en/client-libraries-java/#async-ReceiveAsync()接收pulsar java文档:https://pulsar.apache.org/api/client/org/apache/pulsar/client/api/consumer.html

***更新***我添加了while循环,但是当我这样做的时候,我的spring引导内存消耗会浮动到10gb。不确定,但想知道未来是怎么安排的。

ua4mk5z4

ua4mk5z41#

为了防止这种过多的内存消耗,您需要向类中添加一个中间数据结构,以限制未完成的数据的数量 CompletableFutures 例如 LinkedBlockingQueue 如下图所示

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.SubscriptionType;

public class AsyncConsumerDemo {

    private static final LinkedBlockingQueue<CompletableFuture<Message<String>>> outstandingMessages = 
      new LinkedBlockingQueue<CompletableFuture<Message<String>>>(1000);

    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public static void main(String[] args) throws PulsarClientException, InterruptedException, ExecutionException {

        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl("pulsar://broker:6650")
                .build();

        Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING)
                    .subscriptionName("pulsarSubscriptionName")
                    .topic("pulsarTopicName")
                    .ackTimeout(240, TimeUnit.SECONDS)
                    .subscriptionType(SubscriptionType.Shared)
                    .subscribe();

        new Thread(() -> { // Message Retrieval Thread
            CompletableFuture<Message<String>> future;

            while ( (future = consumer.receiveAsync()) != null) {
                outstandingMessages.add(future);
            }
          }).start();

        for (int numConsumers = 0; numConsumers < 10; numConsumers++) {
          executor.submit(() -> { // Message Consumer Thread
            while(true) {
                try {
                    CompletableFuture<Message<String>> future = outstandingMessages.take();
                    Message<String> msg = future.get();

                    // Process the message

                    consumer.acknowledgeAsync(msg.getMessageId());

                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            }
          });
       }
    }

}

相关问题