无法提交风暴拓扑

Unable to Submit storm topology

我正在尝试使用 Eclipse 在远程主机上提交 Storm 拓扑。

这是我的代码:

Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(1);
conf.put(Config.NIMBUS_HOST, "hostName");
conf.put(Config.NIMBUS_THRIFT_PORT,6627);
conf.put(Config.STORM_ZOOKEEPER_SERVERS,Arrays.asList(new String[]{"hostName"}));
conf.put(Config.STORM_ZOOKEEPER_PORT,2181);

// Remote submission
StormSubmitter.submitTopology("classMain", conf, topology);

但是我得到这个例外:

Exception in thread "main" java.lang.RuntimeException: org.apache.thrift7.TApplicationException: Binary field exceeded string size limit
  at backtype.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:250)
 at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:271)
  at backtype.storm.StormSubmitter.submitTopology(StormSubmitter.java:157)
  at com.rbc.rbccm.hackathon.Countersearch.submitTopology(Countersearch.java:111)
  at com.rbc.rbccm.hackathon.Countersearch.main(Countersearch.java:37)
Caused by: org.apache.thrift7.TApplicationException: Binary field exceeded string size limit
  at org.apache.thrift7.TApplicationException.read(TApplicationException.java:111)
  at org.apache.thrift7.TServiceClient.receiveBase(TServiceClient.java:71)
  at backtype.storm.generated.Nimbus$Client.recv_submitTopology(Nimbus.java:184)
  at backtype.storm.generated.Nimbus$Client.submitTopology(Nimbus.java:168)
  at backtype.storm.StormSubmitter.submitTopologyAs(StormSubmitter.java:236)
... 4 more

我们可以传递给 submitTopology 函数的参数是否有字符串大小限制?

当我进一步追踪时,会导致:

public void submitTopology(String name, String uploadedJarLocation, String jsonConf, StormTopology topology) throws AlreadyAliveException, InvalidTopologyException, AuthorizationException, org.apache.thrift.TException
{
    send_submitTopology(name, uploadedJarLocation, jsonConf, topology);
    recv_submitTopology();
}

recv 导致了这个问题。有什么想法吗?

您需要增加nimbus.thrift.max_buffer_size参数。您可以在 storm.yamlConfig 对象中设置它。

如果您在 StormSubmitter.java 处看到 Storm 源代码中的代码,是这样的:

public static void submitTopology(String name, Map stormConf, StormTopology topology)
        throws AlreadyAliveException, InvalidTopologyException, AuthorizationException {
    submitTopology(name, stormConf, topology, null, null);
}

Thrift 错误是因为您指定的 name 太长(超过 2MB?)或 stormConf 信息过多,或者更可能的原因是,当topology 已创建,您正在用太多信息填充 Spout 或 Bolt 实例。

在我的例子中,我创建了一个螺栓,我在其中初始化了太多数据。

builder.setBolt(genBolt, new GenBolt(myTable1.getHashMap(), myTable2.getHashMap(), myTable3.getHashMap(), myTable4.getHashMap()), 2)
  .fieldsGrouping(iterSpout, new Fields(con.BATCH_ID))