为什么trident在这个最小的示例中不调用ack()或fail()?

6uxekuva  于 2021-06-24  发布在  Storm
关注(0)|答案(1)|浏览(301)

我试着用三叉戟做一个小例子。我们的目标是观察元组在失败时是如何重放的。下面是拓扑定义

Random rand = new Random();

        Config config = new Config();
        config.setDebug(true);
        config.setNumWorkers(1);

        TridentTopology topology = new TridentTopology();

        topology.newStream("spout", new RandomIntegerSpout())
                .map((MapFunction) tridentTuple -> {
                    if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
                            (rand.nextInt(2) == 1)) {
                        System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
                        throw new ReportedFailedException("Divisible by 50");
                    }
                    return new Values(tridentTuple.toArray());
                })
                .peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));

我使用 RandomIntegerSpout 从风暴开始 BaseRichSpout 产生随机数。然后我申请 MapFunction 它只是每50个元组抽取一个随机数,然后随机地使元组失败。
问题是,我没有得到任何 ack s或 fail s。
我玩了一下喷口,在调试模式下运行它,尝试了相同的示例输出,用标准的风暴螺栓进行了尝试。锚定工作正常,只是没有被三叉戟呼叫。
我在1.2.3版和2.0.0版中用localcluster和stormsubmitter重现了这个问题。
下面是storm ui的截图:

与map-ack和tuple对应的螺栓如预期的那样失败,但是这永远不会传播回喷口。
我原以为三叉戟大师会期望某种状态下的持久性来实现拓扑完成,但用某种持久聚合替换peek并没有帮助。我还排除了 map 用同样的方法 each .
查看代码几乎是微不足道的检查,我可能误解了一些基本的三叉戟/风暴。我以为三叉戟会叫喷口的 ack 方法是否完成了批处理?我意识到没有 fail 中的方法 IBatchSpout . 三叉戟如何处理批量回放??

rwqw0loc

rwqw0loc1#

三叉戟喷口不会在单个元组级别确认或失败元组。相反,元组是作为一个批进行打包的。
三叉戟喷口通常看起来像这样的接口。

M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);

其思想是trident将管理跟踪批处理元组的acks/fails,然后如果批处理失败,它将要求喷口重复该批处理,如果没有,它根本不会。
请注意这与标准风暴喷口有何不同。对于一个普通的喷口,框架基本上告诉喷口“嘿,发射一些东西。由你决定你发射什么“,然后 ack 以及 fail 方法用于告诉喷口是否应该再次发出特定的元组。
使用trident时,喷口会被告知“嘿,(re)emit batch number x”,然后由喷口知道该批中有哪些元组。有了这种模式,就不需要 fail 方法。一些三叉戟喷口会有一个 ack/succeed 方法,以允许喷口下降与特定进行中的批处理相关的任何状态。
用于 Package IRichSpouts ,有一些桥接代码将它们 Package 到trident api中。基本上, Package 器调用 nextTuple 直到它有一个完整的批处理,然后它将id存储在缓存中。如果 Package 器被要求重新提交一个批,它将调用 fail 在喷口上。否则,它会调用 ack 一旦批处理成功。
我认为你在storm ui中没有看到与此相关的任何内容的原因是 IRichBolt 实际上并没有在那里表现出来。相反,它是包起来的,所以 ack/fail 电话是发生在“引擎盖下”的内部 spout-spout 组件。如果您想确定是否正在调用ack/fail,请尝试向 ack/fail 你的方法 IRichSpout .

相关问题