如何查看 运行 Storm 拓扑的当前输出?
How can I see the current output of a running Storm topology?
目前正在学习如何使用 Storm(版本 2.1.0),我对这个数据流处理 (DSP) 引擎的特定方面有点困惑:如何处理输出数据? 教程对系统设置和 运行我们的第一个应用程序提供了很好的解释。不幸的是,我没有找到提供有关拓扑生成的结果的详细信息的页面。
对于 DSP 应用程序,没有最终输出,因为输入数据是连续传入的数据流(或者我们可以说当应用程序停止时有最终输出) .我想要的是能够看到 运行ning 拓扑的当前输出状态(当前生成的实际输出数据)。
我可以 运行 WordCountTopology。我知道此拓扑的输出是由以下代码片段生成的:
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
我的误解是 <"word":string, "count":int>
输出的位置。是不是只在内存里,写在某个地方的数据库里,写在文件里?
进一步探讨这个问题:存储正在进行的输出数据的现有可能性有哪些?处理此类数据的“好方法”是什么?
希望我的问题不要太幼稚。也感谢 Whosebug 社区一直以来提供的良好帮助。
自从我发布这个问题以来已经过去了几天。我回来与您分享我的尝试。虽然我不知道这样做是否正确,但以下两个命题回答了我的问题。
简单 System.out.println()
我尝试的第一件事是直接在 BaseBasicBolt 的 prepare()
方法中创建一个 System.out.println("Hello World!")
。此方法仅在每个 Bolt 的线程执行开始时调用一次。
public void prepare(Map topoConf, TopologyContext context) {
System.out.println("Hello World!");
}
最大的挑战是找出日志的写入位置。默认情况下,它写在 <storm installation folder>/logs/workers-artifacts/<topology name>/<worker-port>/worker.log
中,其中 <worker-port>
是请求的 worker/slot.
的端口
例如,对于 conf.setNumWorkers(3)
,拓扑请求访问 3 个工作者(3 个槽)。因此,<worker-port>
的值将是 6700、6701 和 6702。这些值是 3 个插槽的端口号(在 supervisor.slots.ports
下的 storm.yaml
中定义)。
注意:您将拥有与 BaseBasicBolt 的平行尺寸一样多的“Hello World!”。当 split bolt 用 builder.setBolt("split", new SplitSentence(), 8)
实例化时,它会产生 8 个并行线程,每个线程都写入自己的日志。
写入文件
出于研究目的,我必须以特定格式分析我需要的大量日志。我找到的解决方案是将日志附加到每个螺栓管理的特定文件中。
以下是我自己为 count 个螺栓实现的文件记录解决方案。
public static class WordCount extends BaseBasicBolt {
private String workerName;
private FileWriter fw;
private BufferedWriter bw;
private PrintWriter out;
private String logFile = "/var/log/storm/count.log";
private Map<String, Integer> counts = new HashMap<String, Integer>();
public void prepare(Map topoConf, TopologyContext context) {
this.workerName = this.toString();
try {
this.fw = new FileWriter(logFile, true);
this.bw = new BufferedWriter(fw);
this.out = new PrintWriter(bw);
} catch (Exception e) {
System.out.println(e);
}
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
out.println(this.workerName + ": Hello World!");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
在此代码中,我的日志文件位于 /var/log/storm/count.log
中,调用 out.println(text)
会在此文件的末尾附加 text
。由于我不确定它是否是线程安全的,所有并行线程同时写入同一个文件可能会导致数据丢失。
注意:如果你的螺栓分布在多台机器上,每台机器都会有自己的日志文件。在我的测试中,我配置了一个简单的集群,只有 1 台机器(运行 Nimbus + Supervisor + UI),因此我只有 1 个日志文件。
结论
有多种方法可以处理输出数据,更常见的是使用 Storm 记录任何内容。我没有找到任何官方的方式来做这件事,而且关于这个主题的文档也很简单。
虽然我们中的一些人会对简单的 sysout.println()
感到满意,但其他人可能需要将大量数据推送到特定文件或专用数据库引擎中。使用 Java 可以做的任何事情都可以用 Storm 实现,因为它很简单 Java 编程。
对于完成此答案的任何建议和补充评论,我们将不胜感激。
目前正在学习如何使用 Storm(版本 2.1.0),我对这个数据流处理 (DSP) 引擎的特定方面有点困惑:如何处理输出数据? 教程对系统设置和 运行我们的第一个应用程序提供了很好的解释。不幸的是,我没有找到提供有关拓扑生成的结果的详细信息的页面。
对于 DSP 应用程序,没有最终输出,因为输入数据是连续传入的数据流(或者我们可以说当应用程序停止时有最终输出) .我想要的是能够看到 运行ning 拓扑的当前输出状态(当前生成的实际输出数据)。
我可以 运行 WordCountTopology。我知道此拓扑的输出是由以下代码片段生成的:
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
我的误解是 <"word":string, "count":int>
输出的位置。是不是只在内存里,写在某个地方的数据库里,写在文件里?
进一步探讨这个问题:存储正在进行的输出数据的现有可能性有哪些?处理此类数据的“好方法”是什么?
希望我的问题不要太幼稚。也感谢 Whosebug 社区一直以来提供的良好帮助。
自从我发布这个问题以来已经过去了几天。我回来与您分享我的尝试。虽然我不知道这样做是否正确,但以下两个命题回答了我的问题。
简单 System.out.println()
我尝试的第一件事是直接在 BaseBasicBolt 的 prepare()
方法中创建一个 System.out.println("Hello World!")
。此方法仅在每个 Bolt 的线程执行开始时调用一次。
public void prepare(Map topoConf, TopologyContext context) {
System.out.println("Hello World!");
}
最大的挑战是找出日志的写入位置。默认情况下,它写在 <storm installation folder>/logs/workers-artifacts/<topology name>/<worker-port>/worker.log
中,其中 <worker-port>
是请求的 worker/slot.
例如,对于 conf.setNumWorkers(3)
,拓扑请求访问 3 个工作者(3 个槽)。因此,<worker-port>
的值将是 6700、6701 和 6702。这些值是 3 个插槽的端口号(在 supervisor.slots.ports
下的 storm.yaml
中定义)。
注意:您将拥有与 BaseBasicBolt 的平行尺寸一样多的“Hello World!”。当 split bolt 用 builder.setBolt("split", new SplitSentence(), 8)
实例化时,它会产生 8 个并行线程,每个线程都写入自己的日志。
写入文件
出于研究目的,我必须以特定格式分析我需要的大量日志。我找到的解决方案是将日志附加到每个螺栓管理的特定文件中。
以下是我自己为 count 个螺栓实现的文件记录解决方案。
public static class WordCount extends BaseBasicBolt {
private String workerName;
private FileWriter fw;
private BufferedWriter bw;
private PrintWriter out;
private String logFile = "/var/log/storm/count.log";
private Map<String, Integer> counts = new HashMap<String, Integer>();
public void prepare(Map topoConf, TopologyContext context) {
this.workerName = this.toString();
try {
this.fw = new FileWriter(logFile, true);
this.bw = new BufferedWriter(fw);
this.out = new PrintWriter(bw);
} catch (Exception e) {
System.out.println(e);
}
}
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null) {
count = 0;
}
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
out.println(this.workerName + ": Hello World!");
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
}
在此代码中,我的日志文件位于 /var/log/storm/count.log
中,调用 out.println(text)
会在此文件的末尾附加 text
。由于我不确定它是否是线程安全的,所有并行线程同时写入同一个文件可能会导致数据丢失。
注意:如果你的螺栓分布在多台机器上,每台机器都会有自己的日志文件。在我的测试中,我配置了一个简单的集群,只有 1 台机器(运行 Nimbus + Supervisor + UI),因此我只有 1 个日志文件。
结论
有多种方法可以处理输出数据,更常见的是使用 Storm 记录任何内容。我没有找到任何官方的方式来做这件事,而且关于这个主题的文档也很简单。
虽然我们中的一些人会对简单的 sysout.println()
感到满意,但其他人可能需要将大量数据推送到特定文件或专用数据库引擎中。使用 Java 可以做的任何事情都可以用 Storm 实现,因为它很简单 Java 编程。
对于完成此答案的任何建议和补充评论,我们将不胜感激。