switchMap 在 RxJs 中被 属性 区分
switchMap distincted by property in RxJs
比方说,我有一系列动作。每个动作都分配了一些 id。像这样:
const actions$ = of({ id: 1 }, { id: 2 }, { id: 1 });
现在,对于每个操作,我想在 switchMap 中执行一些逻辑:
actions$.pipe(switchMap(a => /* some cancellable logic */)).subscribe(...);
问题是每个发出的动作都会取消之前的 'some cancellable logic'。
是否可以根据操作 id 取消 'some cancellable logic',最好是操作员?类似于:
actions$.pipe(switchMapBy('id', a => /*some cancellable logic */)).subscribe(...)
本质上,switchMap的当前行为:
1. actions$ 发出 id #1。 switchMap 订阅了嵌套的 observable。
2. actions$ 发出 id #2。 switchMap 取消订阅先前嵌套的可观察对象。订阅新的。
3. actions$ 发出 id #1。 switchMap 再次取消订阅先前嵌套的可观察对象。订阅新的。
预期行为:
1. actions$ 发出 id #1。 switchMap 订阅了嵌套的 observable。
2. actions$ 发出 id #2。 switchMap 再次订阅嵌套的 observable(这次是#2)。 这里是区别,它不会取消#1中的那个。
3. actions$ 发出 id #1。 switchMap 取消订阅 #1 的嵌套可观察对象。再次订阅 #1。
在switchMap之前使用filter操作来排除取消的id,像这样
of({ id: 1 }, { id: 2 }, { id: 1 }).pipe(
filter(id => ![1,2].includes(id)), // to exclude ids (1,2)
switchMap(id => /*some cancellable logic */ )
).subscribe(...)
这似乎是 mergeMap 运算符的一个用例。 switchMap 的用例是只维护一个内部订阅并取消以前的订阅,这不是你想要的。您想要多个内部订阅,并且希望它们在同一 ID 的新值出现时取消,因此实现一些自定义逻辑来做到这一点。
大致如下:
action$.pipe(
mergeMap(val => {
return (/* your transform logic here */)
.pipe(takeUntil(action$.pipe(filter(a => a.id === val.id)))); // cancel it when the same id comes back through, put this operator at the correct point in the chain
})
)
您可以通过编写自定义运算符将它变成可重用的东西:
import { OperatorFunction, Observable, from } from 'rxjs';
import { takeUntil, filter, mergeMap } from 'rxjs/operators';
export function switchMapBy<T, R>(
key: keyof T,
mapFn: (val: T) => Observable<R> | Promise<R>
): OperatorFunction<T, R> {
return input$ => input$.pipe(
mergeMap(val =>
from(mapFn(val)).pipe(
takeUntil(input$.pipe(filter(i => i[key] === val[key])))
)
)
);
}
并像这样使用它:
action$.pipe(
switchMapBy('id', (val) => /* your transform logic here */)
);
这是它的闪电战:https://stackblitz.com/edit/rxjs-x1g4vc?file=index.ts
比方说,我有一系列动作。每个动作都分配了一些 id。像这样:
const actions$ = of({ id: 1 }, { id: 2 }, { id: 1 });
现在,对于每个操作,我想在 switchMap 中执行一些逻辑:
actions$.pipe(switchMap(a => /* some cancellable logic */)).subscribe(...);
问题是每个发出的动作都会取消之前的 'some cancellable logic'。
是否可以根据操作 id 取消 'some cancellable logic',最好是操作员?类似于:
actions$.pipe(switchMapBy('id', a => /*some cancellable logic */)).subscribe(...)
本质上,switchMap的当前行为:
1. actions$ 发出 id #1。 switchMap 订阅了嵌套的 observable。
2. actions$ 发出 id #2。 switchMap 取消订阅先前嵌套的可观察对象。订阅新的。
3. actions$ 发出 id #1。 switchMap 再次取消订阅先前嵌套的可观察对象。订阅新的。
预期行为:
1. actions$ 发出 id #1。 switchMap 订阅了嵌套的 observable。
2. actions$ 发出 id #2。 switchMap 再次订阅嵌套的 observable(这次是#2)。 这里是区别,它不会取消#1中的那个。
3. actions$ 发出 id #1。 switchMap 取消订阅 #1 的嵌套可观察对象。再次订阅 #1。
在switchMap之前使用filter操作来排除取消的id,像这样
of({ id: 1 }, { id: 2 }, { id: 1 }).pipe(
filter(id => ![1,2].includes(id)), // to exclude ids (1,2)
switchMap(id => /*some cancellable logic */ )
).subscribe(...)
这似乎是 mergeMap 运算符的一个用例。 switchMap 的用例是只维护一个内部订阅并取消以前的订阅,这不是你想要的。您想要多个内部订阅,并且希望它们在同一 ID 的新值出现时取消,因此实现一些自定义逻辑来做到这一点。
大致如下:
action$.pipe(
mergeMap(val => {
return (/* your transform logic here */)
.pipe(takeUntil(action$.pipe(filter(a => a.id === val.id)))); // cancel it when the same id comes back through, put this operator at the correct point in the chain
})
)
您可以通过编写自定义运算符将它变成可重用的东西:
import { OperatorFunction, Observable, from } from 'rxjs';
import { takeUntil, filter, mergeMap } from 'rxjs/operators';
export function switchMapBy<T, R>(
key: keyof T,
mapFn: (val: T) => Observable<R> | Promise<R>
): OperatorFunction<T, R> {
return input$ => input$.pipe(
mergeMap(val =>
from(mapFn(val)).pipe(
takeUntil(input$.pipe(filter(i => i[key] === val[key])))
)
)
);
}
并像这样使用它:
action$.pipe(
switchMapBy('id', (val) => /* your transform logic here */)
);
这是它的闪电战:https://stackblitz.com/edit/rxjs-x1g4vc?file=index.ts