在尊重 maxSpoutPending 的同时关闭 KafkaSpout 中的 acking
Turn off acking in KafkaSpout while respecting maxSpoutPending
我使用的是 storm 0.9.3。我正在尝试为我的拓扑关闭每个元组的确认。我将 Config.TOPOLOGY_ACKER_EXECUTORS 设置为 0,将 maxSpoutPending 设置为 500。当我 运行 我的拓扑结构时,我注意到 maxSpoutPending 被忽略并且 spout 继续发射超过该限制。这是我的配置 -
config.setNumWorkers(3);
config.setMaxSpoutPending(500);
config.put("topology.sleep.spout.wait.strategy.time.ms", 50);
config.put("topology.message.timeout.secs", 300);
config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
我正在使用 KafkaSpout 从 Kafka 读取消息,并使用一个螺栓来使用消息。
通过将 TOPOLOGY_ACKER_EXECUTORS
设置为 0,storm 将在所有元组离开 spout 时立即确认所有元组,这可能不可靠,因为没有任何机制可以检查元组是否已处理或失败。
并通过设置 setMaxSpoutPending
告诉 storm 在 spout 上等待处理的最大元组数。 MaxSpoutPending 不会限制您的输出。如果您想查看风暴拓扑的实际输出频率,请在 Storm UI.
中为您的 运行 风暴拓扑检查 topology latency
我使用的是 storm 0.9.3。我正在尝试为我的拓扑关闭每个元组的确认。我将 Config.TOPOLOGY_ACKER_EXECUTORS 设置为 0,将 maxSpoutPending 设置为 500。当我 运行 我的拓扑结构时,我注意到 maxSpoutPending 被忽略并且 spout 继续发射超过该限制。这是我的配置 -
config.setNumWorkers(3);
config.setMaxSpoutPending(500);
config.put("topology.sleep.spout.wait.strategy.time.ms", 50);
config.put("topology.message.timeout.secs", 300);
config.put(Config.TOPOLOGY_ACKER_EXECUTORS, 0);
我正在使用 KafkaSpout 从 Kafka 读取消息,并使用一个螺栓来使用消息。
通过将 TOPOLOGY_ACKER_EXECUTORS
设置为 0,storm 将在所有元组离开 spout 时立即确认所有元组,这可能不可靠,因为没有任何机制可以检查元组是否已处理或失败。
并通过设置 setMaxSpoutPending
告诉 storm 在 spout 上等待处理的最大元组数。 MaxSpoutPending 不会限制您的输出。如果您想查看风暴拓扑的实际输出频率,请在 Storm UI.
topology latency