Apache Storm Kafka Spout 延迟问题
Apache Storm Kafka Spout Lag Issue
我正在使用 Storm 1.1.2 和 Kafka 0.11 构建一个 Java Spring 应用程序,以便在 Docker 容器中启动。
我的拓扑中的一切都按计划工作,但在 Kafka 的高负载下,Kafka 延迟随着时间的推移越来越多。
我的 KafkaSpoutConfig:
KafkaSpoutConfig<String,String> spoutConf =
KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
.build()
那么我的拓扑如下
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);
builder.setBolt("routerBolt", new RouterBolt(),25).shuffleGrouping("stormKafkaSpout");
Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/opt/storm.jar");
StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());
RouterBolt(扩展了 BaseRichBolt)执行一个非常简单的 switch 语句,然后使用本地 KafkaProducer 对象将新消息发送到另一个主题。就像我说的,一切都编译并且拓扑按预期运行但在高负载(3000 messages/s)下,Kafka 滞后堆积起来等同于拓扑的低吞吐量。
我试过用
禁用acking
conf.setNumAckers(0);
和
conf.put(Config.TOPOLGY_ACKER_EXECUTORS, 0);
但我想这不是一个问题。
我在 Storm UI 上看到 RouterBolt 在高负载下的执行延迟为 1.2 毫秒,处理延迟为 .03 毫秒,这让我相信 Spout 是 bottleneck.Also并行度提示为 25,因为 "myTopic" 有 25 个分区。谢谢!
您可能会受到 https://issues.apache.org/jira/browse/STORM-3102 的影响,这会导致 spout 在每次发射时执行相当昂贵的调用。请尝试升级到其中一个固定版本。
编辑:修复程序尚未实际发布。您可能仍想通过使用例如从源代码构建 spout 来尝试修复。 https://github.com/apache/storm/tree/1.1.x-branch 构建 1.1.4 快照。
我正在使用 Storm 1.1.2 和 Kafka 0.11 构建一个 Java Spring 应用程序,以便在 Docker 容器中启动。
我的拓扑中的一切都按计划工作,但在 Kafka 的高负载下,Kafka 延迟随着时间的推移越来越多。
我的 KafkaSpoutConfig:
KafkaSpoutConfig<String,String> spoutConf =
KafkaSpoutConfig.builder("kafkaContainerName:9092", "myTopic")
.setProp(ConsumerConfig.GROUP_ID_CONFIG, "myGroup")
.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyObjectDeserializer.class)
.build()
那么我的拓扑如下
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("stormKafkaSpout", new KafkaSpout<String,String>(spoutConf), 25);
builder.setBolt("routerBolt", new RouterBolt(),25).shuffleGrouping("stormKafkaSpout");
Config conf = new Config();
conf.setNumWorkers(10);
conf.put(Config.STORM_ZOOKEEPER_SERVERS, ImmutableList.of("zookeeper"));
conf.put(Config.STORM_ZOOKEEPER_PORT, 2181);
conf.put(Config.NIMBUS_SEEDS, ImmutableList.of("nimbus"));
conf.put(Config.NIMBUS_THRIFT_PORT, 6627);
System.setProperty("storm.jar", "/opt/storm.jar");
StormSubmitter.submitTopology("topologyId", conf, builder.createTopology());
RouterBolt(扩展了 BaseRichBolt)执行一个非常简单的 switch 语句,然后使用本地 KafkaProducer 对象将新消息发送到另一个主题。就像我说的,一切都编译并且拓扑按预期运行但在高负载(3000 messages/s)下,Kafka 滞后堆积起来等同于拓扑的低吞吐量。
我试过用
禁用ackingconf.setNumAckers(0);
和
conf.put(Config.TOPOLGY_ACKER_EXECUTORS, 0);
但我想这不是一个问题。
我在 Storm UI 上看到 RouterBolt 在高负载下的执行延迟为 1.2 毫秒,处理延迟为 .03 毫秒,这让我相信 Spout 是 bottleneck.Also并行度提示为 25,因为 "myTopic" 有 25 个分区。谢谢!
您可能会受到 https://issues.apache.org/jira/browse/STORM-3102 的影响,这会导致 spout 在每次发射时执行相当昂贵的调用。请尝试升级到其中一个固定版本。
编辑:修复程序尚未实际发布。您可能仍想通过使用例如从源代码构建 spout 来尝试修复。 https://github.com/apache/storm/tree/1.1.x-branch 构建 1.1.4 快照。