RxJS:具有中止功能的生产者-消费者

RxJS: Producer-consumer with abort

我在 RxJS 中遇到了一个特殊的生产者消费者问题:生产者缓慢地生产元素。消费者正在请求元素,通常必须等待生产者。这可以通过压缩生产者和请求流来实现:

var produce = getProduceStream();
var request = getRequestStream();

var consume = Rx.Observable.zipArray(produce, request).pluck(0);

有时请求会被中止。生成的元素应该只在未中止的请求后使用:

produce:  -------------p1-------------------------p2--------->
request:  --r1--------------r2---------------r3-------------->
abort:    ------a(r1)------------------a(?)------------------>
consume:  ------------------c(p1, r2)-------------c(p2, r3)-->

第一个请求 r1 将使用第一个生成的元素 p1,但是 r1 在可以使用 p1 之前被 a(r1) 中止。 p1 在第二次请求 r2 时被生产并被消耗 c(p1, r2)。第二个 abort a(?) 被忽略,因为之前没有发生未应答的请求。第三个请求 r3 必须等待下一个生成的元素 p2 并且在生成 p2 之前不会中止。因此,p2 在生产后立即被消耗 c(p2, r3)

如何在 RxJS 中实现这一点?

编辑: 我在 jsbin 上创建了一个带有 QUnit 测试的 example。您可以将函数 createConsume(produce, request, abort) 编辑为 try/test 您的解决方案。

示例包含 .

的函数定义

这个(核心思想减去细节)通过了你的 JSBin 测试:

var consume = request
  .zip(abort.merge(produce), (r,x) => [r,x])
  .filter(([r,x]) => isNotAbort(x))
  .map(([r,p]) => p);

还有 JSBin code.

我不太清楚如何使用现有的运算符来完成它。以下是如何使用 Observable.create():

return Rx.Observable.create(function (observer) {
  var rsub = new Rx.SingleAssignmentDisposable();
  var asub = new Rx.SingleAssignmentDisposable();
  var psub = new Rx.SingleAssignmentDisposable();
  var sub = new Rx.CompositeDisposable(rsub, asub, psub);
  var rq = [];
  var pq = [];
  var completeCount = 0;
  var complete = function () {
    if (++completeCount === 2) {
      observer.onCompleted();
    }
  };
  var consume = function () {
    if (pq.length && rq.length) {
      var p = pq.shift();
      var r = rq.shift();
      observer.onNext('p' + p);
    }
  };

  rsub.setDisposable(request.subscribe(
    function (r) {
      rq.push(r);
      consume();
    },
    function (e) { observer.onError(e); },
    complete));

  asub.setDisposable(abort.subscribe(
    function (a) {
      rq.shift();
    },
    function (e) { observer.onError(e); }
  ));

  psub.setDisposable(produce.subscribe(
    function (p) {
      pq.push(p);
      consume();
    },
    function (e) { observer.onError(e); },
    complete));


  return sub;
});

http://jsbin.com/zurepesijo/1/

此解决方案忽略未响应未答复请求的中止:

const {merge} = Rx.Observable;

Rx.Observable.prototype.wrapValue = function(wrapper) {
    wrapper = (wrapper || {});
    return this.map(function (value) {
        wrapper.value = value;
        return wrapper;
    });
};

function createConsume(produce, request, abort) {
  return merge(
            produce.wrapValue({type: 'produce'}),
            request.wrapValue({type: 'request'}),
            abort.wrapValue({type: 'abort'})
         )
         .scan(
            [false, []],
            ([isRequest, products], e) => {
                // if last time the request was answered
                if (isRequest && products.length) {
                    // remove consumed product
                    products.shift();
                    // mark request as answered
                    isRequest = false;
                }
                if (e.type === 'produce') {
                    // save product to consume later
                    products.push(e.value);
                } else {
                    // if evaluated to false, e.type === 'abort'
                    isRequest = (e.type === 'request');
                }
                return [isRequest, products];
            }
         )
         .filter( ([isRequest, products]) => (isRequest && products.length) )
         .map( ([isRequest, products]) => products[0] ); // consume
}

Code 在 JSBin 的最新测试中。