strom必须在LineardRptology中声明一个流形式last bolt?

bkkx9g8r  于 2021-06-08  发布在  Kafka
关注(0)|答案(0)|浏览(260)

我想把我的数据输出到 kafka 排队,所以我写 topology 具体如下:

LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("xx");
builder.addBolt(new JdEntryParseBolt(config), 2);
builder.addBolt(new JdCrawlerBolt(config), 2).shuffleGrouping();
builder.addBolt(new JdParseBolt(config), 2).shuffleGrouping();
builder.addBolt(new KafkaBolt<String, Integer>(), 2).shuffleGrouping();

Config conf = new Config();
Map<String, String> map = new HashMap<>();
map.put("metadata.broker.list", "127.0.0.1:9092");
map.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put("topic", "jdQueryCrawler");
conf.setMaxTaskParallelism(10);

LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("xx-drpc", conf, builder.createLocalTopology(drpc));

当我提交拓扑时,它返回:

java.lang.RuntimeException: Must declare exactly one stream from last bolt in LinearDRPCTopology
            at backtype.storm.drpc.LinearDRPCTopologyBuilder.createTopology(LinearDRPCTopologyBuilder.java:152) ~[storm-core-0.9.3.jar:0.9.3]
            at backtype.storm.drpc.LinearDRPCTopologyBuilder.createLocalTopology(LinearDRPCTopologyBuilder.java:87) ~[storm-core-0.9.3.jar:0.9.3]
            at com.ipin.jd.process.topology.TestCrawlerDrpc.main(TestCrawlerDrpc.java:58) ~[test-classes/:na]

“lineardrpc中最后一个螺栓的确切一个流”是什么意思?我不添加多个流。

暂无答案!

目前还没有任何答案,快来回答吧!

相关问题