{
....
// This is the minimum number of bytes of messages that must be available to give a response, default 1 byte
fetchMinBytes: 1,
// The maximum bytes to include in the message set for this partition. This helps bound the size of the response.
fetchMaxBytes: 1024 * 1024,
}
4条答案
按热度按时间h5qlskok1#
据我所知,api没有任何类型的节流。但是两个使用者(consumer和highlevelconsumer)都有一个'pause()'函数。所以你可以停止消费,如果你有太多的信息。也许这已经提供了你所需要的。
请记住发生了什么事。您向代理发送一个fetch请求并获得一批消息。您可以配置要获取的消息的最小和最大大小(根据文档而不是消息的数量):
iqih9akk2#
在Kafka,投票和过程应该以协调/同步的方式进行。也就是说,在每次投票之后,您应该先处理所有接收到的数据,然后再进行下一次投票。此模式将自动将消息数限制到客户端可以处理的最大吞吐量。
类似这样的内容(伪代码):
(这就是为什么没有参数“fetch.max.messages”——您不需要它的原因。)
mrphzbgm3#
来自自述中的常见问题
创建
async.queue
具有消息处理器和并发性(消息处理器本身用setImmediate
使其不会冻结事件循环)设置
queue.drain
至resume()
消费者消费者的消息事件的处理程序
pause()
消费者并将消息推送到队列。kiz8lqtg4#
我也遇到过类似的情况,我正在使用来自kafka的消息,并且不得不限制使用,因为我的消费服务依赖于第三方api,而第三方api有自己的限制。
我曾经
async/queue
还有一包async/cargo
打电话asyncTimedCargo
用于配料。cargo从kafka消费者那里获取所有消息,并在达到大小限制时将其发送到队列batch_config.batch_size
或超时batch_config.batch_timeout
.async/queue
提供saturated
以及unsaturated
如果队列任务工作线程正忙,则可以使用回调来停止使用。这将阻止货物从填补和你的应用程序不会用尽内存。一旦不饱和,消费就会恢复。