如何减少摄入率Kafka喷口和启用背压?

6ioyuze2  于 2021-06-07  发布在  Kafka
关注(0)|答案(1)|浏览(384)

我使用的是storm kafka客户端1.1.1和storm core 1.1.0。
我已经调整了以下参数,但无法启用背压和减少Kafka喷口的摄取率。
每秒消耗2000条消息。
下游螺栓处理一条消息需要50毫秒,即每秒处理20条消息。
喷口发射元组和螺栓执行元组之间的延迟随着时间的推移而增加。

我怎样才能让spoutread每秒读20条消息,并保持其消耗率与bolt的执行率相同


**Topology**

   topology.max.spout.pending=**5**, 
   topology.message.timeout.secs=**600**, 
   topology.executor.send.buffer.size=**64**, 
   topology.executor.receive.buffer.size=**64**, 
   topology.transfer.buffer.size=**64**

 **KafkaSpoutConfig**

   setPollTimeoutMs(**200**) , 
   setFirstPollOffsetStrategy(latest) , 
   setMaxUncommittedOffsets(**2_000_000**) , 
   setGroupId(groupName) , 
   setProp("fetch.max.wait.ms",**1000**) , 
   setProp("max.poll.records",**100**) , 
   setMaxPartitionFectchBytes(**512**)  , 
   setProp("send.buffer.bytes",**512**) , 
   setProp("receive.buffer.bytes",**512**) , 
   setPartitionRefreshPeriodMs(30_000).setProp("enable.auto.commit", "true") , 
   setProp("session.timeout.ms", "**60000**") , 

   KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(**50**) ,
   KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(**5**) , 1 ,
   KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(**1**) ) ;

我不确定应该为拓扑\u喷口\u等待\u策略和背压\u干扰器\u高\u水印设置什么值
那么,上述参数和数值的组合有助于控制喷口的摄取率呢?
如有任何建议,我们将不胜感激。
谢谢迦腻色迦

oyt4ldly

oyt4ldly1#

拓扑\u喷动\u等待\u策略仅在要求喷动发出新的元组时使用,并且它不会发出任何消息(即,如果没有新消息)。它不应该对背压有任何影响。
我不太熟悉当前的backpressure实现,但是我非常确定您需要使用topology\u backpressure\u enable显式地启用它。
背压\u disruptor \u high \u watermark是一个比率,因此如果将其设置为例如0.9,则当螺栓的输入队列已满90%时,它将对喷口进行节流。您可以在中找到有关设置的文档https://github.com/apache/storm/blob/1.1.x-branch/storm-core/src/jvm/org/apache/storm/config.java,以及https://github.com/apache/storm/blob/1.1.x-branch/conf/defaults.yaml
为了避免一次发出太多的元组,我认为应该将topology.max.spout.pending设置为合理的元组数(可能是几百个?)。确保您的拓扑设置为启用确认(即,将topology.enable.message.timeouts设置为true)。否则,最大喷口挂起没有任何影响。
不知道为什么要更改执行器缓冲区大小。
您还应该考虑将storm和storm kafka客户端升级到至少1.1.2。最近对storm kafka客户端进行了很多修复,如果升级,您可能会更容易使用它。
我不知道你代码里的星星是什么意思?

相关问题