ShellBolt - 在 ack/fail 之后锚定到“”

ShellBolt - Anchored onto "" after ack/fail

我的 shellbolt 有问题,它使用带有 multilang 模块的 cpp bolt。这一个死了,因为 shellBolt 的属性“_inputs”为空并调用下一个运行时异常:"Anchored onto #Anchor after ack/fail" .

这是错误:

ERROR backtype.storm.task.ShellBolt - Halting process: ShellBolt died. java.lang.RuntimeException: Anchored onto -2767654327142912901 after ack/fail at backtype.storm.task.ShellBolt.handleEmit(ShellBolt.java:198) ~[storm-core-0.9.4.jar:0.9.4] at backtype.storm.task.ShellBolt.access00(ShellBolt.java:69) ~[storm-core-0.9.4.jar:0.9.4] at backtype.storm.task.ShellBolt$BoltReaderRunnable.run(ShellBolt.java:335) ~[storm-core-0.9.4.jar:0.9.4] at java.lang.Thread.run(Thread.java:745) [na:1.7.0_67] 19630 [Thread-31-__system] INFO backtype.storm.util - Async loop interrupted!

日志:http://pastebin.com/pG9mHt8X

这是我的 Cpp ShellBolt:

public class AppCppStorm {
public static class CppStorm extends ShellBolt implements IRichBolt {

    public CppStorm()
    {
        super("/home/cloudera/IdeaProjects/using_storm/src/main/cpp/test");
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    @Override
    public Map<String, Object> getComponentConfiguration() {
        return null;
    }
}

public static void main(String[] args) throws Exception {

        TopologyBuilder builder = new TopologyBuilder();

        builder.setSpout("spout", new RandomSentenceSpout(), 5);

        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout");
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        Config conf = new Config();
        conf.setDebug(true);


        if (args != null && args.length > 0) {
            conf.setNumWorkers(3);

            StormSubmitter.submitTopology(args[0], conf, builder.createTopology());
        }
        else {
            conf.setMaxTaskParallelism(3);
            conf.setNumWorkers(3);

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("word-count", conf, builder.createTopology());

            Thread.sleep(10000);

            cluster.shutdown();
        }
    }
}

我使用的是 storm 0.9.4,我认为我的错误与下一个报告的错误相同:https://issues.apache.org/jira/browse/STORM-531

如果有人能帮助我,我将不胜感激。

我解决了我的错误,从0.9.3 storm版本开始,当我们收到心跳元组时,我们必须调用bolt synchronize方法。 例如:

class Tuple
{
    private:
        // ...
        int _id;
        std::string _stream;
    public:
        // ...
        bool is_heartbeat_tuple()
        {
            if (this->_id == -1 && this->_stream.compare("__heartbeat") == 0)
                return true;
            return false;
        }
}
class Bolt
{
    public:
        // ...
        void Run()
        {
            Mode = BOLT;
            std::pair<Json::Value, Json::Value> conf_context = InitComponent();
            Initialize(conf_context.first, conf_context.second);
            while(1)
            {
                Tuple tuple = ReadTuple();
                Anchor_tuple = &tuple;
                if (tuple.is_heartbeat_tuple())
                    sync();
                else
                {
                    Process(tuple);
                    Ack(tuple.GetID());
                }
            }
        }
}