使用 Bluebird.js 和 Twitter 流的承诺和流
Promises and streams using Bluebird.js and Twitter stream
我是 Promises 和 Node 的新手,对在流中使用 promises 很好奇。我可以承诺一个流吗?
使用 Bluebirdjs 和 Twit 模块我有以下内容:
var Twit = require('twit')
var Promise = require("bluebird");
var T = new Twit({
consumer_key: process.env.CONSUMER_KEY,
consumer_secret: process.env.CONSUMER_SECRET,
access_token: process.env.ACCESS_TOKEN,
access_token_secret: process.env.ACCESS_TOKEN_SECRET
})
Promise.promisifyAll(Twit);
Promise.promisifyAll(T);
var sanFrancisco = [ '-122.75', '36.8', '-121.75', '37.8' ]
T.streamAsync('statuses/filter', { locations: sanFrancisco })
.then(function(connection){
connection.onAsync('tweet')
.then(function (tweet) {
console.log(tweet)
})
});
运行 此代码不会记录推文,也不会引发任何错误。似乎没有任何反应,但 .then 的 none 承诺有效。
原始片段,在尝试实现 twit docs
中的承诺之前
var sanFrancisco = [ '-122.75', '36.8', '-121.75', '37.8' ]
var stream = T.stream('statuses/filter', { locations: sanFrancisco })
stream.on('tweet', function (tweet) {
console.log(tweet)
})
很抱歉,这行不通,因为 promise 和流之间存在根本区别。你说你对这两个都是新手,所以让我给你简单介绍一下。
Promises 可以被视为一些可能尚未到达的单个值的占位符。例如,一些假设函数 getTweet()
可以这样工作:
getTweet()
.then(function (tweet) {
//Do something with your tweet!
console.log(tweet);
});
但这只会给你一条推文!要获得另一个,您必须再次调用 getTweet()
,然后再调用一个新的 .then()
。事实上,当使用 promises 时,你可以保证 .then()
只调用它的包含函数一次!
流是连续的数据流。您不必手动请求一条推文,然后是另一条,然后是另一条。你打开水龙头,然后它就一直流,直到它完成或你告诉它停止。
因此,总而言之,您不能对流进行承诺,因为承诺是针对单个值的,而流是针对连续数据流的。
我假设您问这个问题是因为您喜欢 promise 界面并且想对流使用类似的东西?根据您要实现的目标,有不同的库可以更好地处理流。 EventStream 就是一个例子。让我知道你的计划,我也许可以给你举个例子。
Can I promisify a stream?
没有。虽然流不断发出事件,但承诺只会解决一次。它们具有完全不同的语义(即使两者都使用异步回调)。
您 可以 为结束的流作出承诺,请参阅 EventEmitterPromisifier
example in BlueBirds documentation - 但这不是您的 Twitter 流示例所做的。
Running this code does not log a tweet and no error is thrown.
因为T.stream()
是一个同步工厂函数,returns是一个流对象。您不需要 - 您不能 - 使用 streamAsync
,因为它永远不会调用隐式传递的回调。
我最终使用了 RxJS implementing observables with streams。
var sanFrancisco = [ '-122.75', '36.8', '-121.75', '37.8' ]
var stream = T.stream('statuses/filter', { locations: sanFrancisco });
var source = Rx.Node.fromEvent(stream, 'tweet');
var observer = Rx.Observer.create(
function (tweet) {
// THIS IS WHERE EACH TWEET SHOULD COME FROM THE STREAM
console.log(tweet);
},
function (err) {
console.log('Error getting tweets: ' + err);
},
function () {
console.log('Completed');
}
);
source.subscribe(observer);
我最终不得不使用 RX.Observable.fromEvent 而不是 Rx.Node.fromStream 因为,Twit 模块必须在幕后处理实际流,但通过 EventEmitter 公开它,他们可能不应该命名它 T.stream.
我是 Promises 和 Node 的新手,对在流中使用 promises 很好奇。我可以承诺一个流吗? 使用 Bluebirdjs 和 Twit 模块我有以下内容:
var Twit = require('twit')
var Promise = require("bluebird");
var T = new Twit({
consumer_key: process.env.CONSUMER_KEY,
consumer_secret: process.env.CONSUMER_SECRET,
access_token: process.env.ACCESS_TOKEN,
access_token_secret: process.env.ACCESS_TOKEN_SECRET
})
Promise.promisifyAll(Twit);
Promise.promisifyAll(T);
var sanFrancisco = [ '-122.75', '36.8', '-121.75', '37.8' ]
T.streamAsync('statuses/filter', { locations: sanFrancisco })
.then(function(connection){
connection.onAsync('tweet')
.then(function (tweet) {
console.log(tweet)
})
});
运行 此代码不会记录推文,也不会引发任何错误。似乎没有任何反应,但 .then 的 none 承诺有效。
原始片段,在尝试实现 twit docs
中的承诺之前var sanFrancisco = [ '-122.75', '36.8', '-121.75', '37.8' ]
var stream = T.stream('statuses/filter', { locations: sanFrancisco })
stream.on('tweet', function (tweet) {
console.log(tweet)
})
很抱歉,这行不通,因为 promise 和流之间存在根本区别。你说你对这两个都是新手,所以让我给你简单介绍一下。
Promises 可以被视为一些可能尚未到达的单个值的占位符。例如,一些假设函数 getTweet()
可以这样工作:
getTweet()
.then(function (tweet) {
//Do something with your tweet!
console.log(tweet);
});
但这只会给你一条推文!要获得另一个,您必须再次调用 getTweet()
,然后再调用一个新的 .then()
。事实上,当使用 promises 时,你可以保证 .then()
只调用它的包含函数一次!
流是连续的数据流。您不必手动请求一条推文,然后是另一条,然后是另一条。你打开水龙头,然后它就一直流,直到它完成或你告诉它停止。
因此,总而言之,您不能对流进行承诺,因为承诺是针对单个值的,而流是针对连续数据流的。
我假设您问这个问题是因为您喜欢 promise 界面并且想对流使用类似的东西?根据您要实现的目标,有不同的库可以更好地处理流。 EventStream 就是一个例子。让我知道你的计划,我也许可以给你举个例子。
Can I promisify a stream?
没有。虽然流不断发出事件,但承诺只会解决一次。它们具有完全不同的语义(即使两者都使用异步回调)。
您 可以 为结束的流作出承诺,请参阅 EventEmitterPromisifier
example in BlueBirds documentation - 但这不是您的 Twitter 流示例所做的。
Running this code does not log a tweet and no error is thrown.
因为T.stream()
是一个同步工厂函数,returns是一个流对象。您不需要 - 您不能 - 使用 streamAsync
,因为它永远不会调用隐式传递的回调。
我最终使用了 RxJS implementing observables with streams。
var sanFrancisco = [ '-122.75', '36.8', '-121.75', '37.8' ]
var stream = T.stream('statuses/filter', { locations: sanFrancisco });
var source = Rx.Node.fromEvent(stream, 'tweet');
var observer = Rx.Observer.create(
function (tweet) {
// THIS IS WHERE EACH TWEET SHOULD COME FROM THE STREAM
console.log(tweet);
},
function (err) {
console.log('Error getting tweets: ' + err);
},
function () {
console.log('Completed');
}
);
source.subscribe(observer);
我最终不得不使用 RX.Observable.fromEvent 而不是 Rx.Node.fromStream 因为,Twit 模块必须在幕后处理实际流,但通过 EventEmitter 公开它,他们可能不应该命名它 T.stream.