以下是我们物联网平台的入站消息流程:
Device ---(MQTT)---> RabbitMQ Broker ---(AMQP)---> Apache Storm ---> Kafka
我希望实现一个解决方案,有效地限制/限制在每个客户端上每秒向kafka发布的数据量。
当前的策略使用guava的RateLimitor,每个设备都有自己的本地缓存示例。当接收到设备消息时,将从缓存中提取Map到该设备ID的RateLimitor,然后 tryAquire()
方法被调用。如果成功地获得了许可证,那么元组将像往常一样转发给kafka,否则,将超出配额,消息将被无声地丢弃。这种方法相当麻烦,在某些时候注定要失败或成为瓶颈。
我一直在阅读kafka的字节率配额,相信这在我们的案例中会非常有效,特别是因为kafka客户机可以动态配置。在我们的平台中创建虚拟设备时,应在其中添加新的client.id client.id == deviceId
.
让我们假设以下用例为例:
管理员创建2个虚拟设备:湿度和温度传感器
将触发一个规则,以便在kafka中为上述设备创建新的用户/客户机ID条目
通过kafka cli设置生产者配额值
两个设备都发出入站事件消息
...?
这是我的问题。如果使用单个生产者示例,是否可以指定 client.id
在打电话之前在生产记录或生产商的某个地方 send()
? 如果一个生产者只允许一个 client.id
,这是否意味着每个设备都必须有自己的生产商?如果只允许一对一Map,那么明智的做法是缓存数百个(如果不是数千个)生产者示例,每个设备一个?有没有更好的方法我还不知道?
注意:我们的平台是一个“开门系统”,这意味着客户端永远不会收到错误响应,如“超过速率”或任何错误。它对最终用户是透明的。因此,我不能干扰rabbitmq中的数据,也不能将消息重新路由到不同的队列。。我唯一的选择是把这些东西整合在风暴和Kafka之间。
2条答案
按热度按时间nwwlzxa71#
您可以配置
client.id
按应用程序:properties.put ("client.id", "humidity")
或者properties.put ("client.id", "temp")
根据每个client.id
您可以设置值我怀疑我和这个配置有关(
producer_byte_rate = 1024, consumer_byte_rate = 2048, request_percentage = 200
),生产者不采用插入的配置,因为使用者工作正常ej83mcc02#
你可以指定
client.id
在Producer
对象,请记住它们是重量级的,并且您可能不愿意创建它们的多个示例(尤其是在每个设备上)。关于减少
Producer
,您是否考虑过为每个用户而不是每个设备创建一个,或者甚至有一个有限的共享池?Kafka的消息头可以用来辨别哪个设备实际产生了数据。缺点是您需要限制消息的生成,这样一个设备就不能从其他设备获取所有资源。但是,您可以限制kafka代理端的用户,并将配置应用于默认用户/客户端:
看到了吗https://kafka.apache.org/documentation/#design_quotas 更多的例子和深入的解释。
如何识别消息取决于您的体系结构,可能的解决方案包括:
每个用户的主题/分区(例如。
data-USERABCDEF
)如果您决定使用公共主题,则可以将生产者数据放入消息头中-https://kafka.apache.org/0110/javadoc/index.html?org/apache/kafka/common/header/headers.html ,或者您可以将它们放入有效负载本身