为什么 Trident 在这个最小示例中不调用 ack() 或 fail()?

Why does Trident not call ack() or fail() in this minimal example?

我尝试在 Trident 中创建一个小示例。目标是查看元组在失败情况下是如何重播的。下面是拓扑定义

        Random rand = new Random();

        Config config = new Config();
        config.setDebug(true);
        config.setNumWorkers(1);

        TridentTopology topology = new TridentTopology();

        topology.newStream("spout", new RandomIntegerSpout())
                .map((MapFunction) tridentTuple -> {
                    if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
                            (rand.nextInt(2) == 1)) {
                        System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
                        throw new ReportedFailedException("Divisible by 50");
                    }
                    return new Values(tridentTuple.toArray());
                })
                .peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));

我使用来自 storm-starter 的 RandomIntegerSpout 扩展 BaseRichSpout 并且只生成 运行dom 数字。然后我应用一个 MapFunction,它每 50 个元组绘制一个 运行dom 数字,并且 运行domly 使元组失败。

问题是,我没有收到任何 ackfail

我在调试模式下尝试了 spout 和 运行 它,尝试了相同的示例输出,并用标准风暴螺栓进行了尝试。锚定工作正常,只是没有被三叉戟调用。

我在 v1.2.3 和 v2.0.0 中用 LocalCluster 和 StormSubmitter 重现了这个问题。

下面是风暴截图UI: 与映射对应的螺栓按预期确认并使元组失败,但这永远不会传播回喷口。

我认为 trident mastercoord 可能期望某种状态的持久性以实现拓扑已完成,但是用一些 persistentAggregate 替换 peek 没有帮助。我还通过对 each 执行相同操作排除了 map 中的错误。

通过检查看到代码几乎是微不足道的,我可能误解了有关 Trident / Storm 的一些基本知识。如果批处理完成,我是否期望 trident 调用 spout 和 ack 方法是错误的?我意识到 IBatchSpout 中没有 fail 方法。 Trident 如何处理批次重放?

Trident spout 不会在单个元组级别确认或失败元组。相反,元组被批量确认。

三叉戟喷嘴通常看起来像 this interface

M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);

想法是 Trident 将设法跟踪批处理元组的 acks/fails,然后如果批处理失败,它会要求 spout 重复批处理,如果没有,它就赢了't。

请注意这与标准的 Storm spout 有何不同。对于普通的 spout,框架基本上告诉 spout "Hey, emit something. Up to you what you emit.",然后 ackfail 方法用于告诉 spout 是否应该再次发出特定的元组。

使用 Trident,spout 被告知 "Hey, (re)emit batch number x",然后由 spout 知道该批次中有哪些元组。使用此模型,不需要 fail 方法。不过,一些 Trident spouts 将有一个 ack/succeed 方法,以允许 spout 删除它可能与特定正在进行的批处理相关的任何状态。

对于包裹IRichSpouts,有一些bridging code将它们包裹在三叉戟API中。基本上,包装器调用 nextTuple 直到它有一个完整的批次,然后它将 ids 存储在缓存中。如果包装器被要求重新发送一批,它会在 spout 上调用 fail。否则,它会在批处理成功后调用 ack

我认为您在 Storm UI 中没有看到与此相关的任何内容的原因是 IRichBolt 实际上并未在那里表示。相反,它被包装了,所以 ack/fail 调用在 spout-spout 组件内发生 "under the hood"。如果您想确定 ack/fail 是否被调用,请尝试向 IRichSpout.

ack/fail 方法添加一些日志记录