apachestorm(java):bolt不接收来自其他bolt的元组

bq9c1y66  于 2021-06-24  发布在  Storm
关注(0)|答案(2)|浏览(374)

我正在与apache storm一起使用以下拓扑:

TopologyBuilder builder = new TopologyBuilder();   
    builder.setSpout("socketspout", new SocketSpout(IP_HOST,PORT));
    builder.setBolt("filterone", new FilterOne()).shuffleGrouping("socketspout");
    builder.setBolt("filtertwo", new FilterTwo()).shuffleGrouping("filterone");

第一个bolt的方法是(filteone),此类扩展了baserichbolt:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("ID1","signal1"));
}

 public void execute(Tuple input) {
    int sig;
    try {
        sig=input.getInteger(1)*2;
        System.out.println("Filter one.."+Integer.toString(sig));
        collector.emit("ack1", new Values(input.getString(0), sig));
        collector.ack(input);
    } catch (Exception e) {
        collector.fail(input);
    }
}

第二个bolt的方法是(filtetwo),这个类也扩展了baserichbolt:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
}

 public void execute(Tuple input) {
    int sig;
    try {
        sig=input.getInteger(1)+1;
        System.out.println("Filter two.."+Integer.toString(sig));
        collector.ack(input);
    } catch (Exception e) {
        collector.fail(input);
    }
}

当执行程序模式localcluster时,我可以看到第一个螺栓发出元组,但第二个螺栓从未收到元组。。。。。。

pgx2nnw8

pgx2nnw81#

通过修改一个代码的过滤器,问题就解决了 collector.emit("ack1", new Values(input.getString(0), sig));collector.emit( new Values(input.getString(0), sig));

dzhpxtsq

dzhpxtsq2#

方法的收集器可以设置为:

collector.emit(input, new Values(input.getString(0), sig));

别忘了在declareoutputfields方法中设置此值的字段名:

public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("myValue"));
    }

然后,在第二个bolt中,尝试使用“myvalue”字段获取值:

sig = input.getValueByField("myValue").getInteger(1)+1;

相关问题