RxJS:具有中止功能的生产者-消费者
RxJS: Producer-consumer with abort
我在 RxJS 中遇到了一个特殊的生产者消费者问题:生产者缓慢地生产元素。消费者正在请求元素,通常必须等待生产者。这可以通过压缩生产者和请求流来实现:
var produce = getProduceStream();
var request = getRequestStream();
var consume = Rx.Observable.zipArray(produce, request).pluck(0);
有时请求会被中止。生成的元素应该只在未中止的请求后使用:
produce: -------------p1-------------------------p2--------->
request: --r1--------------r2---------------r3-------------->
abort: ------a(r1)------------------a(?)------------------>
consume: ------------------c(p1, r2)-------------c(p2, r3)-->
第一个请求 r1
将使用第一个生成的元素 p1
,但是 r1
在可以使用 p1
之前被 a(r1)
中止。 p1
在第二次请求 r2
时被生产并被消耗 c(p1, r2)
。第二个 abort a(?)
被忽略,因为之前没有发生未应答的请求。第三个请求 r3
必须等待下一个生成的元素 p2
并且在生成 p2
之前不会中止。因此,p2
在生产后立即被消耗 c(p2, r3)
。
如何在 RxJS 中实现这一点?
编辑:
我在 jsbin 上创建了一个带有 QUnit 测试的 example。您可以将函数 createConsume(produce, request, abort)
编辑为 try/test 您的解决方案。
示例包含 .
的函数定义
这个(核心思想减去细节)通过了你的 JSBin 测试:
var consume = request
.zip(abort.merge(produce), (r,x) => [r,x])
.filter(([r,x]) => isNotAbort(x))
.map(([r,p]) => p);
还有 JSBin code.
我不太清楚如何使用现有的运算符来完成它。以下是如何使用 Observable.create()
:
return Rx.Observable.create(function (observer) {
var rsub = new Rx.SingleAssignmentDisposable();
var asub = new Rx.SingleAssignmentDisposable();
var psub = new Rx.SingleAssignmentDisposable();
var sub = new Rx.CompositeDisposable(rsub, asub, psub);
var rq = [];
var pq = [];
var completeCount = 0;
var complete = function () {
if (++completeCount === 2) {
observer.onCompleted();
}
};
var consume = function () {
if (pq.length && rq.length) {
var p = pq.shift();
var r = rq.shift();
observer.onNext('p' + p);
}
};
rsub.setDisposable(request.subscribe(
function (r) {
rq.push(r);
consume();
},
function (e) { observer.onError(e); },
complete));
asub.setDisposable(abort.subscribe(
function (a) {
rq.shift();
},
function (e) { observer.onError(e); }
));
psub.setDisposable(produce.subscribe(
function (p) {
pq.push(p);
consume();
},
function (e) { observer.onError(e); },
complete));
return sub;
});
此解决方案忽略未响应未答复请求的中止:
const {merge} = Rx.Observable;
Rx.Observable.prototype.wrapValue = function(wrapper) {
wrapper = (wrapper || {});
return this.map(function (value) {
wrapper.value = value;
return wrapper;
});
};
function createConsume(produce, request, abort) {
return merge(
produce.wrapValue({type: 'produce'}),
request.wrapValue({type: 'request'}),
abort.wrapValue({type: 'abort'})
)
.scan(
[false, []],
([isRequest, products], e) => {
// if last time the request was answered
if (isRequest && products.length) {
// remove consumed product
products.shift();
// mark request as answered
isRequest = false;
}
if (e.type === 'produce') {
// save product to consume later
products.push(e.value);
} else {
// if evaluated to false, e.type === 'abort'
isRequest = (e.type === 'request');
}
return [isRequest, products];
}
)
.filter( ([isRequest, products]) => (isRequest && products.length) )
.map( ([isRequest, products]) => products[0] ); // consume
}
Code 在 JSBin 的最新测试中。
我在 RxJS 中遇到了一个特殊的生产者消费者问题:生产者缓慢地生产元素。消费者正在请求元素,通常必须等待生产者。这可以通过压缩生产者和请求流来实现:
var produce = getProduceStream();
var request = getRequestStream();
var consume = Rx.Observable.zipArray(produce, request).pluck(0);
有时请求会被中止。生成的元素应该只在未中止的请求后使用:
produce: -------------p1-------------------------p2--------->
request: --r1--------------r2---------------r3-------------->
abort: ------a(r1)------------------a(?)------------------>
consume: ------------------c(p1, r2)-------------c(p2, r3)-->
第一个请求 r1
将使用第一个生成的元素 p1
,但是 r1
在可以使用 p1
之前被 a(r1)
中止。 p1
在第二次请求 r2
时被生产并被消耗 c(p1, r2)
。第二个 abort a(?)
被忽略,因为之前没有发生未应答的请求。第三个请求 r3
必须等待下一个生成的元素 p2
并且在生成 p2
之前不会中止。因此,p2
在生产后立即被消耗 c(p2, r3)
。
如何在 RxJS 中实现这一点?
编辑:
我在 jsbin 上创建了一个带有 QUnit 测试的 example。您可以将函数 createConsume(produce, request, abort)
编辑为 try/test 您的解决方案。
示例包含
这个(核心思想减去细节)通过了你的 JSBin 测试:
var consume = request
.zip(abort.merge(produce), (r,x) => [r,x])
.filter(([r,x]) => isNotAbort(x))
.map(([r,p]) => p);
还有 JSBin code.
我不太清楚如何使用现有的运算符来完成它。以下是如何使用 Observable.create()
:
return Rx.Observable.create(function (observer) {
var rsub = new Rx.SingleAssignmentDisposable();
var asub = new Rx.SingleAssignmentDisposable();
var psub = new Rx.SingleAssignmentDisposable();
var sub = new Rx.CompositeDisposable(rsub, asub, psub);
var rq = [];
var pq = [];
var completeCount = 0;
var complete = function () {
if (++completeCount === 2) {
observer.onCompleted();
}
};
var consume = function () {
if (pq.length && rq.length) {
var p = pq.shift();
var r = rq.shift();
observer.onNext('p' + p);
}
};
rsub.setDisposable(request.subscribe(
function (r) {
rq.push(r);
consume();
},
function (e) { observer.onError(e); },
complete));
asub.setDisposable(abort.subscribe(
function (a) {
rq.shift();
},
function (e) { observer.onError(e); }
));
psub.setDisposable(produce.subscribe(
function (p) {
pq.push(p);
consume();
},
function (e) { observer.onError(e); },
complete));
return sub;
});
此解决方案忽略未响应未答复请求的中止:
const {merge} = Rx.Observable;
Rx.Observable.prototype.wrapValue = function(wrapper) {
wrapper = (wrapper || {});
return this.map(function (value) {
wrapper.value = value;
return wrapper;
});
};
function createConsume(produce, request, abort) {
return merge(
produce.wrapValue({type: 'produce'}),
request.wrapValue({type: 'request'}),
abort.wrapValue({type: 'abort'})
)
.scan(
[false, []],
([isRequest, products], e) => {
// if last time the request was answered
if (isRequest && products.length) {
// remove consumed product
products.shift();
// mark request as answered
isRequest = false;
}
if (e.type === 'produce') {
// save product to consume later
products.push(e.value);
} else {
// if evaluated to false, e.type === 'abort'
isRequest = (e.type === 'request');
}
return [isRequest, products];
}
)
.filter( ([isRequest, products]) => (isRequest && products.length) )
.map( ([isRequest, products]) => products[0] ); // consume
}
Code 在 JSBin 的最新测试中。