使用 RxJS 5 可观察对象的延迟模式

Deferred pattern with RxJS 5 observables

对于任意 promise 实现,延迟模式(不要与 antipattern 混淆)可能如下所示:

const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });

deferred 对象持有未定的承诺,可以通过引用传递给其他函数范围。所有承诺链都将在承诺结算时执行,deferred.promise 是否在与 then 链接之前或之后结算并不重要。 promise的状态一旦确定就不能更改


如答案所示,初始选择是 ReplaySubjectAsyncSubject

对于给定的设置 (a demo)

var subject = new Rx.AsyncSubject;
var deferred = subject.first();

deferred.subscribe(
  console.log.bind(console, 'Early result'),
  console.log.bind(console, 'Early error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Late result'),
    console.log.bind(console, 'Late error')
  );
});

这会导致理想的行为:

subject.error('one');
subject.next('two');

Early error one

Late error one

这会导致不良行为:

subject.error('one');
subject.next('two');
subject.complete();

Early error one

Late result two

这会导致不良行为:

subject.next('two');
subject.complete();
subject.next('three');

Early result two

Late result three

ReplaySubject 的结果有所不同,但仍与预期结果不一致。 next 值和 error 错误被分开处理,并且 complete 不会阻止观察者接收新数据。这可能适用于单个 next/error,问题是 nexterror 可能会被无意中调用多次。

之所以用first()是因为subscribe是一次性订阅的,为了不泄密我想去掉

应该如何使用 RxJS observables 来实现?

您可能正在寻找 Rx.ReplaySubject(1)(或 Rx.AsyncSubject(),具体取决于您的用例)。

有关主题的更详细说明,请参阅

基本上,主题可以通过引用传递,就像延迟一样。你可以向它发出值(resolve 是 'next' (Rxjs v5) 或 'onNext' (Rxjs v4) 后跟 'complete''onCompleted()'),只要你持有那个参考。

一个主题可以有任意数量的订阅者,类似于 then 的延迟。如果您使用 replaySubject(1),任何订阅者都将收到最后发出的值,该值应该回答您的 it doesn't matter if deferred.promise was settled before chaining with then or after.。在 Rxjs v4 中,replaySubject 将在完成订阅后将其最后一个值发送给订阅者。我不确定 Rxjs v5 中的行为。

更新

The following code 使用 Rxjs v4 执行:

var subject = new Rx.AsyncSubject();
var deferred = subject;

deferred.subscribe(
  console.log.bind(console, 'First result'),
  console.log.bind(console, 'First error')
);

setTimeout(() => {
  deferred.subscribe(
    console.log.bind(console, 'Second result'),
    console.log.bind(console, 'Second error')
  );
});

subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');

产生以下输出:

First result one
Second result one

但是,使用 Rxjs v5 执行相同的代码 does not :

First result one
Second result four

所以基本上这意味着 主体的语义在 Rxjs v5 中发生了变化!!!这确实是一个需要注意的重大变化。无论如何,您可以考虑回到 Rxjs v4,或者使用 artur grzesiak 在他的回答中建议的转变。您也可以在 github 站点上提交问题。我相信这种变化是有意的,但如果不是,提交问题可能有助于澄清情况。无论如何,无论选择何种行为,都应该妥善记录。

features a link showing the async subject in relation with multiple and late subscription

如@user3743222 所写,AsyncSubject 可能会在 deferred 实现中使用,但问题是它必须是 private 并防止多个 resolves / rejects.

下面是一个可能的实现镜像resolve-reject-promise结构:

const createDeferred = () => {
  const pending = new Rx.AsyncSubject(); // caches last value / error
  const end = (result) => {
    if (pending.isStopped) {
      console.warn('Deferred already resloved/rejected.'); // optionally throw
      return;
    }
    
    if (result.isValue) {
      pending.next(result.value);
      pending.complete();
    } else {
      pending.error(result.error);
    }
  }
  return {
    resolve: (value) => end({isValue: true, value: value }),
    reject: (error) => end({isValue: false, error: error }),
    observable: pending.asObservable() // hide subject
  };
}

// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR') // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));

// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
  def.resolve(1);
  setTimeout(() => {
    def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
    def.resolve(2); // warn
    def.reject('err'); // warn
  }, 1000)
}, 1000);

// async error example
const def3 = createDeferred();
def3.observable.subscribe(
  (n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
  (err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
  def3.reject('ERR');
  setTimeout(() => {
    def3.observable.subscribe(
      (n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
      (err) => console.error('ERROR-AFTER-REJECTED', err));
    def3.resolve(2); // warn
    def3.reject('err'); // warn
  }, 1000)
}, 3000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>