如何在 RxJS5 中应用定时背压?
How can I apply timed back pressure in RxJS5?
假设我有以下代码:
let a = Rx.Observable.of(1, 2, 3)
let b = Observable.zip(a, a, (a, b) => a + b)
b.forEach(t => console.log(t))
这会立即输出结果。现在,我如何在每条消息之间设置一个定时延迟作为 背压方式 (请注意,我不需要缓冲区;相反,我想要 a
和b
成为 Cold Observables),例如:
b.takeEvery(1000).forEach(t => console.log(t))
得到完全相同的答案:
<wait 1s>
2
<wait 1s>
4
<wait 1s>
6
备选方案: 如果 RxJS 不支持背压(某些可观察对象的拉动机制),那么如何在没有 运行 的情况下创建无限生成器资源?
方案二:其他同时支持pull和push机制的JS框架?
如果是RxJS5.x背压是不支持的,但是有例如pausable
operator in 4.x version. It works only with hot observables. More info on back pressure in case of 4.x and here(特别是在底部抢劫和RxJS相关的描述)
这条 Erik Meijer 的推文可能有点争议但相关:https://twitter.com/headinthebox/status/774635475071934464
为了实现您自己的背压机制,您需要有 2 路通信通道,这可以很容易地用 2 个主题创建 - 每端一个。基本上使用 next
发送消息和 .subscribe
列表到另一端。
创建生成器也是可行的 - 再次使用主题在基于推和基于拉的世界之间架起桥梁。下面是生成斐波那契数列的示例性实现。
const fib = () => {
const n = new Rx.Subject()
const f = n
.scan(c => ({ a: c.b, b: c.b + c.a }), { a: 0, b: 1 })
.map(c => c.a)
return {
$: f,
next: () => n.next()
}
}
const f = fib()
f.$.subscribe(n => document.querySelector('#r').innerHTML = n)
Rx.Observable.fromEvent(document.querySelector('#f'), 'click')
.do(f.next)
.subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
<button id='f'>NEXT FIBONACCI</button>
<div id='r'>_?_<div>
您可能感兴趣的另一个 js 库是 https://github.com/ubolonton/js-csp - 没有使用它,所以不确定它如何处理背压。
想法是在前一个完成执行时排队等待时间Fiddle
let a = Rx.Observable.of(1, 2, 3);
let b = Rx.Observable.zip(a, a, (a, b) => a + b);
// getting values into array
var x = [];
b.forEach(t => x.push(t));
var takeEvery = function(msec,items,action,index=0){
if(typeof(action) == "function")
if(index<items.length)
setTimeout(
function(item,ind){
action(item);
takeEvery(msec,items,action,ind);
},msec, items[index],++index);
};
// queueing over time
takeEvery(1000,x, function(item){
console.log(item);
});
假设我有以下代码:
let a = Rx.Observable.of(1, 2, 3)
let b = Observable.zip(a, a, (a, b) => a + b)
b.forEach(t => console.log(t))
这会立即输出结果。现在,我如何在每条消息之间设置一个定时延迟作为 背压方式 (请注意,我不需要缓冲区;相反,我想要 a
和b
成为 Cold Observables),例如:
b.takeEvery(1000).forEach(t => console.log(t))
得到完全相同的答案:
<wait 1s>
2
<wait 1s>
4
<wait 1s>
6
备选方案: 如果 RxJS 不支持背压(某些可观察对象的拉动机制),那么如何在没有 运行 的情况下创建无限生成器资源?
方案二:其他同时支持pull和push机制的JS框架?
如果是RxJS5.x背压是不支持的,但是有例如pausable
operator in 4.x version. It works only with hot observables. More info on back pressure in case of 4.x and here(特别是在底部抢劫和RxJS相关的描述)
这条 Erik Meijer 的推文可能有点争议但相关:https://twitter.com/headinthebox/status/774635475071934464
为了实现您自己的背压机制,您需要有 2 路通信通道,这可以很容易地用 2 个主题创建 - 每端一个。基本上使用 next
发送消息和 .subscribe
列表到另一端。
创建生成器也是可行的 - 再次使用主题在基于推和基于拉的世界之间架起桥梁。下面是生成斐波那契数列的示例性实现。
const fib = () => {
const n = new Rx.Subject()
const f = n
.scan(c => ({ a: c.b, b: c.b + c.a }), { a: 0, b: 1 })
.map(c => c.a)
return {
$: f,
next: () => n.next()
}
}
const f = fib()
f.$.subscribe(n => document.querySelector('#r').innerHTML = n)
Rx.Observable.fromEvent(document.querySelector('#f'), 'click')
.do(f.next)
.subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.6/Rx.js"></script>
<button id='f'>NEXT FIBONACCI</button>
<div id='r'>_?_<div>
您可能感兴趣的另一个 js 库是 https://github.com/ubolonton/js-csp - 没有使用它,所以不确定它如何处理背压。
想法是在前一个完成执行时排队等待时间Fiddle
let a = Rx.Observable.of(1, 2, 3);
let b = Rx.Observable.zip(a, a, (a, b) => a + b);
// getting values into array
var x = [];
b.forEach(t => x.push(t));
var takeEvery = function(msec,items,action,index=0){
if(typeof(action) == "function")
if(index<items.length)
setTimeout(
function(item,ind){
action(item);
takeEvery(msec,items,action,ind);
},msec, items[index],++index);
};
// queueing over time
takeEvery(1000,x, function(item){
console.log(item);
});