我有一个JavaSpring应用程序,它基于创建拓扑结构的dto将拓扑提交给storm(1.1.2)nimbus。
这是伟大的工作除了非常大的窗口。我正在测试它与几个不同的滑动和滚动窗口。没有给我任何问题除了一个24小时滑动窗口,每15分钟前进。该拓扑将从kafka接收约250条消息/秒,并使用一个简单的时间戳提取器(与我正在测试的所有其他拓扑非常相似)对它们进行窗口操作。
为了解决这个问题,我使用了大量的worker和内存,但是我的默认配置是1个worker,堆大小为2048mb。我也尝试过减少影响最小的延迟。
我认为有可能是窗口太大,工作人员的内存不足,导致心跳或zookeeper连接检查延迟,进而导致nimbus杀死工作人员。
所发生的情况是,nimbus日志经常报告该拓扑的执行器“不活动”,而该拓扑的工作日志显示 KeeperException
拓扑无法与zookeeper或 java.lang.ExceptionInInitializerError:null
有窝的 PrivelegedActionException
.
当拓扑分配了一个新的worker时,我所做的聚合就丢失了。我假设发生这种情况是因为窗口至少包含250601511(messagespersecondSecondPerMinute15minWindowAdvancesBeforeClash)个消息,每个消息大约84字节。要完成整个窗口,它最终将是250601597条消息(messagespersecondSecondPerMinute15min15min增量为24小时plusExpiredWindow)。如果我的数学是正确的,这是~1.8gbs,所以我觉得工作内存应该覆盖窗口或至少超过11个窗口。
我可以稍微增加记忆,但不会太多。我还可以减少内存/工作线程的数量,增加工作线程/拓扑的数量,但是我想知道我是否缺少了一些东西?我是否可以增加工作人员心跳的时间量,以便在被杀之前有更多的时间让执行者签入,或者这会因为某种原因而不好?如果我改变了心跳信号,if就会出现在拓扑的配置Map中。谢谢!
1条答案
按热度按时间ifmq2ha21#
这是由于工作人员内存不足造成的。从风暴代码看。看起来storm将窗口中的每一条消息作为一个元组(这是一个相当大的对象)保存在一起。有了高速率的信息和24小时的窗口,这是一个很大的内存。
我通过使用一个预扣螺栓来修复这个问题,它可以在最初的1分钟窗口中扣下所有元组,这大大减少了主窗口上的负载,因为它现在每分钟接收一个元组。bucketing窗口不会耗尽内存,因为它的窗口中一次只有一分钟的元组。