我有一个简单的风暴拓扑,主要由两个螺栓组成:
BatchPricinGrequesReaderBolt—接受一个批处理请求,并为批处理中的每个单独定价请求发出一个元组。
pricingrequesthandlerbolt-计算定价请求的值。
我有一批PricingRequestReaderBolt和四批PricingRequestHandlerBolt。
pricingrequesthandlerbolt作为一个随机分组添加到批pricingrequestheaderbolt上。
在我的批处理请求有四个定价请求的情况下,我希望所有四个pricingrequesthandlerbolts同时运行—每个都处理一个元组。然而,这不是我观察到的。有时它是这样工作的,但大多数时候它只使用三个或两个pricingrequesthandlerbolts。
我是不是漏掉了什么明显的东西?
编辑:下面是一些简单的代码来演示这个问题。我希望所有四个元组都能同时处理,但大多数情况下并不是这样。
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import java.util.Map;
public class SimpleStormTest {
public static void main(String[] args) {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("TestSpout", new TestSpout(), 1);
builder.setBolt("TestBolt", new TestBolt(), 4)
.shuffleGrouping("TestSpout");
StormTopology topology = builder.createTopology();
Config conf = new Config();
conf.setNumWorkers(5);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("LocalTopology", conf, topology);
}
public static class TestSpout extends BaseRichSpout {
private transient SpoutOutputCollector collector;
int counter = 0;
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
System.out.println("%%% open " + Thread.currentThread().getName());
this.collector = collector;
}
@Override
public void nextTuple() {
System.out.println("nextTuple");
try {
Thread.sleep(10_000);
int valueToEmit = counter;
counter++;
collector.emit(new Values(valueToEmit));
collector.emit(new Values(valueToEmit));
collector.emit(new Values(valueToEmit));
collector.emit(new Values(valueToEmit));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("Input"));
}
}
public static class TestBolt extends BaseRichBolt {
private transient OutputCollector collector;
@Override
public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
this.collector = collector;
}
@Override
public void execute(Tuple input) {
System.out.println("!!! Executing " + Thread.currentThread().getName());
try {
Thread.sleep(2000);
collector.emit(new Values(input.getValueByField("Input")));
collector.ack(input);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("Result"));
}
}
}
我想我只是不明白一些基本的概念,否则这将是一个非常严重的问题。
编辑2:我已经写了我自己的分组,现在的行为正如我所期望的,但我认为这是一个如此常见的用例,一定有一些基本的东西我只是不明白。
暂无答案!
目前还没有任何答案,快来回答吧!