如何设置storm worker的jvm最大堆大小?

ttcibm8c  于 2021-06-07  发布在  Kafka
关注(0)|答案(5)|浏览(471)

我是storm编程的新手,我有一份工作要做一个自定义分组方法的测试,看看它是否更好。
所以我必须测量风暴在几种不同条件下的吞吐量,比如不同的工人数量,不同的工人内存等等
很容易使用 conf.setNumWorkers(); 但是,我发现很难设置每个worker的最大堆大小。
我试过编辑 storm.yaml 并补充道

worker.childopts: "-Xmn128m-Xms128m-Xmx128m"

conf.put(Config.WORKER_CHILDOPTS, "-Xmn128m -Xms128m -Xmx128m");

添加到我的代码中。
但是当我使用 jmap -heap [pid] 在我的工作节点上,我发现最大堆大小仍然是768m,这是默认的最大堆大小。
如何才能达到这样的目的来限制jvm的最大内存使用量?
顺便说一句,我正在使用kafkaspout发送消息给storm,欢迎对我的测试工作提出任何建议。

jbose2ul

jbose2ul1#

我没有找到设置jvm最大堆大小的正确方法,但是我使用了另一种方法作为解决方法。
注意:我目前手头没有源代码,所以下面的代码只是为了演示这个想法,完全没有经过测试。
该方法利用了storm中失败元组的重传机制。
如果您使用的是某种现有的喷口,例如 KafkaSpout 或者其他任何事情,那么您就不必担心重新传输失败的元组,所有这些都由默认实现来处理。
然而,如果你正在实现你自己的喷口你必须自己做。我相信如果你想实现喷口你一定知道 void ack(Object msgId) 以及 void fail(Object msgId) 并且很容易实现重传机制。
我们的workerbolt看起来像这样,假设它是一个单词计数螺栓。

class WorkerBolt implements IRichBolt{
    ...
    private HashMap<String, int> counts = new HashMap<>();
    private int wordStoredCount = 0;
    private final int COUNT_LIMIT = 500000;  // Here's our limit
    ...
    void execute(Tuple tuple){
        if(wordStoreCount >= COUNT_LIMIT){
            tuple.fail();
        }

        // do our counting stuff

        wordStoredCount++;

        // send the tuple downstream to aggregate/process/etc. if needed
    }
    ...

这不是很准确,因为每个词可能有不同的长度或大小,你可以使用
MEMORY_COUNT memoryStoredCount 把每个单词的记忆加起来 memoryStoredCount 更准确地说。
采用这种方法使系统更加可控,因为如果我们设置最大堆大小,并且jvm达到绑定,那么它只会抛出outofmemoryexception并关闭系统,这可能不是我们最初想要的。

sqxo8psd

sqxo8psd2#

可以通过以下步骤更改辅助进程的最大堆大小:
1:在nimbus节点的conf/storm.yaml文件中添加“worker.heap.memory.mb:2048”;
2:重启nimbus和supervisor
工人堆大小将更改为2gb

30byixjq

30byixjq3#

尝试编辑storm/default.yaml文件。查找worker设置,您将在那里找到设置堆内存的条目。默认情况下应该是这样的!

worker.heap.memory.mb: 768

在同一个文件中,还可以找到以下设置,您可以根据需要更改这些值。

topology.component.resources.onheap.memory.mb: 128.0
topology.component.resources.offheap.memory.mb: 0.0
topology.component.cpu.pcore.percent: 10.0
topology.worker.max.heap.size.mb: 768.0

希望有帮助!

mfuanj7w

mfuanj7w4#

如果您的storm版本<1.0.0,则必须在storm.yaml中设置worker.childopts,然后重新启动主管。这个参数不会作为我收集到的特定于拓扑的选项拉入,因此您所做的拓扑配置更改不会影响结果。
对于storm版本>=1.0.0,您可以参考ssadaqat的答案,但不需要编辑defaults.yaml(这是源代码的一部分),您需要再次将此值插入storm.yaml。

gdrx4gfi

gdrx4gfi5#

尽管ssadaquat的答案是正确的,并且您可以在storm的yaml文件中设置工作内存,但我已经能够通过编程成功地做到这一点:

Config stormConfig = new Config();
int fourGB = 4 * 1024;
stormConfig.put(Config.TOPOLOGY_WORKER_MAX_HEAP_SIZE_MB, fourGB);

如果您不想在多个服务器中更改yaml文件,或者根本不想更改yaml文件,那么这样做尤其有用。
有很多答案表明你必须 childopts 为了增加记忆,但那对我不起作用。事实上,我注意到 topology_worker_max_heap_size_mb ,的 childopts 值自动增加。还要确保服务器上有足够的交换内存,尤其是处理大量数据时。

相关问题