我正在与apache storm一起使用以下拓扑:
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("socketspout", new SocketSpout(IP_HOST,PORT));
builder.setBolt("filterone", new FilterOne()).shuffleGrouping("socketspout");
builder.setBolt("filtertwo", new FilterTwo()).shuffleGrouping("filterone");
第一个bolt的方法是(filteone),此类扩展了baserichbolt:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("ID1","signal1"));
}
public void execute(Tuple input) {
int sig;
try {
sig=input.getInteger(1)*2;
System.out.println("Filter one.."+Integer.toString(sig));
collector.emit("ack1", new Values(input.getString(0), sig));
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
}
}
第二个bolt的方法是(filtetwo),这个类也扩展了baserichbolt:
public void declareOutputFields(OutputFieldsDeclarer declarer) {
}
public void execute(Tuple input) {
int sig;
try {
sig=input.getInteger(1)+1;
System.out.println("Filter two.."+Integer.toString(sig));
collector.ack(input);
} catch (Exception e) {
collector.fail(input);
}
}
当执行程序模式localcluster时,我可以看到第一个螺栓发出元组,但第二个螺栓从未收到元组。。。。。。
2条答案
按热度按时间pgx2nnw81#
通过修改一个代码的过滤器,问题就解决了
collector.emit("ack1", new Values(input.getString(0), sig));
至collector.emit( new Values(input.getString(0), sig));
dzhpxtsq2#
方法的收集器可以设置为:
别忘了在declareoutputfields方法中设置此值的字段名:
然后,在第二个bolt中,尝试使用“myvalue”字段获取值: