Apache Storm spout 停止从 spout 发出消息
Apache Storm spout stops emitting messages from spout
我们已经为这个问题苦苦挣扎了很长时间。简而言之,我们的风暴拓扑在一段时间后停止以随机方式从喷口发出消息。我们有一个自动脚本,在主数据刷新 activity 完成后,每天在 06:00 UTC 重新部署拓扑。
在过去 2 周内,我们的拓扑在 UTC 晚些时候(22:00 和 02:00 之间)停止发送消息 3 次。它只有在我们重新启动它时才联机,大约是 06:00 UTC。
我搜索了很多答案和博客,但找不到这里发生的事情。我们有一个未锚定的拓扑,这是我们在 3-4 年前做出的选择。我们从 0.9.2 开始,现在是 1.1.0。
我检查了所有类型的日志,我 100% 确定控制器的 nextTuple()
方法没有被调用,并且系统中没有发生可能导致此问题的异常。我还检查了我们积累的所有类型的日志,甚至没有一个 ERROR 或 WARN 日志来解释突然停止。 INFO 日志也没有太大帮助。在 worker 日志或 supervisor 日志或 nimbus 日志中没有任何与此问题相关的内容。
这是我们的 spout class 的样子:
Controller.java
public class Controller implements IRichSpout {
SpoutOutputCollector _collector;
Calendar LAST_RUN = null;
List<ControllerMessage> msgList;
/**
* It is to open the spout
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
msgList= new ArrayList<ControllerMessage>();
MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler();
mongoIndexingHandler.createMongoIndexes();
}
/**
* It executes the next tuple
*/
@Override
public void nextTuple() {
Map<String, Object> logMap = new HashMap<>();
logMap.put("BEGIN", new Date());
try {
TriggerHandler thandler = new TriggerHandler();
if (msgList.size() == 0) {
List<ControllerMessage> mList = thandler.getControllerMessage(new Date());
msgList = mList;
}
if (msgList.size() > 0) {
ControllerMessage message = msgList.get(0);
if(thandler.fire(message.getFireTime())) {
Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date());
msgList.remove(0);
_collector.emit(new Values(message));
}
}
else{
Utils.sleep(1000);
}
} catch (Exception e) {
_collector.reportError(e);
Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class);
}
}
/**
* It acknowledges the messages
*/
@Override
public void ack(Object id) {
}
/**
* It tells failed messages
*/
@Override
public void fail(Object id) {
}
/**
* It declares the message name
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("SPOUT_MESSAGE"));
}
@Override
public void activate() {
}
@Override
public void close() {
}
@Override
public void deactivate() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
这是拓扑 class:DiagnosticTopology.java
public class DiagnosticTopology {
public static void main(String[] args) throws Exception {
int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2;
int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128;
int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16;
int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16;
int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64;
int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16;
int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8;
int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16;
String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC";
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("controller", new Controller(), 1);
builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller");
builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator");
builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping");
builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping");
builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo");
builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule");
builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule");
builder.setSpout("trigger", new TriggerSpout(), 1);
builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger");
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(wSize);
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
}
}
我们为生产和测试环境准备了相当不错的服务器(Xeon、8 核、32 GB 和闪存驱动器),并且没有外部因素会导致此问题,因为异常处理在整个过程中无处不在代码。
当这件事发生的时候,好像一切都突然停止了,没有任何痕迹。
非常感谢任何帮助!
我不知道是什么导致了您的问题,但我建议您首先检查升级到最新的 Storm 版本是否可以解决问题。我知道至少有两个与工作线程死亡和不再恢复相关的问题 https://issues.apache.org/jira/browse/STORM-1750 https://issues.apache.org/jira/browse/STORM-2194。 1750是1.1.0固定的,2194是1.1.1才固定的
如果升级不能为您解决问题,您可以通过执行以下操作来调试它。
下次您的拓扑挂起时,打开 Storm UI 并找到您的 spout。它将显示喷出的执行者列表 运行,以及负责 运行 它们的工作人员。选择 spout executor 没有发射任何东西的工人之一。在运行那个worker的机器上打开一个shell,找到worker的JVM进程id。您可以使用 jps -m
轻松完成此操作。
示例输出显示我本地机器上端口为 6701 的 worker JVM,其 pid 为 7592:
7592 Worker test-2-1520361882 d24dc55d-76c7-4cc6-93fa-2663fcdcb1ba-10.0.75.1 6701 f7b6f8e4-6c87-47ca-a7b7-655009b6c62a
通过 kill -3 <pid>
触发线程转储,或者如果您愿意,可以使用 jstack <pid>
。
在线程转储中,您应该能够找到挂起的执行程序线程。例如,当我为一个带有名为 "word" 的 spout 的拓扑执行线程转储时,其中一个 spout 执行程序的编号为 13,我看到
编辑:堆栈溢出不会让我 post 堆栈跟踪,因为寻找未格式化代码的启发式方法很糟糕。我尝试 post 堆栈跟踪的时间可能与编写原始答案的时间一样长,所以我懒得继续尝试了。本来应该在这里的痕迹https://pastebin.com/2Sz5kkQ1
这显示了执行者 13 当前正在做什么。在这种情况下,它在调用 nextTuple 期间处于休眠状态。
如果你能找出挂起的执行器在做什么,你应该能更好地解决问题,或者向 Storm 报告错误。
我们已经在我们的应用程序中观察到这一点,我们的应用程序非常繁忙 CPU,所有其他线程都在等待轮到它们。当我们尝试使用 JVisualVM 查找根本原因以检查资源使用情况时,我们发现某些螺栓中的某些功能导致大量开销和 CPU 时间。请通过检查。如果 nextTuple() 方法的 CPU 关键路径中有阻塞的线程,或者您是否从上游接收到相同的任何数据,则任何分析工具。
我们已经为这个问题苦苦挣扎了很长时间。简而言之,我们的风暴拓扑在一段时间后停止以随机方式从喷口发出消息。我们有一个自动脚本,在主数据刷新 activity 完成后,每天在 06:00 UTC 重新部署拓扑。
在过去 2 周内,我们的拓扑在 UTC 晚些时候(22:00 和 02:00 之间)停止发送消息 3 次。它只有在我们重新启动它时才联机,大约是 06:00 UTC。
我搜索了很多答案和博客,但找不到这里发生的事情。我们有一个未锚定的拓扑,这是我们在 3-4 年前做出的选择。我们从 0.9.2 开始,现在是 1.1.0。
我检查了所有类型的日志,我 100% 确定控制器的 nextTuple()
方法没有被调用,并且系统中没有发生可能导致此问题的异常。我还检查了我们积累的所有类型的日志,甚至没有一个 ERROR 或 WARN 日志来解释突然停止。 INFO 日志也没有太大帮助。在 worker 日志或 supervisor 日志或 nimbus 日志中没有任何与此问题相关的内容。
这是我们的 spout class 的样子: Controller.java
public class Controller implements IRichSpout {
SpoutOutputCollector _collector;
Calendar LAST_RUN = null;
List<ControllerMessage> msgList;
/**
* It is to open the spout
*/
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
msgList= new ArrayList<ControllerMessage>();
MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler();
mongoIndexingHandler.createMongoIndexes();
}
/**
* It executes the next tuple
*/
@Override
public void nextTuple() {
Map<String, Object> logMap = new HashMap<>();
logMap.put("BEGIN", new Date());
try {
TriggerHandler thandler = new TriggerHandler();
if (msgList.size() == 0) {
List<ControllerMessage> mList = thandler.getControllerMessage(new Date());
msgList = mList;
}
if (msgList.size() > 0) {
ControllerMessage message = msgList.get(0);
if(thandler.fire(message.getFireTime())) {
Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date());
msgList.remove(0);
_collector.emit(new Values(message));
}
}
else{
Utils.sleep(1000);
}
} catch (Exception e) {
_collector.reportError(e);
Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class);
}
}
/**
* It acknowledges the messages
*/
@Override
public void ack(Object id) {
}
/**
* It tells failed messages
*/
@Override
public void fail(Object id) {
}
/**
* It declares the message name
*/
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("SPOUT_MESSAGE"));
}
@Override
public void activate() {
}
@Override
public void close() {
}
@Override
public void deactivate() {
}
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
}
这是拓扑 class:DiagnosticTopology.java
public class DiagnosticTopology {
public static void main(String[] args) throws Exception {
int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2;
int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128;
int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16;
int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16;
int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64;
int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16;
int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8;
int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16;
String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC";
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("controller", new Controller(), 1);
builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller");
builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator");
builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping");
builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping");
builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo");
builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule");
builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule");
builder.setSpout("trigger", new TriggerSpout(), 1);
builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger");
Config conf = new Config();
conf.setDebug(false);
conf.setNumWorkers(wSize);
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
}
}
我们为生产和测试环境准备了相当不错的服务器(Xeon、8 核、32 GB 和闪存驱动器),并且没有外部因素会导致此问题,因为异常处理在整个过程中无处不在代码。
当这件事发生的时候,好像一切都突然停止了,没有任何痕迹。
非常感谢任何帮助!
我不知道是什么导致了您的问题,但我建议您首先检查升级到最新的 Storm 版本是否可以解决问题。我知道至少有两个与工作线程死亡和不再恢复相关的问题 https://issues.apache.org/jira/browse/STORM-1750 https://issues.apache.org/jira/browse/STORM-2194。 1750是1.1.0固定的,2194是1.1.1才固定的
如果升级不能为您解决问题,您可以通过执行以下操作来调试它。
下次您的拓扑挂起时,打开 Storm UI 并找到您的 spout。它将显示喷出的执行者列表 运行,以及负责 运行 它们的工作人员。选择 spout executor 没有发射任何东西的工人之一。在运行那个worker的机器上打开一个shell,找到worker的JVM进程id。您可以使用 jps -m
轻松完成此操作。
示例输出显示我本地机器上端口为 6701 的 worker JVM,其 pid 为 7592:
7592 Worker test-2-1520361882 d24dc55d-76c7-4cc6-93fa-2663fcdcb1ba-10.0.75.1 6701 f7b6f8e4-6c87-47ca-a7b7-655009b6c62a
通过 kill -3 <pid>
触发线程转储,或者如果您愿意,可以使用 jstack <pid>
。
在线程转储中,您应该能够找到挂起的执行程序线程。例如,当我为一个带有名为 "word" 的 spout 的拓扑执行线程转储时,其中一个 spout 执行程序的编号为 13,我看到
编辑:堆栈溢出不会让我 post 堆栈跟踪,因为寻找未格式化代码的启发式方法很糟糕。我尝试 post 堆栈跟踪的时间可能与编写原始答案的时间一样长,所以我懒得继续尝试了。本来应该在这里的痕迹https://pastebin.com/2Sz5kkQ1
这显示了执行者 13 当前正在做什么。在这种情况下,它在调用 nextTuple 期间处于休眠状态。
如果你能找出挂起的执行器在做什么,你应该能更好地解决问题,或者向 Storm 报告错误。
我们已经在我们的应用程序中观察到这一点,我们的应用程序非常繁忙 CPU,所有其他线程都在等待轮到它们。当我们尝试使用 JVisualVM 查找根本原因以检查资源使用情况时,我们发现某些螺栓中的某些功能导致大量开销和 CPU 时间。请通过检查。如果 nextTuple() 方法的 CPU 关键路径中有阻塞的线程,或者您是否从上游接收到相同的任何数据,则任何分析工具。