java - Why is the tuple size in Apache Storm's CombinerAggregator/ReducerAggregator always 0?

Posted by bvjxkvbb on 2021-06-24 in Storm

I recently started using Storm Trident, and while reading the Storm documentation I ran into something I find hard to understand. Here is my code:

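// TestGlobalAggregatorTopo
import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.generated.StormTopology;
import org.apache.storm.trident.TridentTopology;
import org.apache.storm.trident.operation.Consumer;
import org.apache.storm.trident.testing.FixedBatchSpout;
import org.apache.storm.trident.testing.MemoryMapState;
import org.apache.storm.trident.tuple.TridentTuple;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
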
public class TestGlobalAggregatorTopo {
    public static StormTopology buildTopology() {
        TridentTopology topology = new TridentTopology();

        FixedBatchSpout spout = new FixedBatchSpout(new Fields("sentence"), 2,
                                                    new Values("the cow jumped over the moon"),
                                                    new Values("the man went to the store and bought some candy"),
                                                    new Values("four score and seven years ago"),
                                                    new Values("how many apples candy you eat"));

        spout.setCycle(false);
        topology.newStream("spout", spout).parallelismHint(1)
                .each(new Fields("sentence"), new SplitFunction(), new Fields("word"))
//                .each(new Fields("sentence"), new OutputFunction(), new Fields(""))
                .groupBy(new Fields("word"))
                .persistentAggregate(new MemoryMapState.Factory(), new CountCombinerAggregator(), new Fields("count"))
                .newValuesStream()
                .peek(new Consumer() {
                    @Override
                    public void accept(TridentTuple input) {
                        String v = String.format("word = [%s], count = [%s]", input.get(0), input.get(1));
                        System.out.println(v);
                    }
                });

        return topology.build();
    }

    public static void main(String[] args) throws Exception {
        Config config = new Config();
        config.setDebug(false);
        LocalCluster localCluster = new LocalCluster();
        localCluster.submitTopology("aggregator", config, buildTopology());
    }
}
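// CountCombinerAggregator
import org.apache.storm.trident.operation.CombinerAggregator;
import org.apache.storm.trident.tuple.TridentTuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
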
public class CountCombinerAggregator implements CombinerAggregator<PersistentCountState> {

    private static final long serialVersionUID = -8503208039935179201L;

    private static final Logger LOG = LoggerFactory.getLogger(CountCombinerAggregator.class);

    @Override
    public PersistentCountState init(TridentTuple tuple) {
        LOG.info("tuple.size = [{}]", tuple.size());
        return new PersistentCountState(1);
    }

    @Override
    public PersistentCountState combine(PersistentCountState val1, PersistentCountState val2) {
        int cnt = val1.getCount() + val2.getCount();
        PersistentCountState state = new PersistentCountState(1);
        state.setCount(cnt);
        return state;
    }

    @Override
    public PersistentCountState zero() {
        return new PersistentCountState(0);
    }
}

The code is simple: FixedBatchSpout emits tuples, SplitFunction splits each sentence into words, and CountCombinerAggregator counts the words.
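To make the expected result concrete, here is a plain-Java sketch (no Storm dependency) of the totals the topology computes. It assumes that SplitFunction, which the question does not show, splits on whitespace; WordCountSketch, split and countWords are hypothetical names. With a batch size of 2, the real topology prints updated counts after each batch, so these totals correspond to the last update printed for each word.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java sketch (no Storm dependency) of the totals the topology computes:
// split each sentence into words, group by word, count occurrences.
// ASSUMPTION: the question's SplitFunction (not shown) splits on whitespace.
public class WordCountSketch {

    // Hypothetical stand-in for SplitFunction: one "word" value per token.
    static List<String> split(String sentence) {
        return Arrays.asList(sentence.trim().split("\\s+"));
    }

    // groupBy("word") plus a count aggregator, collapsed into one map:
    // each word contributes 1 (like init) and is added to the running total (like combine).
    public static Map<String, Long> countWords(List<String> sentences) {
        Map<String, Long> counts = new TreeMap<>();
        for (String sentence : sentences) {
            for (String word : split(sentence)) {
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> sentences = List.of(
                "the cow jumped over the moon",
                "the man went to the store and bought some candy",
                "four score and seven years ago",
                "how many apples candy you eat");
        // Same format string as the question's peek() consumer.
        countWords(sentences).forEach((word, count) ->
                System.out.println(String.format("word = [%s], count = [%s]", word, count)));
    }
}
```

For these four sentences it gives the = 4, candy = 2, and = 2, and 1 for every other word.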
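However, I ran into a problem with the CombinerAggregator: the size of the TridentTuple tuple passed to the init method is always 0. The Trident API overview describes CombinerAggregator as follows: a CombinerAggregator returns a single tuple with a single field as output. CombinerAggregators run the init function on each input tuple and use the combine function to combine values until there's only one value left. If there are no tuples in the partition, the CombinerAggregator emits the output of the zero function. The overview then gives Count as an example implementation.

So my question is: why is the TridentTuple tuple in the init method always empty (size 0)?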
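The contract quoted above can be modeled without Storm. This is a simplified sketch under stated assumptions: the CombinerAggregator interface below is a hypothetical mirror of Storm's org.apache.storm.trident.operation.CombinerAggregator with tuples as plain lists, Count follows the shape of the overview's Count example, and project is a hypothetical helper illustrating the idea that init only receives the fields named as input fields.

```java
import java.util.ArrayList;
import java.util.List;

// Standalone model of the CombinerAggregator contract from the Trident API overview.
// ASSUMPTION: this interface is a simplified, hypothetical mirror of
// org.apache.storm.trident.operation.CombinerAggregator; the real one takes a
// TridentTuple and is Serializable. Here a tuple is just a List of values.
public class CombinerContractSketch {

    public interface CombinerAggregator<T> {
        T init(List<?> tuple);
        T combine(T val1, T val2);
        T zero();
    }

    // Count, shaped like the example in the overview: it never reads its tuple.
    public static class Count implements CombinerAggregator<Long> {
        public Long init(List<?> tuple) { return 1L; }
        public Long combine(Long val1, Long val2) { return val1 + val2; }
        public Long zero() { return 0L; }
    }

    // init() on each tuple, combine() until one value is left, zero() for an
    // empty partition. A left-to-right fold is one valid combine order.
    public static <T> T aggregate(CombinerAggregator<T> agg, List<? extends List<?>> partition) {
        if (partition.isEmpty()) {
            return agg.zero();
        }
        T acc = agg.init(partition.get(0));
        for (int i = 1; i < partition.size(); i++) {
            acc = agg.combine(acc, agg.init(partition.get(i)));
        }
        return acc;
    }

    // Hypothetical projection step: keep only the values of the named input
    // fields, in order. With no input fields the result is an empty tuple.
    public static List<Object> project(List<String> schema, List<?> values, List<String> inputFields) {
        List<Object> projected = new ArrayList<>();
        for (String field : inputFields) {
            projected.add(values.get(schema.indexOf(field)));
        }
        return projected;
    }

    public static void main(String[] args) {
        List<String> schema = List.of("sentence", "word");
        List<String> values = List.of("the cow jumped over the moon", "the");
        System.out.println(project(schema, values, List.of()).size());      // 0
        System.out.println(project(schema, values, List.of("word")));       // [the]
        System.out.println(aggregate(new Count(), List.of()));              // 0
        System.out.println(aggregate(new Count(),
                List.of(List.of("the"), List.of("the"), List.of("cow")))); // 3
    }
}
```

Count never reads its tuple, so it behaves the same whether or not init receives any fields. Whether the empty tuple in the question comes from this kind of projection is an assumption to verify against the Storm source: the three-argument persistentAggregate(StateFactory, CombinerAggregator, Fields) call in the topology names no input fields, while GroupedStream also offers an overload persistentAggregate(StateFactory, Fields inputFields, CombinerAggregator, Fields functionFields).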
