Storm Spout 错误 backtype.storm.util - 异步循环终止
Storm Spout ERROR backtype.storm.util - Async loop died
我的数据来自一个通过 websocket 连接到我的风暴集群的传感器,所以每当一个数据点到达我的 websocket 服务器时,我都会将它添加到 ConcurrentLinkedQueue。我没有关于数据点频率的先验信息 "production".
我的 spout 获取此队列上的数据点并发出相应的 Tupple。 Everythink 在一段时间内运行良好(我会说大约 1000 个数据点)但随后出现以下错误:
76245 [Thread-23-incDp] ERROR backtype.storm.util - Async loop died!
java.lang.NullPointerException: null
at streams.storm.spout.MySpout.nextTuple(MySpout.java:56) ~[bin/:na]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
76246 [Thread-23-incDp] ERROR backtype.storm.daemon.executor -
java.lang.NullPointerException: null
at streams.storm.spout.MySpout.nextTuple(MySpout.java:56) ~[bin/:na]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
76481 [Thread-23-incDp] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
这是我的 spout 代码:
public class MySpout extends BaseRichSpout
{
private static final long serialVersionUID = 1L;
private AtomicLong messageIdCounter = new AtomicLong();
private static Queue<String> incData;
private SpoutOutputCollector collector;
public MySpout()
{
incData = new ConcurrentLinkedQueue<String>();
}
@Override
public void nextTuple()
{
if(incData.isEmpty())
{
Utils.sleep(500);
}
else
{
String[] splittedMsg = incData.poll().split(" ; ");
JSONParser jsonParser = new JSONParser();
try
{
int ts = Integer.parseInt(splittedMsg[0]);
JSONObject json = (JSONObject) jsonParser.parse(splittedMsg[1]);
collector.emit(new Values( ts, new DataPoint(ts, new ArrayList(json.values()))), messageIdCounter.incrementAndGet() );
}
catch (ParseException e)
{
System.err.println("Wrong input format: should be json");
e.printStackTrace();
}
}
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("time", "dps" ));
}
public static void addElmtToQueue(String json)
{
incData.add(json);
}
我想这是因为队列出了点问题,要么不再是数据点,要么是我的两个线程之间存在并发问题(顺便说一下,我知道让它静态化可能不好,但我还没有找到另一个解决方案,因为我需要在我的服务器线程上访问它...)。
有人遇到过同样的问题吗?欢迎任何 solutions/comment :)
提前致谢。
MySpout.java 的第 56 行抛出 NullPointerException。
java.lang.NullPointerException: null
at streams.storm.spout.MySpout.nextTuple(MySpout.java:56) ~[bin/:na]
我敢打赌这行是问题所在:
String[] splittedMsg = incData.poll().split(" ; ");
由于 that poll() will return null when your queue is empty.
NullPointerException 导致 spout 崩溃。我建议两件事:
- 检查poll()的结果是否为null,如果为null则休眠
- 将你的 spout 包裹在 try/catch 中以防止它破坏你的拓扑结构
示例:
@Override
public void nextTuple() {
try {
String message = _queue.poll();
if (message == null) {
// didn't get a message, sleep for a little bit
Utils.sleep(50);
} else {
// do stuff with message
}
} catch (Exception e) {
_collector.reportError(e);
LOG.error("Spout error {}", e);
}
}
我的数据来自一个通过 websocket 连接到我的风暴集群的传感器,所以每当一个数据点到达我的 websocket 服务器时,我都会将它添加到 ConcurrentLinkedQueue。我没有关于数据点频率的先验信息 "production".
我的 spout 获取此队列上的数据点并发出相应的 Tupple。 Everythink 在一段时间内运行良好(我会说大约 1000 个数据点)但随后出现以下错误:
76245 [Thread-23-incDp] ERROR backtype.storm.util - Async loop died!
java.lang.NullPointerException: null
at streams.storm.spout.MySpout.nextTuple(MySpout.java:56) ~[bin/:na]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
76246 [Thread-23-incDp] ERROR backtype.storm.daemon.executor -
java.lang.NullPointerException: null
at streams.storm.spout.MySpout.nextTuple(MySpout.java:56) ~[bin/:na]
at backtype.storm.daemon.executor$fn__3373$fn__3388$fn__3417.invoke(executor.clj:565) ~[storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463) ~[storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
76481 [Thread-23-incDp] ERROR backtype.storm.util - Halting process: ("Worker died")
java.lang.RuntimeException: ("Worker died")
at backtype.storm.util$exit_process_BANG_.doInvoke(util.clj:325) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.RestFn.invoke(RestFn.java:423) [clojure-1.5.1.jar:na]
at backtype.storm.daemon.worker$fn__3808$fn__3809.invoke(worker.clj:452) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.daemon.executor$mk_executor_data$fn__3274$fn__3275.invoke(executor.clj:240) [storm-core-0.9.3.jar:0.9.3]
at backtype.storm.util$async_loop$fn__464.invoke(util.clj:473) [storm-core-0.9.3.jar:0.9.3]
at clojure.lang.AFn.run(AFn.java:24) [clojure-1.5.1.jar:na]
at java.lang.Thread.run(Unknown Source) [na:1.7.0_67]
这是我的 spout 代码:
public class MySpout extends BaseRichSpout
{
private static final long serialVersionUID = 1L;
private AtomicLong messageIdCounter = new AtomicLong();
private static Queue<String> incData;
private SpoutOutputCollector collector;
public MySpout()
{
incData = new ConcurrentLinkedQueue<String>();
}
@Override
public void nextTuple()
{
if(incData.isEmpty())
{
Utils.sleep(500);
}
else
{
String[] splittedMsg = incData.poll().split(" ; ");
JSONParser jsonParser = new JSONParser();
try
{
int ts = Integer.parseInt(splittedMsg[0]);
JSONObject json = (JSONObject) jsonParser.parse(splittedMsg[1]);
collector.emit(new Values( ts, new DataPoint(ts, new ArrayList(json.values()))), messageIdCounter.incrementAndGet() );
}
catch (ParseException e)
{
System.err.println("Wrong input format: should be json");
e.printStackTrace();
}
}
}
@Override
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
this.collector = collector;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("time", "dps" ));
}
public static void addElmtToQueue(String json)
{
incData.add(json);
}
我想这是因为队列出了点问题,要么不再是数据点,要么是我的两个线程之间存在并发问题(顺便说一下,我知道让它静态化可能不好,但我还没有找到另一个解决方案,因为我需要在我的服务器线程上访问它...)。
有人遇到过同样的问题吗?欢迎任何 solutions/comment :)
提前致谢。
MySpout.java 的第 56 行抛出 NullPointerException。
java.lang.NullPointerException: null
at streams.storm.spout.MySpout.nextTuple(MySpout.java:56) ~[bin/:na]
我敢打赌这行是问题所在:
String[] splittedMsg = incData.poll().split(" ; ");
由于 that poll() will return null when your queue is empty.
NullPointerException 导致 spout 崩溃。我建议两件事:
- 检查poll()的结果是否为null,如果为null则休眠
- 将你的 spout 包裹在 try/catch 中以防止它破坏你的拓扑结构
示例:
@Override
public void nextTuple() {
try {
String message = _queue.poll();
if (message == null) {
// didn't get a message, sleep for a little bit
Utils.sleep(50);
} else {
// do stuff with message
}
} catch (Exception e) {
_collector.reportError(e);
LOG.error("Spout error {}", e);
}
}