我用Kafka风暴连接Kafka和风暴。我有3个服务器运行zookeeper,kafka和storm。Kafka中有一个主题“test”,它有9个分区。
在storm拓扑中,kafkaspout executor的数量是9,默认情况下,任务的数量也应该是9。“提取”螺栓是唯一一个连接到Kafka斯波特的螺栓,“原木”喷口。
从用户界面来看,喷口的故障率非常高。但是,bolt中执行的消息数=发出的消息数-bolt中失败的消息数。当失败的消息在开始时为空时,此等式几乎匹配。
根据我的理解,这意味着螺栓确实收到了来自喷口的信息,但确认信号在飞行中暂停。这就是为什么喷口中的背包数量如此之少的原因。
这个问题可以通过增加超时秒数和抛出挂起消息数来解决。但这将导致更多的内存使用,我不能增加到无限。
我在想是否有办法强迫风暴忽略某个喷口/螺栓中的ack,以便它在超时前不会等待该信号。这将显著增加吞吐量,但不能保证消息处理。
2条答案
按热度按时间ccrfmcuu1#
你的容量数字有点高,这让我相信你真的在最大限度地利用系统资源(cpu,内存)。换句话说,系统似乎有点陷入困境,这可能就是元组超时的原因。你可以试着用
topology.max.spout.pending
config属性来限制喷口的飞行中元组数。如果您可以适当地减少这个数量,那么拓扑应该能够有效地处理负载,而不需要元组超时。7gcisfzg2#
如果您将应答器的数量设置为0,则storm将自动应答每个样本。
请注意,ui只测量并显示5%的数据流。除非你
尝试增加螺栓的超时时间并减少
topology.max.spout.pending
.另外,请确保spout的nexttuple()方法是非阻塞和优化的。
我还建议您分析代码,也许您的storm队列已满,您需要增加它们的大小。