ShellBolt - Anchored onto "" after ack/fail
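I have a problem with my ShellBolt, which runs a C++ bolt through the multilang module. The bolt dies because the ShellBolt attribute "_inputs" is empty, which raises the following runtime exception: "Anchored onto #Anchor after ack/fail".
Here is the error: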
ERROR backtype.storm.task.ShellBolt - Halting process: ShellBolt died.
java.lang.RuntimeException: Anchored onto -2767654327142912901 after ack/fail
at backtype.storm.task.ShellBolt.handleEmit(ShellBolt.java:198) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.task.ShellBolt.access00(ShellBolt.java:69) ~[storm-core-0.9.4.jar:0.9.4]
at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:335) ~[storm-core-0.9.4.jar:0.9.4]
at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67]
19630 [Thread-31-__system] INFO backtype.storm.util - Async loop interrupted!
Log: http://pastebin.com/pG9mHt8X
Here is my Cpp ShellBolt:
public class AppCppStorm {
    public static class CppStorm extends ShellBolt implements IRichBolt {
        public CppStorm() {
            super("/home/cloudera/IdeaProjects/using_storm/src/main/cpp/test");
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }

        @Override
        public Map<String, Object> getComponentConfiguration() {
            return null;
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new RandomSentenceSpout(), 5);
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
        Config conf = new Config();
        conf.setDebug(true);
        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);
            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        } else {
            conf.setMaxTaskParallelism(3);
            conf.setNumWorkers(3);
            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());
            Thread.sleep(10000);
            cluster.shutdown();
        }
    }
}
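I am using Storm 0.9.4, and I think my error is the same as the one reported here: https://issues.apache.org/jira/browse/STORM-531
I would be grateful for any help.
I solved my problem: since Storm 0.9.3, the bolt must call its sync method whenever it receives a heartbeat tuple.
For example: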
class Tuple
{
private:
    // ...
    int _id;
    std::string _stream;

public:
    // ...
    // A tuple with id -1 on the "__heartbeat" stream is treated as a heartbeat.
    bool is_heartbeat_tuple() const
    {
        return this->_id == -1 && this->_stream.compare("__heartbeat") == 0;
    }
};

class Bolt
{
public:
    // ...
    void Run()
    {
        Mode = BOLT;
        std::pair<Json::Value, Json::Value> conf_context = InitComponent();
        Initialize(conf_context.first, conf_context.second);
        while (true)
        {
            Tuple tuple = ReadTuple();
            Anchor_tuple = &tuple;
            if (tuple.is_heartbeat_tuple())
            {
                // Heartbeats are only answered with sync; they are never processed or acked.
                sync();
            }
            else
            {
                Process(tuple);
                Ack(tuple.GetID());
            }
        }
    }
};