Apache Storm 远程拓扑提交
Apache Storm Remote Topology Submission
我一直在测试从 IDE(Eclipse)远程提交 Storm 拓扑。
我成功地把一个简单的 Storm 拓扑上传到了远程 Storm 集群,但奇怪的是:当我打开 Storm UI 检查远程提交的拓扑是否正常运行时,只看到了 `__acker` 螺栓(bolt),其他螺栓和喷嘴(spout)都不存在。随后,我改为从命令行手动提交同一个拓扑,再次检查 Storm UI,这次一切正常,拓扑按预期运行。我一直在寻找原因,但没有找到。下面附上拓扑类、远程提交类以及对应的 Storm UI 截图:
This is the output from the Eclipse console (after remote submission):
225 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar T:\STORM_TOPOLOGIES\Benchmark.jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar
234 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar
这是拓扑:
/**
 * Benchmark topology: a spout that repeatedly emits one randomly generated (id, item) pair and a
 * bolt that appends every pair it receives to a local log file on the worker host.
 *
 * <p>{@link #main} submits via {@link StormSubmitter#submitTopology}, which uploads the jar named by
 * the {@code storm.jar} system property (set by the {@code storm jar} command, or manually with
 * {@code -Dstorm.jar=/path/to/topology.jar}).
 */
public class StormBenchmark {

    /**
     * Spout that emits the same (id, item) pair on every {@link #nextTuple()} call.
     *
     * <p>The pair is generated once in {@link #open}: {@code id} is 5 random letters drawn from
     * {@code CHARS}; {@code item} is {@code size} randomly chosen news-agency names concatenated
     * without a separator. Each emit carries a strictly increasing message id so that the tuple is
     * tracked for acking.
     */
    public static class GenSpout extends BaseRichSpout {
        private static final long serialVersionUID = 1L;
        private static final Character[] CHARS = new Character[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'};
        private static final String[] newsagencies = {"bbc", "cnn", "reuters", "aljazeera", "nytimes", "nbc news", "fox news", "interfax"};

        // Primitive counter: the original boxed Long was re-allocated on every nextTuple().
        private long count = 1L;
        // Most recently emitted (or acked) message id; not read anywhere else in this class.
        private Object msgID;

        SpoutOutputCollector _collector;
        int _size;
        Random _rand;
        String _id;
        String _val;

        /**
         * @param size number of news-agency names concatenated into the emitted "item" value
         */
        public GenSpout(int size) {
            _size = size;
        }

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
            // Generated once per spout task; every tuple this task emits reuses these values.
            _id = randString(5);
            _val = randString2(_size);
        }

        /** Emits the fixed (id, item) pair anchored to the next message id. */
        public void nextTuple() {
            count++;
            msgID = count;
            _collector.emit(new Values(_id, _val), msgID);
        }

        /** Records the id of the most recently acked tuple. */
        public void ack(Object msgID) {
            this.msgID = msgID;
        }

        /** Returns {@code size} random letters from {@link #CHARS}. */
        private String randString(int size) {
            StringBuilder buf = new StringBuilder(size);
            for (int i = 0; i < size; i++) {
                buf.append(CHARS[_rand.nextInt(CHARS.length)]);
            }
            return buf.toString();
        }

        /** Returns {@code size} random entries of {@link #newsagencies}, concatenated. */
        private String randString2(int size) {
            StringBuilder buf = new StringBuilder();
            for (int i = 0; i < size; i++) {
                buf.append(newsagencies[_rand.nextInt(newsagencies.length)]);
            }
            return buf.toString();
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "item"));
        }
    }

    /**
     * Sink bolt that appends "id : item" for every incoming tuple to
     * {@code /home/hduser/logOfStormTops/logs.txt} on the worker host. It declares ("id", "item")
     * output fields but never emits.
     */
    public static class IdentityBolt extends BaseBasicBolt {
        private static final long serialVersionUID = 1L;

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "item"));
        }

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            List<String> box = new ArrayList<String>(2);
            box.add(tuple.getString(0));
            box.add(tuple.getString(1));
            try {
                fileWriter(box);
            } catch (IOException e) {
                // Best effort: an I/O failure is printed, not rethrown.
                e.printStackTrace();
            }
        }

        /**
         * Appends {@code listjon.get(0) + " : " + listjon.get(1)} as one line to the log file.
         *
         * @param listjon at least two strings: the id and the item
         * @throws IOException if the file cannot be created or written
         */
        public void fileWriter(List<String> listjon) throws IOException {
            String pathname = "/home/hduser/logOfStormTops/logs.txt";
            File file = new File(pathname);
            if (!file.exists()) {
                file.createNewFile();
            }
            BufferedWriter writer = new BufferedWriter(new FileWriter(file, true));
            // Close in finally so a failed write does not leak the file handle on every tuple.
            try {
                writer.write(listjon.get(0) + " : " + listjon.get(1));
                writer.newLine();
                writer.flush();
            } finally {
                writer.close();
            }
        }
    }

    // Example `storm jar` invocation kept from the original; note it names a different main class
    // (storm.benchmark.ThroughputTest), not this one.
    //storm jar storm-benchmark-0.0.1-SNAPSHOT-standalone.jar storm.benchmark.ThroughputTest demo 100 8 8 8 10000
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new GenSpout(8), 2).setNumTasks(4);
        builder.setBolt("bolt", new IdentityBolt(), 4).setNumTasks(8)
                .shuffleGrouping("spout");

        Config conf = new Config();
        conf.setMaxSpoutPending(200);
        // Only a tiny fraction of tuples is sampled for statistics, so UI counts are coarse.
        conf.setStatsSampleRate(0.0001);
        // Reference values in storm.yaml syntax:
        //topology.executor.receive.buffer.size: 8192 #batched
        //topology.executor.send.buffer.size: 8192 #individual messages
        //topology.transfer.buffer.size: 1024 # batched
        conf.put("topology.executor.send.buffer.size", 1024);
        conf.put("topology.transfer.buffer.size", 8);
        conf.put("topology.receiver.buffer.size", 8);
        // Opens a JDWP debug socket in each worker; %ID% is substituted per worker by Storm.
        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xdebug -Xrunjdwp:transport=dt_socket,address=1%ID%,server=y,suspend=n");
        StormSubmitter.submitTopology("SampleTop", conf, builder.createTopology());
    }
}
这是远程提交类 RemoteSubmissionTopo:
public class RemoteSubmissionTopo {
@SuppressWarnings({ "unchecked", "rawtypes", "unused" })
public static void main(String... args) {
Config conf = new Config();
TopologyBuilder topoBuilder = new TopologyBuilder();
conf.put(Config.NIMBUS_HOST, "117.16.142.49");
conf.setDebug(true);
Map stormConf = Utils.readStormConfig();
stormConf.put("nimbus.host", "117.16.142.49");
String jar_path = "T:\STORM_TOPOLOGIES\Benchmark.jar";
Client client = NimbusClient.getConfiguredClient(stormConf).getClient();
try {
NimbusClient nimbus = new NimbusClient(stormConf, "117.16.142.49", 6627);
String uploadedJarLocation = StormSubmitter.submitJar(stormConf, jar_path);
String jsonConf = JSONValue.toJSONString(stormConf);
nimbus.getClient().submitTopology("benchmark-tp", uploadedJarLocation, jsonConf, topoBuilder.createTopology());
} catch (TTransportException e) {
e.printStackTrace();
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
这是 Storm UI 截图(远程提交的情况):
这是另一张 Storm UI 截图(手动提交的情况):
在 RemoteSubmissionTopo 中,你创建了 `TopologyBuilder topoBuilder = new TopologyBuilder();`,但从未调用 `setSpout(...)` / `setBolt(...)`。因此,你提交的是一个不含任何 spout 和 bolt 的空拓扑,Storm UI 中自然只会显示系统自带的 `__acker` 螺栓。

顺便说一句:实际上根本不需要 RemoteSubmissionTopo。你可以直接用 StormBenchmark 进行远程提交:只需在其 `main` 中添加 `conf.put(Config.NIMBUS_HOST, "117.16.142.49");`,并设置 JVM 选项 `-Dstorm.jar=/path/to/topology.jar`,然后运行即可。
我一直在测试从 IDE(Eclipse)远程提交 Storm 拓扑。我成功地把一个简单的 Storm 拓扑上传到了远程 Storm 集群,但奇怪的是:当我打开 Storm UI 检查远程提交的拓扑是否正常运行时,只看到了 `__acker` 螺栓(bolt),其他螺栓和喷嘴(spout)都不存在。随后,我改为从命令行手动提交同一个拓扑,再次检查 Storm UI,这次一切正常,拓扑按预期运行。我一直在寻找原因,但没有找到。下面附上拓扑类、远程提交类以及对应的 Storm UI 截图:
This is the output from the Eclipse console (after remote submission):
225 [main] INFO backtype.storm.StormSubmitter - Uploading topology jar T:\STORM_TOPOLOGIES\Benchmark.jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar
234 [main] INFO backtype.storm.StormSubmitter - Successfully uploaded topology jar to assigned location: /app/storm/nimbus/inbox/stormjar-d3ca2e14-c1d4-45e1-b21c-70f62c62cd84.jar
这是拓扑:
/**
 * Benchmark topology: a spout that repeatedly emits one randomly generated (id, item) pair and a
 * bolt that appends every pair it receives to a local log file on the worker host.
 *
 * <p>{@link #main} submits via {@link StormSubmitter#submitTopology}, which uploads the jar named by
 * the {@code storm.jar} system property (set by the {@code storm jar} command, or manually with
 * {@code -Dstorm.jar=/path/to/topology.jar}).
 */
public class StormBenchmark {

    /**
     * Spout that emits the same (id, item) pair on every {@link #nextTuple()} call.
     *
     * <p>The pair is generated once in {@link #open}: {@code id} is 5 random letters drawn from
     * {@code CHARS}; {@code item} is {@code size} randomly chosen news-agency names concatenated
     * without a separator. Each emit carries a strictly increasing message id so that the tuple is
     * tracked for acking.
     */
    public static class GenSpout extends BaseRichSpout {
        private static final long serialVersionUID = 1L;
        private static final Character[] CHARS = new Character[] { 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h'};
        private static final String[] newsagencies = {"bbc", "cnn", "reuters", "aljazeera", "nytimes", "nbc news", "fox news", "interfax"};

        // Primitive counter: the original boxed Long was re-allocated on every nextTuple().
        private long count = 1L;
        // Most recently emitted (or acked) message id; not read anywhere else in this class.
        private Object msgID;

        SpoutOutputCollector _collector;
        int _size;
        Random _rand;
        String _id;
        String _val;

        /**
         * @param size number of news-agency names concatenated into the emitted "item" value
         */
        public GenSpout(int size) {
            _size = size;
        }

        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            _collector = collector;
            _rand = new Random();
            // Generated once per spout task; every tuple this task emits reuses these values.
            _id = randString(5);
            _val = randString2(_size);
        }

        /** Emits the fixed (id, item) pair anchored to the next message id. */
        public void nextTuple() {
            count++;
            msgID = count;
            _collector.emit(new Values(_id, _val), msgID);
        }

        /** Records the id of the most recently acked tuple. */
        public void ack(Object msgID) {
            this.msgID = msgID;
        }

        /** Returns {@code size} random letters from {@link #CHARS}. */
        private String randString(int size) {
            StringBuilder buf = new StringBuilder(size);
            for (int i = 0; i < size; i++) {
                buf.append(CHARS[_rand.nextInt(CHARS.length)]);
            }
            return buf.toString();
        }

        /** Returns {@code size} random entries of {@link #newsagencies}, concatenated. */
        private String randString2(int size) {
            StringBuilder buf = new StringBuilder();
            for (int i = 0; i < size; i++) {
                buf.append(newsagencies[_rand.nextInt(newsagencies.length)]);
            }
            return buf.toString();
        }

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "item"));
        }
    }

    /**
     * Sink bolt that appends "id : item" for every incoming tuple to
     * {@code /home/hduser/logOfStormTops/logs.txt} on the worker host. It declares ("id", "item")
     * output fields but never emits.
     */
    public static class IdentityBolt extends BaseBasicBolt {
        private static final long serialVersionUID = 1L;

        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("id", "item"));
        }

        public void execute(Tuple tuple, BasicOutputCollector collector) {
            List<String> box = new ArrayList<String>(2);
            box.add(tuple.getString(0));
            box.add(tuple.getString(1));
            try {
                fileWriter(box);
            } catch (IOException e) {
                // Best effort: an I/O failure is printed, not rethrown.
                e.printStackTrace();
            }
        }

        /**
         * Appends {@code listjon.get(0) + " : " + listjon.get(1)} as one line to the log file.
         *
         * @param listjon at least two strings: the id and the item
         * @throws IOException if the file cannot be created or written
         */
        public void fileWriter(List<String> listjon) throws IOException {
            String pathname = "/home/hduser/logOfStormTops/logs.txt";
            File file = new File(pathname);
            if (!file.exists()) {
                file.createNewFile();
            }
            BufferedWriter writer = new BufferedWriter(new FileWriter(file, true));
            // Close in finally so a failed write does not leak the file handle on every tuple.
            try {
                writer.write(listjon.get(0) + " : " + listjon.get(1));
                writer.newLine();
                writer.flush();
            } finally {
                writer.close();
            }
        }
    }

    // Example `storm jar` invocation kept from the original; note it names a different main class
    // (storm.benchmark.ThroughputTest), not this one.
    //storm jar storm-benchmark-0.0.1-SNAPSHOT-standalone.jar storm.benchmark.ThroughputTest demo 100 8 8 8 10000
    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("spout", new GenSpout(8), 2).setNumTasks(4);
        builder.setBolt("bolt", new IdentityBolt(), 4).setNumTasks(8)
                .shuffleGrouping("spout");

        Config conf = new Config();
        conf.setMaxSpoutPending(200);
        // Only a tiny fraction of tuples is sampled for statistics, so UI counts are coarse.
        conf.setStatsSampleRate(0.0001);
        // Reference values in storm.yaml syntax:
        //topology.executor.receive.buffer.size: 8192 #batched
        //topology.executor.send.buffer.size: 8192 #individual messages
        //topology.transfer.buffer.size: 1024 # batched
        conf.put("topology.executor.send.buffer.size", 1024);
        conf.put("topology.transfer.buffer.size", 8);
        conf.put("topology.receiver.buffer.size", 8);
        // Opens a JDWP debug socket in each worker; %ID% is substituted per worker by Storm.
        conf.put(Config.TOPOLOGY_WORKER_CHILDOPTS, "-Xdebug -Xrunjdwp:transport=dt_socket,address=1%ID%,server=y,suspend=n");
        StormSubmitter.submitTopology("SampleTop", conf, builder.createTopology());
    }
}
这是远程提交类 RemoteSubmissionTopo:
public class RemoteSubmissionTopo {
@SuppressWarnings({ "unchecked", "rawtypes", "unused" })
public static void main(String... args) {
Config conf = new Config();
TopologyBuilder topoBuilder = new TopologyBuilder();
conf.put(Config.NIMBUS_HOST, "117.16.142.49");
conf.setDebug(true);
Map stormConf = Utils.readStormConfig();
stormConf.put("nimbus.host", "117.16.142.49");
String jar_path = "T:\STORM_TOPOLOGIES\Benchmark.jar";
Client client = NimbusClient.getConfiguredClient(stormConf).getClient();
try {
NimbusClient nimbus = new NimbusClient(stormConf, "117.16.142.49", 6627);
String uploadedJarLocation = StormSubmitter.submitJar(stormConf, jar_path);
String jsonConf = JSONValue.toJSONString(stormConf);
nimbus.getClient().submitTopology("benchmark-tp", uploadedJarLocation, jsonConf, topoBuilder.createTopology());
} catch (TTransportException e) {
e.printStackTrace();
} catch (AlreadyAliveException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (InvalidTopologyException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (TException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
try {
Thread.sleep(6000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
这是 Storm UI 截图(远程提交的情况):
这是另一张 Storm UI 截图(手动提交的情况):
在 RemoteSubmissionTopo 中,你创建了 `TopologyBuilder topoBuilder = new TopologyBuilder();`,但从未调用 `setSpout(...)` / `setBolt(...)`。因此,你提交的是一个不含任何 spout 和 bolt 的空拓扑,Storm UI 中自然只会显示系统自带的 `__acker` 螺栓。

顺便说一句:实际上根本不需要 RemoteSubmissionTopo。你可以直接用 StormBenchmark 进行远程提交:只需在其 `main` 中添加 `conf.put(Config.NIMBUS_HOST, "117.16.142.49");`,并设置 JVM 选项 `-Dstorm.jar=/path/to/topology.jar`,然后运行即可。