在简单用例中使用 RxJS5 避免递归

Avoiding recursion with RxJS5 in simple use case

我正在尝试弄清楚如何避免递归(如果可能的话)。我正在使用 RxJS 在队列上创建方法。如果队列不为空,则 drain 方法会递归调用自身。 drain 方法目前被设计为一次从队列中删除一个项目,直到它为空。以下似乎适用于此目的,但我想知道是否有可能避免递归调用 drain 。递归的问题是我可能无法 "return" 递归方法中的任何项目,直到它完成递归(这是我的猜测)。

所以我的问题是:

我希望 drain() 的订阅者分别从队列中接收每个项目,而不是在该方法递归完成后立即从队列中接收所有耗尽的项目。我怎样才能做到这一点?我可以用递归方法完成这个,还是我只能用非递归方法完成这个?如果是后者,怎么办?

this method will drain the queue, and stop trying when the queue is empty we need to lock, remove an item, then unlock, every time, so it's easiest to just use recursion and re-call the drain method if the queue is not empty

Queue.prototype.drain = function (opts) {

    opts = opts || {};
    const delay = opts.delay || 500;

    return this.init()
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    return acquireLockRetry(obj)
                });
        })
        .flatMap(obj => {
            return removeOneLine(this)
                .flatMap(l => {
                    return releaseLock(this, obj.id)
                        .map(obj => l);
                });
        })
        .flatMap(() => {
            return Rx.Observable.timer(delay)
                .flatMap(() => {
                    return this.drain()   /// <<< recurse
                        .takeUntil(this.isEmpty());  /// <<<< until
                });
        })
        .catch(e => {
            const force = !String(e.stack || e).match(/acquire lock timed out/);
            return releaseLock(this, force);
        });

};

// check if the queue is empty

Queue.prototype.isEmpty = function () {

    return this.init()
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    return acquireLockRetry(obj)
                })
        })
        .flatMap(obj => {
            return findFirstLine(this)
                .flatMap(l => {
                    return releaseLock(this, obj.id)
                        .map(obj => l);
                });
        })
        .filter(l => {
            // filter out any lines => only fire event if there is no line
            return !l;
        })
        .catch(e => {
            const force = !String(e.stack || e).match(/acquire lock timed out/);
            return releaseLock(this, force);
        });

};

一个可能的解决方案是这样的:

    const obs = new Rx.Subject();

    q.drain(obs).subscribe(function (v) {
        console.log('end result => ', v);
    });

    obs.subscribe(function (v) {
        console.log('next item that was drained => ', v);
    });

排水方法就变成了:

Queue.prototype.drain = function (obs, opts) {

    opts = opts || {};

    const delay = opts.delay || 500;

    return this.init()
        .flatMap(() => {
            return acquireLock(this)
                .flatMap(obj => {
                    console.log(' drain lock id => ', obj.id);
                    return acquireLockRetry(obj)
                });
        })
        .flatMap(obj => {
            return removeOneLine(this)
                .flatMap(l => {
                    return releaseLock(this, obj.id)
                        .map(obj => {
                            obs.next(l);
                            return l;
                        });
                });
        })
        .flatMap(() => {
            return Rx.Observable.timer(500)
                .flatMap(() => {
                    return this.drain(obs, opts)
                        .takeUntil(this.isEmpty());
                });
        })
        .catch(e => {
            console.error('\n', ' => isEmpty() error => \n', e.stack || e);
            const force = !String(e.stack || e).match(/acquire lock timed out/);
            return releaseLock(this, force);
        });

};

以这种方式,每次删除一个项目时,它都会触发一些东西,然后希望在最后,当队列完全耗尽时触发一个事件。