使用 Bluebird.js 和 Twitter 流的承诺和流

Promises and streams using Bluebird.js and Twitter stream

我是 Promises 和 Node 的新手,对在流中使用 promises 很好奇。我可以承诺一个流吗? 使用 Bluebirdjs 和 Twit 模块我有以下内容:

var Twit = require('twit')
var Promise = require("bluebird");

var T = new Twit({
 consumer_key:         process.env.CONSUMER_KEY,
 consumer_secret:      process.env.CONSUMER_SECRET,
 access_token:         process.env.ACCESS_TOKEN,
 access_token_secret:  process.env.ACCESS_TOKEN_SECRET
})

Promise.promisifyAll(Twit);
Promise.promisifyAll(T);

var sanFrancisco = [ '-122.75', '36.8', '-121.75', '37.8' ]

T.streamAsync('statuses/filter', { locations: sanFrancisco })
.then(function(connection){
 connection.onAsync('tweet')
  .then(function (tweet) {
   console.log(tweet)
  })
});

运行 此代码不会记录推文,也不会引发任何错误。似乎没有任何反应,但 .then 的 none 承诺有效。

原始片段,在尝试实现 twit docs

中的承诺之前
var sanFrancisco = [ '-122.75', '36.8', '-121.75', '37.8' ]

var stream = T.stream('statuses/filter', { locations: sanFrancisco })

stream.on('tweet', function (tweet) {
 console.log(tweet)
})

很抱歉,这行不通,因为 promise 和流之间存在根本区别。你说你对这两个都是新手,所以让我给你简单介绍一下。

Promises 可以被视为一些可能尚未到达的单个值的占位符。例如,一些假设函数 getTweet() 可以这样工作:

getTweet()
    .then(function (tweet) {
        //Do something with your tweet!
        console.log(tweet);
    });

但这只会给你一条推文!要获得另一个,您必须再次调用 getTweet(),然后再调用一个新的 .then()。事实上,当使用 promises 时,你可以保证 .then() 只调用它的包含函数一次!

流是连续的数据流。您不必手动请求一条推文,然后是另一条,然后是另一条。你打开水龙头,然后它就一直流,直到它完成或你告诉它停止。

因此,总而言之,您不能对流进行承诺,因为承诺是针对单个值的,而流是针对连续数据流的。

我假设您问这个问题是因为您喜欢 promise 界面并且想对流使用类似的东西?根据您要实现的目标,有不同的库可以更好地处理流。 EventStream 就是一个例子。让我知道你的计划,我也许可以给你举个例子。

Can I promisify a stream?

没有。虽然流不断发出事件,但承诺只会解决一次。它们具有完全不同的语义(即使两者都使用异步回调)。

可以 为结束的流作出承诺,请参阅 EventEmitterPromisifier example in BlueBirds documentation - 但这不是您的 Twitter 流示例所做的。

Running this code does not log a tweet and no error is thrown.

因为T.stream()是一个同步工厂函数,returns是一个流对象。您不需要 - 您不能 - 使用 streamAsync,因为它永远不会调用隐式传递的回调。

我最终使用了 RxJS implementing observables with streams

var sanFrancisco = [ '-122.75', '36.8', '-121.75', '37.8' ]

var stream = T.stream('statuses/filter', { locations: sanFrancisco });

var source =  Rx.Node.fromEvent(stream, 'tweet');

var observer = Rx.Observer.create(
    function (tweet) {
        // THIS IS WHERE EACH TWEET SHOULD COME FROM THE STREAM
        console.log(tweet);
    },
    function (err) {
        console.log('Error getting tweets: ' + err);
    },
    function () {
        console.log('Completed');
    }
);

source.subscribe(observer);

我最终不得不使用 RX.Observable.fromEvent 而不是 Rx.Node.fromStream 因为,Twit 模块必须在幕后处理实际流,但通过 EventEmitter 公开它,他们可能不应该命名它 T.stream.