我想把我的数据输出到 kafka
排队,所以我写 topology
具体如下:
LinearDRPCTopologyBuilder builder = new LinearDRPCTopologyBuilder("xx");
builder.addBolt(new JdEntryParseBolt(config), 2);
builder.addBolt(new JdCrawlerBolt(config), 2).shuffleGrouping();
builder.addBolt(new JdParseBolt(config), 2).shuffleGrouping();
builder.addBolt(new KafkaBolt<String, Integer>(), 2).shuffleGrouping();
Config conf = new Config();
Map<String, String> map = new HashMap<>();
map.put("metadata.broker.list", "127.0.0.1:9092");
map.put("serializer.class", "kafka.serializer.StringEncoder");
conf.put("topic", "jdQueryCrawler");
conf.setMaxTaskParallelism(10);
LocalDRPC drpc = new LocalDRPC();
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("xx-drpc", conf, builder.createLocalTopology(drpc));
当我提交拓扑时,它返回:
java.lang.RuntimeException: Must declare exactly one stream from last bolt in LinearDRPCTopology
at backtype.storm.drpc.LinearDRPCTopologyBuilder.createTopology(LinearDRPCTopologyBuilder.java:152) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.drpc.LinearDRPCTopologyBuilder.createLocalTopology(LinearDRPCTopologyBuilder.java:87) ~[storm-core-0.9.3.jar:0.9.3]
at com.ipin.jd.process.topology.TestCrawlerDrpc.main(TestCrawlerDrpc.java:58) ~[test-classes/:na]
“lineardrpc中最后一个螺栓的确切一个流”是什么意思?我不添加多个流。
暂无答案!
目前还没有任何答案,快来回答吧!