使用 rxjs 的集成测试 geteventstore 具有竞争条件
integration test geteventstore using rxjs has race condition
抱歉,这个有点乱。我的项目在 nodejs 中。我在摩卡咖啡中进行了测试。我在其中打开一个到 geteventstore 的连接并订阅一个流。这实际上开始发出事件。
我将该事件订阅包装在一个 rxjs observable 中,然后将其写入控制台。
一半时间我会收到充满事件的流,一半时间我不会。
我感觉 eventloop 开始监听,什么也没听到,然后在 geteventstore 可以开始用事件爆破它之前关闭。
我有点不知所措。我可以看出 geteventstore 正在发送数据,因为我收到数据的时间只有一半。我的理解是,只要有人订阅了一个事件,例如有一个事件监听器,循环将保持打开状态。
所以问题可能出在 rxjs 上?
我不知道,如有任何帮助,我们将不胜感激。
----编辑
我不知道这是否有帮助,但测试看起来像这样。
context('when calling subscription', ()=> {
it('should stay open', function () {
mut = bootstrap.getInstanceOf('gesConnection');
var rx = bootstrap.getInstanceOf('rx');
var subscription = mut.subscribeToAllFrom();
rx.Observable.fromEvent(subscription, 'event').forEach(x=> console.log(x));
subscription.on('event', function (payload) {
console.log('event received by dispatcher');
console.log('event processed by dispatcher');
});
mut._handler._connectingPhase.must.equal('Connected');
})
});
所以 mut 是一个到 geteventstore 的连接,rx 是 rxjs,而订阅对象是一个事件发射器,它将数据从 geteventstore 中抽出。
我知道这个问题是混淆的,因为它涉及至少两个有点不寻常的产品,geteventstore 和 rxjs。
我的意思是我非常有信心 gesConnection 和订阅实际上正在连接和发射。我只是不知道如何 test/investigate 进一步。
谢谢
我没看到你在使用 Mocha's async testing facilities。
MochaJs 不知道它等待您的测试的时间应该比您的函数执行 return.
的时间长
通常你会return一个承诺:
it('must stay open', () => {
mut = bootstrap.getInstanceOf('gesConnection');
var rx = bootstrap.getInstanceOf('rx');
var subscription = mut.subscribeToAllFrom();
subscription.on('event', function (payload) {
console.log('event received by dispatcher');
console.log('event processed by dispatcher');
});
var promise = rx.Observable
.fromEvent(subscription, 'event')
.take(100) // stop test after 100 events
.do(x => console.log(x))
.finally(() => {
// do any cleanup here.
// such as close your connection
// or "subscription" variable
})
.toPromise();
mut._handler._connectingPhase.must.equal('Connected');
// tells Mocha to wait until the observable completes
return promise;
});
抱歉,这个有点乱。我的项目在 nodejs 中。我在摩卡咖啡中进行了测试。我在其中打开一个到 geteventstore 的连接并订阅一个流。这实际上开始发出事件。
我将该事件订阅包装在一个 rxjs observable 中,然后将其写入控制台。
一半时间我会收到充满事件的流,一半时间我不会。
我感觉 eventloop 开始监听,什么也没听到,然后在 geteventstore 可以开始用事件爆破它之前关闭。
我有点不知所措。我可以看出 geteventstore 正在发送数据,因为我收到数据的时间只有一半。我的理解是,只要有人订阅了一个事件,例如有一个事件监听器,循环将保持打开状态。
所以问题可能出在 rxjs 上?
我不知道,如有任何帮助,我们将不胜感激。
----编辑
我不知道这是否有帮助,但测试看起来像这样。
context('when calling subscription', ()=> {
it('should stay open', function () {
mut = bootstrap.getInstanceOf('gesConnection');
var rx = bootstrap.getInstanceOf('rx');
var subscription = mut.subscribeToAllFrom();
rx.Observable.fromEvent(subscription, 'event').forEach(x=> console.log(x));
subscription.on('event', function (payload) {
console.log('event received by dispatcher');
console.log('event processed by dispatcher');
});
mut._handler._connectingPhase.must.equal('Connected');
})
});
所以 mut 是一个到 geteventstore 的连接,rx 是 rxjs,而订阅对象是一个事件发射器,它将数据从 geteventstore 中抽出。
我知道这个问题是混淆的,因为它涉及至少两个有点不寻常的产品,geteventstore 和 rxjs。
我的意思是我非常有信心 gesConnection 和订阅实际上正在连接和发射。我只是不知道如何 test/investigate 进一步。
谢谢
我没看到你在使用 Mocha's async testing facilities。
MochaJs 不知道它等待您的测试的时间应该比您的函数执行 return.
的时间长通常你会return一个承诺:
it('must stay open', () => {
mut = bootstrap.getInstanceOf('gesConnection');
var rx = bootstrap.getInstanceOf('rx');
var subscription = mut.subscribeToAllFrom();
subscription.on('event', function (payload) {
console.log('event received by dispatcher');
console.log('event processed by dispatcher');
});
var promise = rx.Observable
.fromEvent(subscription, 'event')
.take(100) // stop test after 100 events
.do(x => console.log(x))
.finally(() => {
// do any cleanup here.
// such as close your connection
// or "subscription" variable
})
.toPromise();
mut._handler._connectingPhase.must.equal('Connected');
// tells Mocha to wait until the observable completes
return promise;
});