我使用的是storm kafka客户端1.1.1和storm core 1.1.0。
我已经调整了以下参数,但无法启用背压和减少Kafka喷口的摄取率。
每秒消耗2000条消息。
下游螺栓处理一条消息需要50毫秒,即每秒处理20条消息。
喷口发射元组和螺栓执行元组之间的延迟随着时间的推移而增加。
我怎样才能让spoutread每秒读20条消息,并保持其消耗率与bolt的执行率相同
**Topology**
topology.max.spout.pending=**5**,
topology.message.timeout.secs=**600**,
topology.executor.send.buffer.size=**64**,
topology.executor.receive.buffer.size=**64**,
topology.transfer.buffer.size=**64**
**KafkaSpoutConfig**
setPollTimeoutMs(**200**) ,
setFirstPollOffsetStrategy(latest) ,
setMaxUncommittedOffsets(**2_000_000**) ,
setGroupId(groupName) ,
setProp("fetch.max.wait.ms",**1000**) ,
setProp("max.poll.records",**100**) ,
setMaxPartitionFectchBytes(**512**) ,
setProp("send.buffer.bytes",**512**) ,
setProp("receive.buffer.bytes",**512**) ,
setPartitionRefreshPeriodMs(30_000).setProp("enable.auto.commit", "true") ,
setProp("session.timeout.ms", "**60000**") ,
KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(**50**) ,
KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(**5**) , 1 ,
KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(**1**) ) ;
我不确定应该为拓扑\u喷口\u等待\u策略和背压\u干扰器\u高\u水印设置什么值
那么,上述参数和数值的组合有助于控制喷口的摄取率呢?
如有任何建议,我们将不胜感激。
谢谢迦腻色迦
1条答案
按热度按时间oyt4ldly1#
拓扑\u喷动\u等待\u策略仅在要求喷动发出新的元组时使用,并且它不会发出任何消息(即,如果没有新消息)。它不应该对背压有任何影响。
我不太熟悉当前的backpressure实现,但是我非常确定您需要使用topology\u backpressure\u enable显式地启用它。
背压\u disruptor \u high \u watermark是一个比率,因此如果将其设置为例如0.9,则当螺栓的输入队列已满90%时,它将对喷口进行节流。您可以在中找到有关设置的文档https://github.com/apache/storm/blob/1.1.x-branch/storm-core/src/jvm/org/apache/storm/config.java,以及https://github.com/apache/storm/blob/1.1.x-branch/conf/defaults.yaml
为了避免一次发出太多的元组,我认为应该将topology.max.spout.pending设置为合理的元组数(可能是几百个?)。确保您的拓扑设置为启用确认(即,将topology.enable.message.timeouts设置为true)。否则,最大喷口挂起没有任何影响。
不知道为什么要更改执行器缓冲区大小。
您还应该考虑将storm和storm kafka客户端升级到至少1.1.2。最近对storm kafka客户端进行了很多修复,如果升级,您可能会更容易使用它。
我不知道你代码里的星星是什么意思?