如何在评估缓冲区时根据缓冲区中的值暂停 RxJS 缓冲的可观察对象?
How can I pause an RxJS buffered observable based on the values in the buffer as they are evaluated?
我有一个 observable,它包装了来自服务器的 socket.io 事件流(称之为 source
)。来自 socket.io 的每条消息都会在可观察对象上发出。根据 socket.io 消息的内容,此可观察对象被过滤并映射到多个订阅。
例如:
var filtered = source.filter(function(msg) { return msg.name === 'EventName' });
filtered.subscribe(function handeEventName(msg) { /* do something with msg */ });
有时这些订阅会在我的应用程序中触发长动画。我想在发生这种情况时暂停源 observable,缓冲新事件直到动画播放完毕,然后恢复 observable。
我使用 pausableBuffered
:
使所有这些都按预期工作
var pausable = source.pausableBuffered();
var filtered = pausable.filter(function(msg) { return msg.name === 'EventName' });
filtered.subscribe(function handeEventName(msg) {
pausable.pause();
/**
* do something async, like animation, then when done call
* pausable.resume();
*/
});
到目前为止一切顺利。
但是,我们假设当 observable 暂停时,缓冲了 5 条消息。第三条消息是需要再次暂停流的消息。它有一个订阅设置来这样做。然而,一旦源 observable 取消暂停,它立即清空所有五个事件的缓冲区,所有事件都得到处理并传递给所有五个订阅,此时第三个消息的订阅最终暂停原始流。
我明白为什么会这样,但我真正想要的是:
- 源已暂停
- 缓冲了五个事件,第三个应该在处理订阅时暂停源。
- 源已恢复。
- 事件 #1 和 #2 由他们的订阅处理,
- 事件 #3 的订阅暂停了源。
- 可能更多事件可以缓冲在#4 和#5 之后,它们仍在缓冲区中等待。
- 事件 #3 的订阅在短时间后恢复来源
- 事件 #4 和 #5 以及任何其他事件开始传播,直到源发出另一个应该暂停的事件。
似乎我使用 pausableBuffered
的每一种方式最终都会将整个缓冲区转储到他们的所有订阅中。我怎样才能实现我正在寻找的东西?
您可以尝试 controlled
observable。给你几乎完全的控制权。
例如:
var source = Rx.Observable.interval(300).take(10);
var controlled = source.controlled();
var sourceSub = source.subscribe(
function (x) {
console.log('Next source: ' + x.toString());
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
var controlledSub = controlled.subscribe(
function (x) {
console.log('Next controlled: ' + x.toString());
if (x === 3) {
setTimeout(function(){
controlled.request(1)
}, 2000)
} else {
controlled.request(1);
}
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
controlled.request(1);
我有一个 observable,它包装了来自服务器的 socket.io 事件流(称之为 source
)。来自 socket.io 的每条消息都会在可观察对象上发出。根据 socket.io 消息的内容,此可观察对象被过滤并映射到多个订阅。
例如:
var filtered = source.filter(function(msg) { return msg.name === 'EventName' });
filtered.subscribe(function handeEventName(msg) { /* do something with msg */ });
有时这些订阅会在我的应用程序中触发长动画。我想在发生这种情况时暂停源 observable,缓冲新事件直到动画播放完毕,然后恢复 observable。
我使用 pausableBuffered
:
var pausable = source.pausableBuffered();
var filtered = pausable.filter(function(msg) { return msg.name === 'EventName' });
filtered.subscribe(function handeEventName(msg) {
pausable.pause();
/**
* do something async, like animation, then when done call
* pausable.resume();
*/
});
到目前为止一切顺利。
但是,我们假设当 observable 暂停时,缓冲了 5 条消息。第三条消息是需要再次暂停流的消息。它有一个订阅设置来这样做。然而,一旦源 observable 取消暂停,它立即清空所有五个事件的缓冲区,所有事件都得到处理并传递给所有五个订阅,此时第三个消息的订阅最终暂停原始流。
我明白为什么会这样,但我真正想要的是:
- 源已暂停
- 缓冲了五个事件,第三个应该在处理订阅时暂停源。
- 源已恢复。
- 事件 #1 和 #2 由他们的订阅处理,
- 事件 #3 的订阅暂停了源。
- 可能更多事件可以缓冲在#4 和#5 之后,它们仍在缓冲区中等待。
- 事件 #3 的订阅在短时间后恢复来源
- 事件 #4 和 #5 以及任何其他事件开始传播,直到源发出另一个应该暂停的事件。
似乎我使用 pausableBuffered
的每一种方式最终都会将整个缓冲区转储到他们的所有订阅中。我怎样才能实现我正在寻找的东西?
您可以尝试 controlled
observable。给你几乎完全的控制权。
例如:
var source = Rx.Observable.interval(300).take(10);
var controlled = source.controlled();
var sourceSub = source.subscribe(
function (x) {
console.log('Next source: ' + x.toString());
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
var controlledSub = controlled.subscribe(
function (x) {
console.log('Next controlled: ' + x.toString());
if (x === 3) {
setTimeout(function(){
controlled.request(1)
}, 2000)
} else {
controlled.request(1);
}
},
function (err) {
console.log('Error: ' + err);
},
function () {
console.log('Completed');
});
controlled.request(1);