使用 RxJS 5 可观察对象的延迟模式
Deferred pattern with RxJS 5 observables
对于任意 promise 实现,延迟模式(不要与 antipattern 混淆)可能如下所示:
const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
deferred
对象持有未定的承诺,可以通过引用传递给其他函数范围。所有承诺链都将在承诺结算时执行,deferred.promise
是否在与 then
链接之前或之后结算并不重要。 promise的状态一旦确定就不能更改
如答案所示,初始选择是 ReplaySubject
和 AsyncSubject
。
对于给定的设置 (a demo)
var subject = new Rx.AsyncSubject;
var deferred = subject.first();
deferred.subscribe(
console.log.bind(console, 'Early result'),
console.log.bind(console, 'Early error')
);
setTimeout(() => {
deferred.subscribe(
console.log.bind(console, 'Late result'),
console.log.bind(console, 'Late error')
);
});
这会导致理想的行为:
subject.error('one');
subject.next('two');
Early error one
Late error one
这会导致不良行为:
subject.error('one');
subject.next('two');
subject.complete();
Early error one
Late result two
这会导致不良行为:
subject.next('two');
subject.complete();
subject.next('three');
Early result two
Late result three
ReplaySubject
的结果有所不同,但仍与预期结果不一致。 next
值和 error
错误被分开处理,并且 complete
不会阻止观察者接收新数据。这可能适用于单个 next
/error
,问题是 next
或 error
可能会被无意中调用多次。
之所以用first()
是因为subscribe
是一次性订阅的,为了不泄密我想去掉
应该如何使用 RxJS observables 来实现?
您可能正在寻找 Rx.ReplaySubject(1)
(或 Rx.AsyncSubject()
,具体取决于您的用例)。
有关主题的更详细说明,请参阅 。
基本上,主题可以通过引用传递,就像延迟一样。你可以向它发出值(resolve 是 'next'
(Rxjs v5) 或 'onNext'
(Rxjs v4) 后跟 'complete'
或 'onCompleted()'
),只要你持有那个参考。
一个主题可以有任意数量的订阅者,类似于 then
的延迟。如果您使用 replaySubject(1)
,任何订阅者都将收到最后发出的值,该值应该回答您的 it doesn't matter if deferred.promise was settled before chaining with then or after.
。在 Rxjs v4 中,replaySubject
将在完成订阅后将其最后一个值发送给订阅者。我不确定 Rxjs v5 中的行为。
- https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/asyncsubject.md
- https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/replaysubject.md
更新
The following code 使用 Rxjs v4 执行:
var subject = new Rx.AsyncSubject();
var deferred = subject;
deferred.subscribe(
console.log.bind(console, 'First result'),
console.log.bind(console, 'First error')
);
setTimeout(() => {
deferred.subscribe(
console.log.bind(console, 'Second result'),
console.log.bind(console, 'Second error')
);
});
subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');
产生以下输出:
First result one
Second result one
但是,使用 Rxjs v5 执行相同的代码 does not :
First result one
Second result four
所以基本上这意味着 主体的语义在 Rxjs v5 中发生了变化!!!这确实是一个需要注意的重大变化。无论如何,您可以考虑回到 Rxjs v4,或者使用 artur grzesiak 在他的回答中建议的转变。您也可以在 github 站点上提交问题。我相信这种变化是有意的,但如果不是,提交问题可能有助于澄清情况。无论如何,无论选择何种行为,都应该妥善记录。
features a link showing the async subject in relation with multiple and late subscription
如@user3743222 所写,AsyncSubject
可能会在 deferred
实现中使用,但问题是它必须是 private
并防止多个 resolve
s / reject
s.
下面是一个可能的实现镜像resolve-reject-promise
结构:
const createDeferred = () => {
const pending = new Rx.AsyncSubject(); // caches last value / error
const end = (result) => {
if (pending.isStopped) {
console.warn('Deferred already resloved/rejected.'); // optionally throw
return;
}
if (result.isValue) {
pending.next(result.value);
pending.complete();
} else {
pending.error(result.error);
}
}
return {
resolve: (value) => end({isValue: true, value: value }),
reject: (error) => end({isValue: false, error: error }),
observable: pending.asObservable() // hide subject
};
}
// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR') // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));
// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
def.resolve(1);
setTimeout(() => {
def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
def.resolve(2); // warn
def.reject('err'); // warn
}, 1000)
}, 1000);
// async error example
const def3 = createDeferred();
def3.observable.subscribe(
(n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
(err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
def3.reject('ERR');
setTimeout(() => {
def3.observable.subscribe(
(n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
(err) => console.error('ERROR-AFTER-REJECTED', err));
def3.resolve(2); // warn
def3.reject('err'); // warn
}, 1000)
}, 3000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>
对于任意 promise 实现,延迟模式(不要与 antipattern 混淆)可能如下所示:
const deferred = new Deferred;
...
// scopes where `deferred` object reference was passed before promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
...
deferred.resolve(...);
// doesn't affect promise state
deferred.reject();
...
// after promise settlement
deferred.promise.then((result) => { ... }, (error) => { ... });
deferred
对象持有未定的承诺,可以通过引用传递给其他函数范围。所有承诺链都将在承诺结算时执行,deferred.promise
是否在与 then
链接之前或之后结算并不重要。 promise的状态一旦确定就不能更改
如答案所示,初始选择是 ReplaySubject
和 AsyncSubject
。
对于给定的设置 (a demo)
var subject = new Rx.AsyncSubject;
var deferred = subject.first();
deferred.subscribe(
console.log.bind(console, 'Early result'),
console.log.bind(console, 'Early error')
);
setTimeout(() => {
deferred.subscribe(
console.log.bind(console, 'Late result'),
console.log.bind(console, 'Late error')
);
});
这会导致理想的行为:
subject.error('one');
subject.next('two');
Early error one
Late error one
这会导致不良行为:
subject.error('one');
subject.next('two');
subject.complete();
Early error one
Late result two
这会导致不良行为:
subject.next('two');
subject.complete();
subject.next('three');
Early result two
Late result three
ReplaySubject
的结果有所不同,但仍与预期结果不一致。 next
值和 error
错误被分开处理,并且 complete
不会阻止观察者接收新数据。这可能适用于单个 next
/error
,问题是 next
或 error
可能会被无意中调用多次。
之所以用first()
是因为subscribe
是一次性订阅的,为了不泄密我想去掉
应该如何使用 RxJS observables 来实现?
您可能正在寻找 Rx.ReplaySubject(1)
(或 Rx.AsyncSubject()
,具体取决于您的用例)。
有关主题的更详细说明,请参阅
基本上,主题可以通过引用传递,就像延迟一样。你可以向它发出值(resolve 是 'next'
(Rxjs v5) 或 'onNext'
(Rxjs v4) 后跟 'complete'
或 'onCompleted()'
),只要你持有那个参考。
一个主题可以有任意数量的订阅者,类似于 then
的延迟。如果您使用 replaySubject(1)
,任何订阅者都将收到最后发出的值,该值应该回答您的 it doesn't matter if deferred.promise was settled before chaining with then or after.
。在 Rxjs v4 中,replaySubject
将在完成订阅后将其最后一个值发送给订阅者。我不确定 Rxjs v5 中的行为。
- https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/asyncsubject.md
- https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/subjects/replaysubject.md
更新
The following code 使用 Rxjs v4 执行:
var subject = new Rx.AsyncSubject();
var deferred = subject;
deferred.subscribe(
console.log.bind(console, 'First result'),
console.log.bind(console, 'First error')
);
setTimeout(() => {
deferred.subscribe(
console.log.bind(console, 'Second result'),
console.log.bind(console, 'Second error')
);
});
subject.onNext('one');
subject.onCompleted();
subject.onNext('two');
subject.onNext('three');
subject.onNext('four');
产生以下输出:
First result one
Second result one
但是,使用 Rxjs v5 执行相同的代码 does not :
First result one
Second result four
所以基本上这意味着 主体的语义在 Rxjs v5 中发生了变化!!!这确实是一个需要注意的重大变化。无论如何,您可以考虑回到 Rxjs v4,或者使用 artur grzesiak 在他的回答中建议的转变。您也可以在 github 站点上提交问题。我相信这种变化是有意的,但如果不是,提交问题可能有助于澄清情况。无论如何,无论选择何种行为,都应该妥善记录。
如@user3743222 所写,AsyncSubject
可能会在 deferred
实现中使用,但问题是它必须是 private
并防止多个 resolve
s / reject
s.
下面是一个可能的实现镜像resolve-reject-promise
结构:
const createDeferred = () => {
const pending = new Rx.AsyncSubject(); // caches last value / error
const end = (result) => {
if (pending.isStopped) {
console.warn('Deferred already resloved/rejected.'); // optionally throw
return;
}
if (result.isValue) {
pending.next(result.value);
pending.complete();
} else {
pending.error(result.error);
}
}
return {
resolve: (value) => end({isValue: true, value: value }),
reject: (error) => end({isValue: false, error: error }),
observable: pending.asObservable() // hide subject
};
}
// sync example
let def = createDeferred();
let obs = def.observable;
obs.subscribe(n => console.log('BEFORE-RESOLVE'));
def.resolve(1);
def.resolve(2); // warn - no action
def.reject('ERROR') // warn - no action
def.observable.subscribe(n => console.log('AFTER-RESOLVE'));
// async example
def = createDeferred();
def.observable.subscribe(() => console.log('ASYNC-BEFORE-RESOLVE'));
setTimeout(() => {
def.resolve(1);
setTimeout(() => {
def.observable.subscribe(() => console.log('ASYNC-AFTER-RESOLVE'));
def.resolve(2); // warn
def.reject('err'); // warn
}, 1000)
}, 1000);
// async error example
const def3 = createDeferred();
def3.observable.subscribe(
(n) => console.log(n, 'ERROR-BEFORE-REJECTED (I will not be called)'),
(err) => console.error('ERROR-BEFORE-REJECTED', err));
setTimeout(() => {
def3.reject('ERR');
setTimeout(() => {
def3.observable.subscribe(
(n) => console.log(n, 'ERROR-AFTER-REJECTED (I will not be called)'),
(err) => console.error('ERROR-AFTER-REJECTED', err));
def3.resolve(2); // warn
def3.reject('err'); // warn
}, 1000)
}, 3000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.9/Rx.umd.js"></script>