Storm ShellBolt "loses" 个元组
Storm ShellBolt "loses" tuples
我的拓扑结构如下所示:
注意 Bolt2 和 Bolt3 从 Bolt1 和 Bolt4 接收元组。所有螺栓都是 ShellBolts 运行 python 脚本,spout 是一个 ShellSpout 运行 一个从 RabbitMQ 读取的 python 脚本。除 Bolt4 外,一切都按预期工作。如果我一次向 RabbitMQ 添加一条消息,它将全部正常工作并干净地完成。如果我在一条消息在 Bolt4 上待处理时将其排队,则它永远不会被 Bolt4 处理。其他螺栓仍在执行其功能,但 Bolt4 在完成第一个螺栓后将停在那里。
Storm UI显示所有元组都被bolt4执行,但只有一个被acked。 None 失败了。我正在使用 Storm 0.9.5,以及 storm-starter 中包含的 multilang python 适配器。
shellbolt 和 spout 实现只是声明输出字段,没有别的。
如果我将 TOPOLOGY_MAX_SPOUT_PENDING 设置为 1,则一切正常,但我一次只能处理一个元组,Bolt1 和 2 最终会等待 Bolt4 准备就绪。
每个螺栓每个元组需要 3-30 秒才能完成。
所以我的问题是:接下来我应该看哪里?
编辑:这是一个最小的失败案例。
bolt1.py:
import storm
import time
import json
class Bolt1(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
self.emit("bolt3Queue", objID, **APIArgs)
self.emit("bolt2Queue", objID, **APIArgs)
self.emit("bolt4Queue", objID, **APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt1().run()
bolt2.py:
import storm
import time
import json
class Bolt2(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt2().run()
bolt3.py:
import storm
import time
import json
class Bolt3(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt3().run()
bolt4.py:
import storm
import time
import json
class Bolt4(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
self.emit("bolt3Queue", objID, **APIArgs)
self.emit("bolt2Queue", objID, **APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt4().run()
spout.py:
import storm
import random
class Spout(storm.Spout):
def nextTuple(self):
storm.emit(["id1234", "{}"], id=str(random.randint(1, 10000)))
if __name__ == '__main__':
Spout().run()
拓扑:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.*;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class PyroTopology {
public static class PythonBolt extends ShellBolt implements IRichBolt {
public PythonBolt(String script) {
super("python", script);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class Bolt4 extends ShellBolt implements IRichBolt {
public Bolt4() {
super("python", "bolt4.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class Bolt1 extends ShellBolt implements IRichBolt {
public Bolt1() {
super("python", "bolt1.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
declarer.declareStream("bolt4Queue", new Fields("objID", "APIArgs"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class PythonSpout extends ShellSpout implements IRichSpout {
public PythonSpout() {
super("python", "spout.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("objID", "APIArgs"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbit", new PythonSpout(), 1);
builder.setBolt("bolt1", new Bolt1(), 1).
shuffleGrouping("rabbit");
builder.setBolt("bolt4", new Bolt4(), 1).
shuffleGrouping("bolt1", "bolt4Queue");
builder.setBolt("bolt3", new PythonBolt("bolt3.py"), 1).
shuffleGrouping("bolt1", "bolt3Queue").
shuffleGrouping("bolt4", "bolt3Queue");
builder.setBolt("bolt2", new PythonBolt("bolt2.py"), 1).
shuffleGrouping("bolt1", "bolt2Queue").
shuffleGrouping("bolt4", "bolt2Queue");
Config conf = new Config();
conf.setStatsSampleRate(1.0);
conf.put(Config.TOPOLOGY_DEBUG, true);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5);
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(30000);
cluster.shutdown();
}
}
}
我用这个部署:
storm jar target/storm-starter-0.9.5-jar-with-dependencies.jar storm.starter.PyroTopology vb
每个组件 1 个元组后,整个系统挂起。没有新的元组被处理或失败。这是系统卡住后我的 Storm UI:
它就这样永远坐着。 (为了以防万一,我等了几个小时。)
您正在使用 BasicBolt
,它会自动为您处理确认。因此,您不得在您的代码中手动确认输入元组。这会导致单个元组出现多个 ack,这会混淆 Storm 跟踪 ack 的机制(通过 xor-ing 消息 ID 和 ack ID)。作为替代方案(如果您需要高级确认行为,您可以实施 Bolt
.
如 UI 所示,您的 spout 没有收到任何确认,因此当达到 max spout pending 时,Storm 停止发出元组。此外,您看到螺栓的 "executed" 和 "acked" 的计数不匹配——这也表明未正确处理 acks。
我的拓扑结构如下所示:
注意 Bolt2 和 Bolt3 从 Bolt1 和 Bolt4 接收元组。所有螺栓都是 ShellBolts 运行 python 脚本,spout 是一个 ShellSpout 运行 一个从 RabbitMQ 读取的 python 脚本。除 Bolt4 外,一切都按预期工作。如果我一次向 RabbitMQ 添加一条消息,它将全部正常工作并干净地完成。如果我在一条消息在 Bolt4 上待处理时将其排队,则它永远不会被 Bolt4 处理。其他螺栓仍在执行其功能,但 Bolt4 在完成第一个螺栓后将停在那里。
Storm UI显示所有元组都被bolt4执行,但只有一个被acked。 None 失败了。我正在使用 Storm 0.9.5,以及 storm-starter 中包含的 multilang python 适配器。
shellbolt 和 spout 实现只是声明输出字段,没有别的。
如果我将 TOPOLOGY_MAX_SPOUT_PENDING 设置为 1,则一切正常,但我一次只能处理一个元组,Bolt1 和 2 最终会等待 Bolt4 准备就绪。
每个螺栓每个元组需要 3-30 秒才能完成。
所以我的问题是:接下来我应该看哪里?
编辑:这是一个最小的失败案例。
bolt1.py:
import storm
import time
import json
class Bolt1(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
self.emit("bolt3Queue", objID, **APIArgs)
self.emit("bolt2Queue", objID, **APIArgs)
self.emit("bolt4Queue", objID, **APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt1().run()
bolt2.py:
import storm
import time
import json
class Bolt2(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt2().run()
bolt3.py:
import storm
import time
import json
class Bolt3(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt3().run()
bolt4.py:
import storm
import time
import json
class Bolt4(storm.BasicBolt):
# overrides storm.Bolt.process
def process(self, tup):
objID, APIArgs = tup.values
APIArgs = json.loads(APIArgs)
self.emit("bolt3Queue", objID, **APIArgs)
self.emit("bolt2Queue", objID, **APIArgs)
storm.ack(tup)
def emit(self, stream, objID, **APIArgs):
tup = [objID, json.dumps(APIArgs)]
storm.log("Emit [%s] %s" % (stream, tup))
storm.emit(tup, stream=stream)
if __name__ == '__main__':
Bolt4().run()
spout.py:
import storm
import random
class Spout(storm.Spout):
def nextTuple(self):
storm.emit(["id1234", "{}"], id=str(random.randint(1, 10000)))
if __name__ == '__main__':
Spout().run()
拓扑:
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package storm.starter;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.StormSubmitter;
import backtype.storm.spout.ShellSpout;
import backtype.storm.task.ShellBolt;
import backtype.storm.topology.*;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.HashMap;
import java.util.Map;
public class PyroTopology {
public static class PythonBolt extends ShellBolt implements IRichBolt {
public PythonBolt(String script) {
super("python", script);
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class Bolt4 extends ShellBolt implements IRichBolt {
public Bolt4() {
super("python", "bolt4.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class Bolt1 extends ShellBolt implements IRichBolt {
public Bolt1() {
super("python", "bolt1.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declareStream("bolt3Queue", new Fields("objID", "APIArgs"));
declarer.declareStream("bolt2Queue", new Fields("objID", "APIArgs"));
declarer.declareStream("bolt4Queue", new Fields("objID", "APIArgs"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static class PythonSpout extends ShellSpout implements IRichSpout {
public PythonSpout() {
super("python", "spout.py");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("objID", "APIArgs"));
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("rabbit", new PythonSpout(), 1);
builder.setBolt("bolt1", new Bolt1(), 1).
shuffleGrouping("rabbit");
builder.setBolt("bolt4", new Bolt4(), 1).
shuffleGrouping("bolt1", "bolt4Queue");
builder.setBolt("bolt3", new PythonBolt("bolt3.py"), 1).
shuffleGrouping("bolt1", "bolt3Queue").
shuffleGrouping("bolt4", "bolt3Queue");
builder.setBolt("bolt2", new PythonBolt("bolt2.py"), 1).
shuffleGrouping("bolt1", "bolt2Queue").
shuffleGrouping("bolt4", "bolt2Queue");
Config conf = new Config();
conf.setStatsSampleRate(1.0);
conf.put(Config.TOPOLOGY_DEBUG, true);
conf.put(Config.TOPOLOGY_MAX_SPOUT_PENDING, 5);
conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(30000);
cluster.shutdown();
}
}
}
我用这个部署:
storm jar target/storm-starter-0.9.5-jar-with-dependencies.jar storm.starter.PyroTopology vb
每个组件 1 个元组后,整个系统挂起。没有新的元组被处理或失败。这是系统卡住后我的 Storm UI:
它就这样永远坐着。 (为了以防万一,我等了几个小时。)
您正在使用 BasicBolt
,它会自动为您处理确认。因此,您不得在您的代码中手动确认输入元组。这会导致单个元组出现多个 ack,这会混淆 Storm 跟踪 ack 的机制(通过 xor-ing 消息 ID 和 ack ID)。作为替代方案(如果您需要高级确认行为,您可以实施 Bolt
.
如 UI 所示,您的 spout 没有收到任何确认,因此当达到 max spout pending 时,Storm 停止发出元组。此外,您看到螺栓的 "executed" 和 "acked" 的计数不匹配——这也表明未正确处理 acks。