我知道在kafka中,消费者从代理主题中提取消息(pull)?我觉得脉冲星的工作原理是一样的,考虑到 receive 方法块。但我找不到证实。有人能给我指点参考资料或纠正我的错误吗?谢谢
receive
o3imoua41#
在pulsar文件中:在使用者端有一个队列,用于接收从代理推送的消息。可以使用receiverqueuesize参数配置队列大小。默认大小为1000)。每次调用consumer.receive()时,都会从缓冲区中取出一条消息。所以代理将消息推送到消费者端的队列中。当 receive 方法时,消息将被取消排队并返回。pulsar用户将定期向pulsar broker发送许可请求,以便在使用一半的队列时请求更多的消息。这在这里描述。简而言之,正如这里所描述的pulsar也使用基于push的方法,但是有一个模拟消费者拉动的api。
t8e9dugd2#
pulsar的文档清楚地解释了信息消费的工作原理:pulsar consumer origin从apache pulsar集群中的一个或多个主题读取消息。pulsar消费源订阅pulsar主题,处理传入的消息,然后在读取消息时将确认信息发送回pulsar。可以同步(sync)或异步(async)从代理接收消息。 receive 方法同步接收消息。在消息可用之前,使用者进程将被阻止。例如,
Message msg = consumer.receive();
异步接收将立即返回类型为的值 CompletableFuture 一旦有新消息可用,就完成了。例如,
CompletableFuture
CompletableFuture<Message> asyncMessage = consumer.receiveAsync();
2条答案
按热度按时间o3imoua41#
在pulsar文件中:
在使用者端有一个队列,用于接收从代理推送的消息。可以使用receiverqueuesize参数配置队列大小。默认大小为1000)。每次调用consumer.receive()时,都会从缓冲区中取出一条消息。
所以代理将消息推送到消费者端的队列中。当
receive
方法时,消息将被取消排队并返回。pulsar用户将定期向pulsar broker发送许可请求,以便在使用一半的队列时请求更多的消息。这在这里描述。
简而言之,正如这里所描述的
pulsar也使用基于push的方法,但是有一个模拟消费者拉动的api。
t8e9dugd2#
pulsar的文档清楚地解释了信息消费的工作原理:
pulsar consumer origin从apache pulsar集群中的一个或多个主题读取消息。
pulsar消费源订阅pulsar主题,处理传入的消息,然后在读取消息时将确认信息发送回pulsar。
可以同步(sync)或异步(async)从代理接收消息。
receive
方法同步接收消息。在消息可用之前,使用者进程将被阻止。例如,异步接收将立即返回类型为的值
CompletableFuture
一旦有新消息可用,就完成了。例如,