为什么 Trident 在这个最小示例中不调用 ack() 或 fail()?
Why does Trident not call ack() or fail() in this minimal example?
我尝试在 Trident 中创建一个小示例。目标是查看元组在失败情况下是如何重播的。下面是拓扑定义
Random rand = new Random();
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(1);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", new RandomIntegerSpout())
.map((MapFunction) tridentTuple -> {
if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
(rand.nextInt(2) == 1)) {
System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
throw new ReportedFailedException("Divisible by 50");
}
return new Values(tridentTuple.toArray());
})
.peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));
我使用来自 storm-starter 的 RandomIntegerSpout
扩展 BaseRichSpout
并且只生成 运行dom 数字。然后我应用一个 MapFunction
,它每 50 个元组绘制一个 运行dom 数字,并且 运行domly 使元组失败。
问题是,我没有收到任何 ack
或 fail
。
我在调试模式下尝试了 spout 和 运行 它,尝试了相同的示例输出,并用标准风暴螺栓进行了尝试。锚定工作正常,只是没有被三叉戟调用。
我在 v1.2.3 和 v2.0.0 中用 LocalCluster 和 StormSubmitter 重现了这个问题。
下面是风暴截图UI:
与映射对应的螺栓按预期确认并使元组失败,但这永远不会传播回喷口。
我认为 trident mastercoord 可能期望某种状态的持久性以实现拓扑已完成,但是用一些 persistentAggregate 替换 peek 没有帮助。我还通过对 each
执行相同操作排除了 map
中的错误。
通过检查看到代码几乎是微不足道的,我可能误解了有关 Trident / Storm 的一些基本知识。如果批处理完成,我是否期望 trident 调用 spout 和 ack
方法是错误的?我意识到 IBatchSpout
中没有 fail
方法。 Trident 如何处理批次重放?
Trident spout 不会在单个元组级别确认或失败元组。相反,元组被批量确认。
三叉戟喷嘴通常看起来像 this interface。
M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);
想法是 Trident 将设法跟踪批处理元组的 acks/fails,然后如果批处理失败,它会要求 spout 重复批处理,如果没有,它就赢了't。
请注意这与标准的 Storm spout 有何不同。对于普通的 spout,框架基本上告诉 spout "Hey, emit something. Up to you what you emit.",然后 ack
和 fail
方法用于告诉 spout 是否应该再次发出特定的元组。
使用 Trident,spout 被告知 "Hey, (re)emit batch number x",然后由 spout 知道该批次中有哪些元组。使用此模型,不需要 fail
方法。不过,一些 Trident spouts 将有一个 ack/succeed
方法,以允许 spout 删除它可能与特定正在进行的批处理相关的任何状态。
对于包裹IRichSpouts
,有一些bridging code将它们包裹在三叉戟API中。基本上,包装器调用 nextTuple
直到它有一个完整的批次,然后它将 ids 存储在缓存中。如果包装器被要求重新发送一批,它会在 spout 上调用 fail
。否则,它会在批处理成功后调用 ack
。
我认为您在 Storm UI 中没有看到与此相关的任何内容的原因是 IRichBolt
实际上并未在那里表示。相反,它被包装了,所以 ack/fail
调用在 spout-spout
组件内发生 "under the hood"。如果您想确定 ack/fail 是否被调用,请尝试向 IRichSpout
.
的 ack/fail
方法添加一些日志记录
我尝试在 Trident 中创建一个小示例。目标是查看元组在失败情况下是如何重播的。下面是拓扑定义
Random rand = new Random();
Config config = new Config();
config.setDebug(true);
config.setNumWorkers(1);
TridentTopology topology = new TridentTopology();
topology.newStream("spout", new RandomIntegerSpout())
.map((MapFunction) tridentTuple -> {
if ((tridentTuple.getLongByField("msgid") % 50 == 0) &&
(rand.nextInt(2) == 1)) {
System.out.println(String.format("Failed to process tuple %d", tridentTuple.getLongByField("msgid")));
throw new ReportedFailedException("Divisible by 50");
}
return new Values(tridentTuple.toArray());
})
.peek((Consumer) tridentTuple -> System.out.println(tridentTuple.getValues()));
我使用来自 storm-starter 的 RandomIntegerSpout
扩展 BaseRichSpout
并且只生成 运行dom 数字。然后我应用一个 MapFunction
,它每 50 个元组绘制一个 运行dom 数字,并且 运行domly 使元组失败。
问题是,我没有收到任何 ack
或 fail
。
我在调试模式下尝试了 spout 和 运行 它,尝试了相同的示例输出,并用标准风暴螺栓进行了尝试。锚定工作正常,只是没有被三叉戟调用。
我在 v1.2.3 和 v2.0.0 中用 LocalCluster 和 StormSubmitter 重现了这个问题。
下面是风暴截图UI:
我认为 trident mastercoord 可能期望某种状态的持久性以实现拓扑已完成,但是用一些 persistentAggregate 替换 peek 没有帮助。我还通过对 each
执行相同操作排除了 map
中的错误。
通过检查看到代码几乎是微不足道的,我可能误解了有关 Trident / Storm 的一些基本知识。如果批处理完成,我是否期望 trident 调用 spout 和 ack
方法是错误的?我意识到 IBatchSpout
中没有 fail
方法。 Trident 如何处理批次重放?
Trident spout 不会在单个元组级别确认或失败元组。相反,元组被批量确认。
三叉戟喷嘴通常看起来像 this interface。
M emitPartitionBatch(TransactionAttempt tx, TridentCollector collector, PartitionT partition, M lastPartitionMeta);
想法是 Trident 将设法跟踪批处理元组的 acks/fails,然后如果批处理失败,它会要求 spout 重复批处理,如果没有,它就赢了't。
请注意这与标准的 Storm spout 有何不同。对于普通的 spout,框架基本上告诉 spout "Hey, emit something. Up to you what you emit.",然后 ack
和 fail
方法用于告诉 spout 是否应该再次发出特定的元组。
使用 Trident,spout 被告知 "Hey, (re)emit batch number x",然后由 spout 知道该批次中有哪些元组。使用此模型,不需要 fail
方法。不过,一些 Trident spouts 将有一个 ack/succeed
方法,以允许 spout 删除它可能与特定正在进行的批处理相关的任何状态。
对于包裹IRichSpouts
,有一些bridging code将它们包裹在三叉戟API中。基本上,包装器调用 nextTuple
直到它有一个完整的批次,然后它将 ids 存储在缓存中。如果包装器被要求重新发送一批,它会在 spout 上调用 fail
。否则,它会在批处理成功后调用 ack
。
我认为您在 Storm UI 中没有看到与此相关的任何内容的原因是 IRichBolt
实际上并未在那里表示。相反,它被包装了,所以 ack/fail
调用在 spout-spout
组件内发生 "under the hood"。如果您想确定 ack/fail 是否被调用,请尝试向 IRichSpout
.
ack/fail
方法添加一些日志记录