invalidtopologyexception异常(msg:component:[x]从不存在的流订阅[y]

ffx8fchx  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(388)

我正在尝试读取Kafka的数据,并使用storm将其插入cassandra。我也配置了拓扑结构,但是我遇到了一些问题,我不知道为什么会这样。
这是我提交的文章。

TopologyBuilder topologyBuilder = new TopologyBuilder();
        topologyBuilder.setSpout("spout", new KafkaSpout(spoutConfig));
        topologyBuilder.setBolt("checkingbolt", new CheckingBolt("cassandraBoltStream")).shuffleGrouping("spout");
        topologyBuilder.setBolt("cassandrabolt", new CassandraInsertBolt()).shuffleGrouping("checkingbolt");

在这里,如果我评论最后一行,我没有看到任何例外。在最后一行中,我得到以下错误: InvalidTopologyException(msg:Component: [cassandrabolt] subscribes from non-existent stream: [default] of component [checkingbolt]) 有人能帮帮我吗,这里怎么了?
这是checkingbolt中的outputfielddeclarer

public void declareOutputFields(OutputFieldsDeclarer ofd) {
    ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"}));
}

我没有cassandrainsertbolt的declareoutputfields方法中的任何内容,因为该螺栓不发出任何值。
短暂性脑缺血发作

9w11ddsr

9w11ddsr1#

这里的问题是,您混合了上游名称和组件(即喷口/螺栓)名称。组件名称用于引用不同的螺栓,而流名称用于引用来自同一螺栓的不同流。例如,如果您有一个名为“evenorodbolt”的bolt,它可能会发出两个流,“偶数”流和“奇数”流。但在许多情况下,只有一个流从一个bolt中出来,这就是为什么storm有一些使用默认流名称的方便方法。
当你这么做的时候 .shuffleGrouping("checkingbolt") ,您正在使用这些方便的方法之一,实际上是说“我希望此螺栓使用来自 checkingbolt ". 如果要显式地命名流,可以使用此方法的重载版本,但只有当有多个流来自同一个螺栓时,此方法才有用。
当你这么做的时候 ofd.declareStream(cassandraBoltStream, new Fields(new String[]{"jsonFields"})); ,您是说螺栓将在名为“cassandraboltstream”的流上发射。这可能不是您想要做的,您想要声明它将在默认流上发出。你可以使用 ofd.declare 方法。
有关详细信息,请参阅文档。

相关问题