我是storm编程的新手,我有一份工作要做一个自定义分组方法的测试,看看它是否更好。
所以我必须测量风暴在几种不同条件下的吞吐量,比如不同的工人数量,不同的工人内存等等
很容易使用 conf.setNumWorkers();
但是,我发现很难设置每个worker的最大堆大小。
我试过编辑 storm.yaml
并补充道
worker.childopts: "-Xmn128m-Xms128m-Xmx128m"
与
conf.put(Config.WORKER_CHILDOPTS, "-Xmn128m -Xms128m -Xmx128m");
添加到我的代码中。
但是当我使用 jmap -heap [pid]
在我的工作节点上,我发现最大堆大小仍然是768m,这是默认的最大堆大小。
如何才能达到这样的目的来限制jvm的最大内存使用量?
顺便说一句,我正在使用kafkaspout发送消息给storm,欢迎对我的测试工作提出任何建议。
5条答案
按热度按时间jbose2ul1#
我没有找到设置jvm最大堆大小的正确方法,但是我使用了另一种方法作为解决方法。
注意:我目前手头没有源代码,所以下面的代码只是为了演示这个想法,完全没有经过测试。
该方法利用了storm中失败元组的重传机制。
如果您使用的是某种现有的喷口,例如
KafkaSpout
或者其他任何事情,那么您就不必担心重新传输失败的元组,所有这些都由默认实现来处理。然而,如果你正在实现你自己的喷口你必须自己做。我相信如果你想实现喷口你一定知道
void ack(Object msgId)
以及void fail(Object msgId)
并且很容易实现重传机制。我们的workerbolt看起来像这样,假设它是一个单词计数螺栓。
这不是很准确,因为每个词可能有不同的长度或大小,你可以使用
MEMORY_COUNT
memoryStoredCount
把每个单词的记忆加起来memoryStoredCount
更准确地说。采用这种方法使系统更加可控,因为如果我们设置最大堆大小,并且jvm达到绑定,那么它只会抛出outofmemoryexception并关闭系统,这可能不是我们最初想要的。
sqxo8psd2#
可以通过以下步骤更改辅助进程的最大堆大小:
1:在nimbus节点的conf/storm.yaml文件中添加“worker.heap.memory.mb:2048”;
2:重启nimbus和supervisor
工人堆大小将更改为2gb
30byixjq3#
尝试编辑storm/default.yaml文件。查找worker设置,您将在那里找到设置堆内存的条目。默认情况下应该是这样的!
在同一个文件中,还可以找到以下设置,您可以根据需要更改这些值。
希望有帮助!
mfuanj7w4#
如果您的storm版本<1.0.0,则必须在storm.yaml中设置worker.childopts,然后重新启动主管。这个参数不会作为我收集到的特定于拓扑的选项拉入,因此您所做的拓扑配置更改不会影响结果。
对于storm版本>=1.0.0,您可以参考ssadaqat的答案,但不需要编辑defaults.yaml(这是源代码的一部分),您需要再次将此值插入storm.yaml。
gdrx4gfi5#
尽管ssadaquat的答案是正确的,并且您可以在storm的yaml文件中设置工作内存,但我已经能够通过编程成功地做到这一点:
如果您不想在多个服务器中更改yaml文件,或者根本不想更改yaml文件,那么这样做尤其有用。
有很多答案表明你必须
childopts
为了增加记忆,但那对我不起作用。事实上,我注意到topology_worker_max_heap_size_mb
,的childopts
值自动增加。还要确保服务器上有足够的交换内存,尤其是处理大量数据时。