Storm,有什么方法可以记录飞行中有多少元组?
Storm, any way to log how many tuples in flight?
作为调整的一部分,我一直在调整 maxSpoutPending
参数。但是,如果能随时知道拓扑中有多少元组就好了,这样我就可以知道这个参数对我的拓扑性能有多大影响。
我在源代码中四处寻找,但没有找到任何东西。这是我可以在 Storm UI 中找到的值吗?或者我可以覆盖某处的内容来记录这个值?
假设你的 spout 中有足够的消息,你可以强制 spout 从头开始读取,看看你可以在 10 分钟内处理多少元组。 (通过基础数学,您可以获得每秒的元组数)。
例如,使用 kafka spout,您可以执行以下操作:
SpoutConfig spoutConfig = new SpoutConfig(
// your spout config
);
spoutConfig.forceFromStart = true; // this is how you tell the spout to read from the oldest kafka offset
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
然后让拓扑 运行 运行 15 分钟,看看拓扑在最后 10 分钟内处理了多少元组。
这取决于你"how many tuples are in the topology"的意思。
- 如果你想知道有多少 spout 发出的元组还没有被完全处理,你可以简单地从 Storm UI 中获取 "Spout emitted" 和 "Spout acked" 的区别(你可以也通过
client.getTopologyInfo("topolgoyName")
获取这些值(使用 client = NimbusClient.getConfiguredClient(...)
.
- 如果您想知道拓扑中所有阶段的所有元组(即,每个 spout/bolt 的所有缓冲区中的元组),这可能会非常棘手...
TopologyInfo
可能仍然有帮助,但我不确定 if/how 来计算您想知道的值。
您说您正在寻找有关 maxTuplesPending 属性有效性的见解。
使用 Storm 提供的 KafkaSpout,(我修改了源代码以添加更多日志记录以查看发生了什么)next() 方法一直被调用(<1 毫秒)。所以我总是看到从元组被确认或失败(减少 MaxPending 计数)到新元组被发送到拓扑中(再次达到 MaxPending 计数)的相对快速的转变(<1ms)。今天的日志显示了从一个元组被确认到另一个元组被发送时的时间戳。
2015-10-16T12:20:15.162-0500 s.k.PartitionManager [INFO] PM! 6 - ack
2015-10-16T12:20:15.163-0500 s.k.PartitionManager [INFO] PM! 177 - next
2015-10-16T12:20:15.400-0500 s.k.PartitionManager [INFO] PM! 10 - ack
2015-10-16T12:20:15.401-0500 s.k.PartitionManager [INFO] PM! 178 - next
2015-10-16T12:20:15.649-0500 s.k.PartitionManager [INFO] PM! 22 - ack
2015-10-16T12:20:15.649-0500 s.k.PartitionManager [INFO] PM! 180 - next
2015-10-16T12:20:16.511-0500 s.k.PartitionManager [INFO] PM! 27 - ack
2015-10-16T12:20:16.512-0500 s.k.PartitionManager [INFO] PM! 182 - next
这显示了相当即时的转变。因此,对于我的用例,我的拓扑中几乎总是有 maxPending 计数的元组。
我的元组也没有得到相当快的处理(~1 秒),所以对于处理得更快的元组或不同类型的 Spouts 我不能说。
作为调整的一部分,我一直在调整 maxSpoutPending
参数。但是,如果能随时知道拓扑中有多少元组就好了,这样我就可以知道这个参数对我的拓扑性能有多大影响。
我在源代码中四处寻找,但没有找到任何东西。这是我可以在 Storm UI 中找到的值吗?或者我可以覆盖某处的内容来记录这个值?
假设你的 spout 中有足够的消息,你可以强制 spout 从头开始读取,看看你可以在 10 分钟内处理多少元组。 (通过基础数学,您可以获得每秒的元组数)。
例如,使用 kafka spout,您可以执行以下操作:
SpoutConfig spoutConfig = new SpoutConfig(
// your spout config
);
spoutConfig.forceFromStart = true; // this is how you tell the spout to read from the oldest kafka offset
KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
然后让拓扑 运行 运行 15 分钟,看看拓扑在最后 10 分钟内处理了多少元组。
这取决于你"how many tuples are in the topology"的意思。
- 如果你想知道有多少 spout 发出的元组还没有被完全处理,你可以简单地从 Storm UI 中获取 "Spout emitted" 和 "Spout acked" 的区别(你可以也通过
client.getTopologyInfo("topolgoyName")
获取这些值(使用client = NimbusClient.getConfiguredClient(...)
. - 如果您想知道拓扑中所有阶段的所有元组(即,每个 spout/bolt 的所有缓冲区中的元组),这可能会非常棘手...
TopologyInfo
可能仍然有帮助,但我不确定 if/how 来计算您想知道的值。
您说您正在寻找有关 maxTuplesPending 属性有效性的见解。
使用 Storm 提供的 KafkaSpout,(我修改了源代码以添加更多日志记录以查看发生了什么)next() 方法一直被调用(<1 毫秒)。所以我总是看到从元组被确认或失败(减少 MaxPending 计数)到新元组被发送到拓扑中(再次达到 MaxPending 计数)的相对快速的转变(<1ms)。今天的日志显示了从一个元组被确认到另一个元组被发送时的时间戳。
2015-10-16T12:20:15.162-0500 s.k.PartitionManager [INFO] PM! 6 - ack
2015-10-16T12:20:15.163-0500 s.k.PartitionManager [INFO] PM! 177 - next
2015-10-16T12:20:15.400-0500 s.k.PartitionManager [INFO] PM! 10 - ack
2015-10-16T12:20:15.401-0500 s.k.PartitionManager [INFO] PM! 178 - next
2015-10-16T12:20:15.649-0500 s.k.PartitionManager [INFO] PM! 22 - ack
2015-10-16T12:20:15.649-0500 s.k.PartitionManager [INFO] PM! 180 - next
2015-10-16T12:20:16.511-0500 s.k.PartitionManager [INFO] PM! 27 - ack
2015-10-16T12:20:16.512-0500 s.k.PartitionManager [INFO] PM! 182 - next
这显示了相当即时的转变。因此,对于我的用例,我的拓扑中几乎总是有 maxPending 计数的元组。
我的元组也没有得到相当快的处理(~1 秒),所以对于处理得更快的元组或不同类型的 Spouts 我不能说。