Deploying Storm build JAR
I developed a Java class that reads data from a Kafka queue and prints it out:
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SchemeAsMultiScheme;
import backtype.storm.topology.TopologyBuilder;
import storm.kafka.KafkaSpout;
import storm.kafka.SpoutConfig;
import storm.kafka.StringScheme;
import storm.kafka.ZkHosts;

// Kafka spout configuration: read the "test" topic via the local ZooKeeper
ZkHosts zkHosts = new ZkHosts("localhost:2181");
String topic_name = "test";
String consumer_group_id = "storm";
String zookeeper_root = "";
SpoutConfig kafkaConfig = new SpoutConfig(zkHosts, topic_name, zookeeper_root, consumer_group_id);
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
/*kafkaConfig.forceFromStart=false;
kafkaConfig.startOffsetTime =-2;*/
KafkaSpout kafkaSpout = new KafkaSpout(kafkaConfig);

// Topology: KafkaSpout -> PrinterBolt (my own bolt that prints each tuple)
TopologyBuilder builder = new TopologyBuilder();
//builder.setSpout("KafkaSpout", kafkaSpout, 1);
builder.setSpout("KafkaSpout", kafkaSpout);
builder.setBolt("PrinterBolt", new PrinterBolt()).globalGrouping("KafkaSpout");

Map<String, Object> conf = new HashMap<String, Object>();
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("localhost"));
conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 20000);
conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 3);
conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 30);

// Run the topology in-process for two minutes
LocalCluster cluster = new LocalCluster();
try {
    cluster.submitTopology("KafkaConsumerTopology", conf, builder.createTopology());
    Thread.sleep(120000);
} catch (Exception e) {
    //throw new IllegalStateException("Couldn't initialize the topology", e);
    System.out.println(e.getMessage());
}
After coding this, I built it into a JAR file with Maven and moved the jar to the Amazon AWS cluster,
- then ran a command like
nohup java -cp uber-***-0.0.1-SNAPSHOT.jar com.***.&&&.kafka.App
But I hit an error here. Can anyone tell me what mistake I made in the deployment? Here is what I am wondering about:
- Do I need to deploy this jar file into the Storm config folder? I currently put the jar in a separate folder on AWS (not inside the Storm folder).
- How can I view the System.out output?
- Do I need to include any yml file in my project?
Please find the exception below:
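On the yml question: Storm reads its configuration from the conf/storm.yaml on each cluster node, not from a file packaged inside the topology jar. A minimal sketch of such a file, with placeholder values that are assumptions rather than taken from this setup:

```yaml
# conf/storm.yaml on the cluster node (placeholder values, pre-1.0 Storm)
storm.zookeeper.servers:
  - "localhost"
nimbus.host: "localhost"
storm.local.dir: "/var/storm"
```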
29537 [Thread-14-KafkaSpout] ERROR backtype.storm.util - Async loop died!
java.lang.ExceptionInInitializerError: null
at org.apache.log4j.Logger.getLogger(Logger.java:39) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.utils.Logging$class.logger(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.network.BlockingChannel.logger$lzycompute(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.network.BlockingChannel.logger(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.utils.Logging$class.debug(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.network.BlockingChannel.debug(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.network.BlockingChannel.connect(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.consumer.SimpleConsumer.connect(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.consumer.SimpleConsumer.getOrMakeConnection(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.consumer.SimpleConsumer.kafka$consumer$SimpleConsumer$$sendRequest(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.consumer.SimpleConsumer.getOffsetsBefore(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(Unknown Source) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:77) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at storm.kafka.KafkaUtils.getOffset(KafkaUtils.java:67) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at storm.kafka.PartitionManager.<init>(PartitionManager.java:83) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at storm.kafka.ZkCoordinator.refresh(ZkCoordinator.java:98) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at storm.kafka.ZkCoordinator.getMyManagedPartitions(ZkCoordinator.java:69) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at storm.kafka.KafkaSpout.nextTuple(KafkaSpout.java:135) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
at clojure.lang.AFn.run(AFn.java:24) [uber-iot-0.0.1-SNAPSHOT.jar:na]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
Caused by: java.lang.IllegalStateException: Detected both log4j-over-slf4j.jar AND slf4j-log4j12.jar on the class path, preempting StackOverflowError. See also http://www.slf4j.org/codes.html#log4jDelegationLoop for more details.
at org.apache.log4j.Log4jLoggerFactory.<clinit>(Log4jLoggerFactory.java:49) ~[uber-iot-0.0.1-SNAPSHOT.jar:na]
... 22 common frames omitted
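The root cause named in the trace is two conflicting log4j bindings (log4j-over-slf4j and slf4j-log4j12) ending up together in the uber jar. One common fix is to exclude one of them from whichever dependency drags it in transitively. A sketch, where the kafka coordinates and version are assumptions (check your own tree with mvn dependency:tree):

```xml
<!-- Hypothetical sketch: keep only ONE log4j binding on the classpath by
     excluding the conflicting jars from the transitive dependency
     (here assumed to be the kafka artifact). -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka_2.10</artifactId>
  <version>0.8.2.2</version>
  <exclusions>
    <exclusion>
      <groupId>org.slf4j</groupId>
      <artifactId>slf4j-log4j12</artifactId>
    </exclusion>
    <exclusion>
      <groupId>log4j</groupId>
      <artifactId>log4j</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```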
@Matthias J. Sax and everyone, thanks for your help.
The mistake I made here was that the deployment process I followed was wrong.
To deploy the topology build, I had to follow this process:
- push the jar into the Storm folder on AWS, then run the commands below so that Storm picks it up
rm -f *.out
(nohup bin/storm nimbus > nimbus.out)&
(nohup bin/storm supervisor > supervisor.out)&
(nohup bin/storm jar topos/IoT.jar com.bridgera.iot.test.App01 > IoT.out)&
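On the "how can I view System.out" question above: the `> IoT.out` redirection captures the stdout of the launched JVM, so with a LocalCluster-based main the PrinterBolt's output lands in that file (on a real distributed cluster it would instead go to the worker logs under Storm's logs/ directory). A stand-in illustration of the redirection pattern, using echo in place of the real storm jar command:

```shell
# Stand-in for "bin/storm jar ... > IoT.out": stdout (and stderr) of the
# launched process is captured in the file, which you can then inspect.
nohup sh -c 'echo "tuple: hello"' > IoT.out 2>&1
cat IoT.out
```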
Here I am telling Storm where it can find my jar and the main class that contains the topology builder...
Thanks everyone...