storm-shuffle分组不均匀分布

jm81lzqq  于 2021-06-24  发布在  Storm
关注(0)|答案(0)|浏览(260)

我有一个简单的风暴拓扑,主要由两个螺栓组成:
BatchPricinGrequesReaderBolt—接受一个批处理请求,并为批处理中的每个单独定价请求发出一个元组。
pricingrequesthandlerbolt-计算定价请求的值。
我有一批PricingRequestReaderBolt和四批PricingRequestHandlerBolt。
pricingrequesthandlerbolt作为一个随机分组添加到批pricingrequestheaderbolt上。
在我的批处理请求有四个定价请求的情况下,我希望所有四个pricingrequesthandlerbolts同时运行—每个都处理一个元组。然而,这不是我观察到的。有时它是这样工作的,但大多数时候它只使用三个或两个pricingrequesthandlerbolts。
我是不是漏掉了什么明显的东西?
编辑:下面是一些简单的代码来演示这个问题。我希望所有四个元组都能同时处理,但大多数情况下并不是这样。

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

import java.util.Map;

public class SimpleStormTest {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("TestSpout", new TestSpout(), 1);
        builder.setBolt("TestBolt", new TestBolt(), 4)
                .shuffleGrouping("TestSpout");

        StormTopology topology = builder.createTopology();
        Config conf = new Config();

        conf.setNumWorkers(5);

        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("LocalTopology", conf, topology);
    }

    public static class TestSpout extends BaseRichSpout {
        private transient SpoutOutputCollector collector;
        int counter = 0;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            System.out.println("%%% open " + Thread.currentThread().getName());
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            System.out.println("nextTuple");
            try {
                Thread.sleep(10_000);
                int valueToEmit = counter;
                counter++;
                collector.emit(new Values(valueToEmit));
                collector.emit(new Values(valueToEmit));
                collector.emit(new Values(valueToEmit));
                collector.emit(new Values(valueToEmit));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("Input"));
        }
    }

    public static class TestBolt extends BaseRichBolt {
        private transient OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            System.out.println("!!! Executing " + Thread.currentThread().getName());
            try {
                Thread.sleep(2000);
                collector.emit(new Values(input.getValueByField("Input")));
                collector.ack(input);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("Result"));
        }
    }
}

我想我只是不明白一些基本的概念,否则这将是一个非常严重的问题。
编辑2:我已经写了我自己的分组,现在的行为正如我所期望的,但我认为这是一个如此常见的用例,一定有一些基本的东西我只是不明白。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题