设置螺栓以读取trident拓扑中其他螺栓的特定流

kuarbcqp  于 2021-06-24  发布在  Storm
关注(0)|答案(0)|浏览(221)

我试图写一个三叉戟拓扑,其中有多个螺栓。现在,我想将一个bolt注册到其他bolt特定流,如下所示。

TridentTopologyBuilder tridentTopologyBuilder = new TridentTopologyBuilder();
FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
                                            new Values("the cow jumped over the moon"),
                                            new Values("the man went to the store and bought some candy"),
                                            new Values("four score and seven years ago"),
                                            new Values("how many apples can you eat"));

tridentTopologyBuilder.setSpout("tridentSpout", "spoutStream", "spoutId", spout, 2, "spoutBatch");
Map<String, String> batchGroups = new HashMap<>();
batchGroups.put("boltStream", "boltBatch");
tridentTopologyBuilder.setBolt("tridentBolt", new TridentTestBolt(), 1, Sets.newHashSet("spoutBatch"), batchGroups).shuffleGrouping("tridentSpout", "spoutStream");

tridentTopologyBuilder.setBolt("tridentBolt2", new TridentTestBolt2(), 1, new HashSet<>(), batchGroups).shuffleGrouping("tridentBolt", "boltStream");

LocalCluster cluster = new LocalCluster();
Config config = new Config();
config.setDebug(true);
cluster.submitTopology("TridentTopology", config, tridentTopologyBuilder.buildTopology(new HashMap<>()));

我得到以下例外:

Error: InvalidTopologyException(msg:Component: [tridentBolt2] subscribes from non-existent stream: [$coord-boltBatch] of component [tridentBolt])

还使用outputfieldsdeclarer的declarestream方法声明流

@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declareStream("boltStream", new Fields("sentence"));
}

另外,注册到其他特定流也可以在正常拓扑中工作。问题是三叉戟的拓扑结构。还有,我们应该把它当作什么 batchGroups .

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题