如何使用 Node.js 获取 Apache Storm 中的输出
How to fetch the output in Apache Storm using Node.js
我是 Apache Storm 的新手。该程序将传入的句子拆分为单词，然后统计每个单词的出现次数。我的问题是无法获取 console.log('in spout') 和 console.log(word) 的输出。代码附在下面：
var storm = require('node-storm')
/**
 * Spout that emits a randomly chosen sentence from a fixed list as a
 * single-field tuple named "sentence".
 */
var randomsentence = (function() {
  var sentences = [
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
    "snow white and the seven dwarfs",
    "i am at two with nature"
  ]
  // Diagnostics go to stderr, not stdout. Storm runs non-JVM components as
  // subprocesses that exchange multilang protocol messages with the worker
  // over stdin/stdout, so stray text on stdout can interfere with that
  // channel. NOTE(review): where stderr ends up (presumably the worker log on
  // the node running this task) depends on the Storm version - confirm.
  console.error('before spout')
  // The output field is named "sentence" (it was mislabelled "word"): each
  // tuple carries a whole sentence. splitsentence reads it positionally
  // (data.tuple[0]) and subscribes with shuffleGrouping, so it is unaffected.
  return storm.spout(function(sync) {
    var self = this
    // Each call waits 100 ms before emitting, which throttles the spout.
    setTimeout(function() {
      console.error('in spout')
      var i = Math.floor(Math.random() * sentences.length)
      var sentence = sentences[i]
      // For reliable (acked) emission the original note suggests passing an
      // id option such as {id: 'unique'} - TODO confirm against node-storm's
      // emit signature.
      self.emit([sentence])
      // Signal node-storm that this call has finished (presumably the
      // multilang "sync" message).
      sync()
    }, 100)
  }).declareOutputFields(["sentence"])
})()
/**
 * Bolt that splits the incoming sentence (tuple position 0) on single spaces
 * and emits every trimmed, non-empty piece as a one-field "word" tuple.
 */
var splitsentence = storm.basicbolt(function(data) {
  var words = data.tuple[0].split(" ")
  for (var i = 0; i < words.length; ++i) {
    var word = words[i].trim()
    // Skip empty fragments produced by consecutive spaces.
    if (word) {
      this.emit([word])
      // stderr rather than stdout: stdout carries Storm's multilang protocol
      // messages between this subprocess and the worker.
      console.error(word)
    }
  }
}).declareOutputFields(["word"])
/**
 * Bolt that keeps a running count per word and emits [word, count] each time
 * a word arrives.
 */
var wordcount = (function() {
  // Per-word counts held by this bolt instance. Object.create(null) gives a
  // dictionary with no prototype, so words such as "constructor", "toString"
  // or "__proto__" start out undefined instead of resolving to
  // Object.prototype members (with a plain {} they skipped initialisation and
  // ++counts[word] produced NaN).
  var counts = Object.create(null)
  return storm.basicbolt(function(data) {
    var word = data.tuple[0]
    if (counts[word] == null) {
      counts[word] = 0
    }
    var count = ++counts[word]
    this.emit([word, count])
  }).declareOutputFields(["word", "count"])
})()
// Wire the components into a topology.
// - splitsentence subscribes to the spout with shuffleGrouping, so sentences
//   are distributed randomly and evenly across its tasks.
// - wordcount subscribes to splitsentence with fieldsGrouping on "word", so
//   every occurrence of a given word reaches the same wordcount task and is
//   counted in one place.
// The third setBolt argument (8, 12) is presumably the parallelism hint,
// mirroring Storm's TopologyBuilder.setBolt - confirm in node-storm docs.
var builder = storm.topologybuilder()
builder.setSpout('randomsentence', randomsentence)
builder.setBolt('splitsentence', splitsentence, 8).shuffleGrouping('randomsentence')
builder.setBolt('wordcount', wordcount, 12).fieldsGrouping('splitsentence', ['word'])
var topology = builder.createTopology()
// Submission options for node-storm.
var options = {
  // name: optional; per the original note it defaults to the name of the
  // topology script.
  // For a local cluster use e.g. 'localhost:2181' or '127.0.0.1:2181'.
  // NOTE(review): 2181 is ZooKeeper's default client port, while Storm's
  // Nimbus Thrift port defaults to 6627 (nimbus.thrift.port) - confirm which
  // endpoint node-storm expects here.
  nimbus: '172.26.4.227:2181',
  // topology.debug makes Storm log every emitted tuple in the worker logs.
  config: { 'topology.debug': true }
}

storm.submit(topology, options, function(err, topologyName) {
  if (err) {
    // Report the failure and skip the success output below.
    console.error(err)
    return
  }
  console.log(topologyName)
})
我认为您看不到 console.log('in spout') 和 console.log(word) 输出的原因在于 Storm 的分布式处理方式：spout 和 bolt 的代码会在集群中各个 Storm 节点上的工作进程（worker）中执行，因此这些日志位于那些 Storm 节点上，而不是提交拓扑的那台机器上。
我是 Apache Storm 的新手。该程序将传入的句子拆分为单词，然后统计每个单词的出现次数。我的问题是无法获取 console.log('in spout') 和 console.log(word) 的输出。代码附在下面：
var storm = require('node-storm')
/**
 * Spout that emits a randomly chosen sentence from a fixed list as a
 * single-field tuple named "sentence".
 */
var randomsentence = (function() {
  var sentences = [
    "the cow jumped over the moon",
    "an apple a day keeps the doctor away",
    "four score and seven years ago",
    "snow white and the seven dwarfs",
    "i am at two with nature"
  ]
  // Diagnostics go to stderr, not stdout. Storm runs non-JVM components as
  // subprocesses that exchange multilang protocol messages with the worker
  // over stdin/stdout, so stray text on stdout can interfere with that
  // channel. NOTE(review): where stderr ends up (presumably the worker log on
  // the node running this task) depends on the Storm version - confirm.
  console.error('before spout')
  // The output field is named "sentence" (it was mislabelled "word"): each
  // tuple carries a whole sentence. splitsentence reads it positionally
  // (data.tuple[0]) and subscribes with shuffleGrouping, so it is unaffected.
  return storm.spout(function(sync) {
    var self = this
    // Each call waits 100 ms before emitting, which throttles the spout.
    setTimeout(function() {
      console.error('in spout')
      var i = Math.floor(Math.random() * sentences.length)
      var sentence = sentences[i]
      // For reliable (acked) emission the original note suggests passing an
      // id option such as {id: 'unique'} - TODO confirm against node-storm's
      // emit signature.
      self.emit([sentence])
      // Signal node-storm that this call has finished (presumably the
      // multilang "sync" message).
      sync()
    }, 100)
  }).declareOutputFields(["sentence"])
})()
/**
 * Bolt that splits the incoming sentence (tuple position 0) on single spaces
 * and emits every trimmed, non-empty piece as a one-field "word" tuple.
 */
var splitsentence = storm.basicbolt(function(data) {
  var words = data.tuple[0].split(" ")
  for (var i = 0; i < words.length; ++i) {
    var word = words[i].trim()
    // Skip empty fragments produced by consecutive spaces.
    if (word) {
      this.emit([word])
      // stderr rather than stdout: stdout carries Storm's multilang protocol
      // messages between this subprocess and the worker.
      console.error(word)
    }
  }
}).declareOutputFields(["word"])
/**
 * Bolt that keeps a running count per word and emits [word, count] each time
 * a word arrives.
 */
var wordcount = (function() {
  // Per-word counts held by this bolt instance. Object.create(null) gives a
  // dictionary with no prototype, so words such as "constructor", "toString"
  // or "__proto__" start out undefined instead of resolving to
  // Object.prototype members (with a plain {} they skipped initialisation and
  // ++counts[word] produced NaN).
  var counts = Object.create(null)
  return storm.basicbolt(function(data) {
    var word = data.tuple[0]
    if (counts[word] == null) {
      counts[word] = 0
    }
    var count = ++counts[word]
    this.emit([word, count])
  }).declareOutputFields(["word", "count"])
})()
// Wire the components into a topology.
// - splitsentence subscribes to the spout with shuffleGrouping, so sentences
//   are distributed randomly and evenly across its tasks.
// - wordcount subscribes to splitsentence with fieldsGrouping on "word", so
//   every occurrence of a given word reaches the same wordcount task and is
//   counted in one place.
// The third setBolt argument (8, 12) is presumably the parallelism hint,
// mirroring Storm's TopologyBuilder.setBolt - confirm in node-storm docs.
var builder = storm.topologybuilder()
builder.setSpout('randomsentence', randomsentence)
builder.setBolt('splitsentence', splitsentence, 8).shuffleGrouping('randomsentence')
builder.setBolt('wordcount', wordcount, 12).fieldsGrouping('splitsentence', ['word'])
var topology = builder.createTopology()
// Submission options for node-storm.
var options = {
  // name: optional; per the original note it defaults to the name of the
  // topology script.
  // For a local cluster use e.g. 'localhost:2181' or '127.0.0.1:2181'.
  // NOTE(review): 2181 is ZooKeeper's default client port, while Storm's
  // Nimbus Thrift port defaults to 6627 (nimbus.thrift.port) - confirm which
  // endpoint node-storm expects here.
  nimbus: '172.26.4.227:2181',
  // topology.debug makes Storm log every emitted tuple in the worker logs.
  config: { 'topology.debug': true }
}

storm.submit(topology, options, function(err, topologyName) {
  if (err) {
    // Report the failure and skip the success output below.
    console.error(err)
    return
  }
  console.log(topologyName)
})
我认为您看不到 console.log('in spout') 和 console.log(word) 输出的原因在于 Storm 的分布式处理方式：spout 和 bolt 的代码会在集群中各个 Storm 节点上的工作进程（worker）中执行，因此这些日志位于那些 Storm 节点上，而不是提交拓扑的那台机器上。