包装一个 Observable。在每个发出的值之前和之后做一些事情
Wrap an Observable. Do something before and after each emitted value
我想构建一个包装器 class,它在 Observable 的每个发射值之前和之后做一些事情。
这是我想出的:
class Wrapper<T> {
wrapped$: Observable<T>;
_dataSubject = new Subject<T>();
data$ = this._dataSubject.pipe(
tap(_ => console.log("BEFORE"),
//
// map( ??? )
//
);
constructor(wrapped$: Observable<T>) {
this.wrapped$ = wrapped$.pipe(
tap(_ => console.log("AFTER")
);
}
}
let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.data$.subscribe(val => console.log(val));
subject.next("foo")
控制台输出应该是:
BEFORE
foo
AFTER
我不知道如何将 $wrapped
Observable 与 _dataSubject
连接起来。
但也许我完全错了,它需要不同的方法。
这样的事情怎么样
import {Observable} from 'rxjs';
export class DoBeforeAfter<T> {
wrapped$: Observable<T>;
constructor(wrapped$: Observable<T>, doFunction: (data: any) => void) {
this.wrapped$ = Observable.of(null)
.do(_ => console.log("BEFORE"))
.switchMap(_ => wrapped$)
.do(doFunction)
.do(_ => console.log('AFTER'));
}
}
这样吃
const source = Observable.of('NOW');
const beforeAfter = new DoBeforeAfter(source, data => console.log(data));
beforeAfter.wrapped$.subscribe(
null,
error => console.error(error),
() => console.log('DONE')
)
看起来有点麻烦,但也许能帮上忙
最好的方法(尽管很复杂)是创建一个新的运算符,类似于 tap
,但它在发出值之前和之后做一些事情。
您可以在示例中看到它的工作原理(它在 ES6 中,因为 SO 代码片段不接受 TypeScript,但您会明白的)
function wrap(before, after) {
return function wrapOperatorFunction(source) {
return source.lift(new WrapOperator(before, after));
};
}
class WrapOperator {
constructor(before, after) {
this.before = before;
this.after = after;
}
call(subscriber, source) {
return source.subscribe(new WrapSubscriber(subscriber, this.before, this.after));
}
}
class WrapSubscriber extends Rx.Subscriber {
constructor(destination, before, after) {
super(destination);
this.before = before;
this.after = after;
}
_next(value) {
this.before ? this.before(value) : null;
this.destination.next(value);
this.after ? this.after(value) : null;
}
}
// Now:
const observable = Rx.Observable.from([1, 2, 3, 4]);
observable.pipe(
wrap(value => console.log('before', value), value => console.log('after', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
// For what you want:
// let's simulate that, for each value in the array, we'll fetch something from an external service:
// we want the before to be executed when we make the request, and the after to be executed when it finishes. In this // case, we just combine two wrap operators and flatMap, for example:
observable.pipe(
wrap(value => console.log('BEFORE REQUEST', value)),
Rx.operators.flatMap(value => {
const subject = new Rx.Subject();
setTimeout(() => { subject.next(value); subject.complete(); }, 5000);
return subject;
}),
wrap(undefined, value => console.log('AFTER REQUEST', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
<script src="https://unpkg.com/@reactivex/rxjs@5.5.0/dist/global/Rx.js"></script>
如前所述,可能有点复杂,但它与 RxJS 运算符无缝集成,并且它始终是了解如何创建我们自己的运算符的好例子:-)
你评论里说的,你可以看看最后一个例子。在那里,我组合了两个 wrap
运算符。第一个仅使用 before
回调,因此它仅在发出值之前执行某些操作。如您所见,因为源 observable 来自数组,所以四个 before
回调会立即执行。然后,我们应用 flatMap
。我们对其应用了一个新的 wrap
,但这次只有 after
回调。因此,此回调仅在 flatMap
返回的 observable 产生它们的值后调用。
当然,如果取而代之的是来自数组的可观察对象,那么您将拥有一个由事件侦听器创建的对象,您将拥有:
before
回调在触发的事件被 observable 推送之前执行。
- 异步可观察对象执行。
- 异步可观察对象在时间 t 后产生值。
- 执行
after
回调。
这就是让运算符得到回报的地方,因为它们很容易组合。希望这适合你。
因此,据我了解,您可以这样做:
class Wrapper<T> {
_dataSubject: Subject<T>;
wrapped$: Observable<T>;
constructor(wrapped$: Subject<T>) {
this._dataSubject = wrapped$;
this.wrapped$ = this._dataSubject.pipe(
tap(_ => console.log("BEFORE")),
tap(data => console.log(data)),
tap(_ => console.log("AFTER"))
);
}
}
然后:
let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.wrapped$.subscribe();
subject.next("foo")
感谢您的参与!
我现在想出了自己的解决方案。 AFTER
和 DATA
输出顺序错误,但我发现这并不重要,只要它们同时出现
代码仍然需要一些重构,但至少现在可以用了!
class Wrapper {
_taskDoneSubject = new Subject<boolean>();
taskDone$ = this._taskDoneSubject.asObservable();
wrappedSwitchMap<T, R>(wrapped: (val: T) => Observable<R>): OperatorFunction<T, R> {
return switchMap<T, R>(val => { // can also be done using mergeMap
this._taskDoneSubject.next(false);
return wrapped(val).pipe(
tap(_ => this._taskDoneSubject.next(true))
);
});
}
}
用法:
test(valueEmittingObservable: Observable<string>) {
let wrapper = new Wrapper();
wrapper.taskDone$.subscribe(val => console.log("task done? ", val);
valueEmittingObservable.pipe(
// wrapper.wrappedSwitchMap(val => anObservableThatTakesAWhile(val))
wrapper.wrappedSwitchMap(val => of("foo").pipe(delay(5000)) // simulated
).subscribe(val => console.log("emitted value: ", val);
}
输出:
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo
或者如果发射速度超过 5 秒:
task done? false
(... 1 second until next emit ...)
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo
如果您想在没有 class 的情况下使用 RxJs 6 执行此操作:
return of(null)
.pipe(tap(_ => {
console.log('BEFORE');
}))
.pipe(switchMap(_ => wrappedObservable))
.pipe(tap(_ => {
console.log('AFTER');
}))
我想构建一个包装器 class,它在 Observable 的每个发射值之前和之后做一些事情。
这是我想出的:
class Wrapper<T> {
wrapped$: Observable<T>;
_dataSubject = new Subject<T>();
data$ = this._dataSubject.pipe(
tap(_ => console.log("BEFORE"),
//
// map( ??? )
//
);
constructor(wrapped$: Observable<T>) {
this.wrapped$ = wrapped$.pipe(
tap(_ => console.log("AFTER")
);
}
}
let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.data$.subscribe(val => console.log(val));
subject.next("foo")
控制台输出应该是:
BEFORE
foo
AFTER
我不知道如何将 $wrapped
Observable 与 _dataSubject
连接起来。
但也许我完全错了,它需要不同的方法。
这样的事情怎么样
import {Observable} from 'rxjs';
export class DoBeforeAfter<T> {
wrapped$: Observable<T>;
constructor(wrapped$: Observable<T>, doFunction: (data: any) => void) {
this.wrapped$ = Observable.of(null)
.do(_ => console.log("BEFORE"))
.switchMap(_ => wrapped$)
.do(doFunction)
.do(_ => console.log('AFTER'));
}
}
这样吃
const source = Observable.of('NOW');
const beforeAfter = new DoBeforeAfter(source, data => console.log(data));
beforeAfter.wrapped$.subscribe(
null,
error => console.error(error),
() => console.log('DONE')
)
看起来有点麻烦,但也许能帮上忙
最好的方法(尽管很复杂)是创建一个新的运算符,类似于 tap
,但它在发出值之前和之后做一些事情。
您可以在示例中看到它的工作原理(它在 ES6 中,因为 SO 代码片段不接受 TypeScript,但您会明白的)
function wrap(before, after) {
return function wrapOperatorFunction(source) {
return source.lift(new WrapOperator(before, after));
};
}
class WrapOperator {
constructor(before, after) {
this.before = before;
this.after = after;
}
call(subscriber, source) {
return source.subscribe(new WrapSubscriber(subscriber, this.before, this.after));
}
}
class WrapSubscriber extends Rx.Subscriber {
constructor(destination, before, after) {
super(destination);
this.before = before;
this.after = after;
}
_next(value) {
this.before ? this.before(value) : null;
this.destination.next(value);
this.after ? this.after(value) : null;
}
}
// Now:
const observable = Rx.Observable.from([1, 2, 3, 4]);
observable.pipe(
wrap(value => console.log('before', value), value => console.log('after', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
// For what you want:
// let's simulate that, for each value in the array, we'll fetch something from an external service:
// we want the before to be executed when we make the request, and the after to be executed when it finishes. In this // case, we just combine two wrap operators and flatMap, for example:
observable.pipe(
wrap(value => console.log('BEFORE REQUEST', value)),
Rx.operators.flatMap(value => {
const subject = new Rx.Subject();
setTimeout(() => { subject.next(value); subject.complete(); }, 5000);
return subject;
}),
wrap(undefined, value => console.log('AFTER REQUEST', value))
).subscribe(value => console.log('value emitted', value), null, () => console.log('complete'));
<script src="https://unpkg.com/@reactivex/rxjs@5.5.0/dist/global/Rx.js"></script>
如前所述,可能有点复杂,但它与 RxJS 运算符无缝集成,并且它始终是了解如何创建我们自己的运算符的好例子:-)
你评论里说的,你可以看看最后一个例子。在那里,我组合了两个 wrap
运算符。第一个仅使用 before
回调,因此它仅在发出值之前执行某些操作。如您所见,因为源 observable 来自数组,所以四个 before
回调会立即执行。然后,我们应用 flatMap
。我们对其应用了一个新的 wrap
,但这次只有 after
回调。因此,此回调仅在 flatMap
返回的 observable 产生它们的值后调用。
当然,如果取而代之的是来自数组的可观察对象,那么您将拥有一个由事件侦听器创建的对象,您将拥有:
before
回调在触发的事件被 observable 推送之前执行。- 异步可观察对象执行。
- 异步可观察对象在时间 t 后产生值。
- 执行
after
回调。
这就是让运算符得到回报的地方,因为它们很容易组合。希望这适合你。
因此,据我了解,您可以这样做:
class Wrapper<T> {
_dataSubject: Subject<T>;
wrapped$: Observable<T>;
constructor(wrapped$: Subject<T>) {
this._dataSubject = wrapped$;
this.wrapped$ = this._dataSubject.pipe(
tap(_ => console.log("BEFORE")),
tap(data => console.log(data)),
tap(_ => console.log("AFTER"))
);
}
}
然后:
let subject = new Subject<string>();
let wrapper = new Wrapper(subject);
wrapper.wrapped$.subscribe();
subject.next("foo")
感谢您的参与!
我现在想出了自己的解决方案。 AFTER
和 DATA
输出顺序错误,但我发现这并不重要,只要它们同时出现
代码仍然需要一些重构,但至少现在可以用了!
class Wrapper {
_taskDoneSubject = new Subject<boolean>();
taskDone$ = this._taskDoneSubject.asObservable();
wrappedSwitchMap<T, R>(wrapped: (val: T) => Observable<R>): OperatorFunction<T, R> {
return switchMap<T, R>(val => { // can also be done using mergeMap
this._taskDoneSubject.next(false);
return wrapped(val).pipe(
tap(_ => this._taskDoneSubject.next(true))
);
});
}
}
用法:
test(valueEmittingObservable: Observable<string>) {
let wrapper = new Wrapper();
wrapper.taskDone$.subscribe(val => console.log("task done? ", val);
valueEmittingObservable.pipe(
// wrapper.wrappedSwitchMap(val => anObservableThatTakesAWhile(val))
wrapper.wrappedSwitchMap(val => of("foo").pipe(delay(5000)) // simulated
).subscribe(val => console.log("emitted value: ", val);
}
输出:
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo
或者如果发射速度超过 5 秒:
task done? false
(... 1 second until next emit ...)
task done? false
(... 5 seconds delay ...)
task done? true
emitted value: foo
如果您想在没有 class 的情况下使用 RxJs 6 执行此操作:
return of(null)
.pipe(tap(_ => {
console.log('BEFORE');
}))
.pipe(switchMap(_ => wrappedObservable))
.pipe(tap(_ => {
console.log('AFTER');
}))