使用像 merge 这样的 RxJs 运算符,但在结果中跟踪源 observables?
Use RxJs operator like merge but keep track of source observables in the result?
我想像使用“合并”运算符一样合并可观察对象,但我仍然希望能够知道发出了哪个输入可观察对象,有没有办法做到这一点?
例如:
private result$ = merge(this.obs1$, this.obs2$).pipe(
scan((result, change) => index + change, 0),
shareReplay(1)
);
只要任何输入可观察量发出,来自 obs1 和 obs2 的两个值都将进入扫描函数中的“更改”变量,但是如果我可以访问投影函数,我可以在其中标记来自输入的值具有不同名称的 observables 然后我可以在下面的扫描函数中做不同的事情,具体取决于发射的输入 observable。其他运算符(如 CombineLatest 或 ForkJoin)似乎不适用于此处,因为它们需要完成或从所有输入可观察量中发出。
如果您需要跟踪发出了哪个输入可观察对象,那么您可能需要将元数据添加到您的源可观察对象中。在不知道如何使用 result$
的上下文的情况下,这是给定信息的最佳解决方案。
我建议为您需要跟踪的每个可观察对象添加一个 id
属性。然后,您可以根据 ID 在扫描运算符中使用一些策略。
下面是一个简单示例,对每个可观察源使用 id
。在 scan
运算符中,您将看到我的策略如何根据 ID 发生变化。
import { interval, merge, of } from "rxjs";
import { map, scan, shareReplay } from "rxjs/operators";
const obs1$ = interval(1000).pipe(map(i => ({ i, id: "obs1" })));
const obs2$ = interval(3000).pipe(map(i => ({ i, id: "obs2" })));
let index = 0;
const result$ = merge(obs1$, obs2$).pipe(
scan((result, change) => {
if (change.id === "obs1") {
return index + change.i;
}
if (change.id === "obs2") {
return index + change.i * 2;
}
}, 0),
shareReplay(1)
);
result$.subscribe(console.log);
库 @react-rxjs/utils
有 a util named mergeWithKey
可以像这样使用:
import { Subject } from "rxjs"
import { scan, startWith } from 'rxjs/operators'
import { mergeWithKey } from '@react-rxjs/utils'
const inc$ = new Subject()
const dec$ = new Subject()
const resetTo$ = new Subject<number>()
const counter$ = mergeWithKey({
inc$,
dec$,
resetTo$,
}).pipe(
scan((acc, current) => {
switch (current.type) {
case "inc$":
return acc + 1
case "dec$":
return acc - 1
case "resetTo$":
return current.payload
default:
return acc
}
}, 0),
startWith(0),
)
实现非常简单:
import { merge, Observable, ObservableInput, from, SchedulerLike } from "rxjs"
import { map } from "rxjs/operators"
/**
* Emits the values from all the streams of the provided object, in a result
* which provides the key of the stream of that emission.
*
* @param input object of streams
*/
export const mergeWithKey: <
O extends { [P in keyof any]: ObservableInput<any> },
OT extends {
[K in keyof O]: O[K] extends ObservableInput<infer V>
? { type: K; payload: V }
: unknown
}
>(
x: O,
concurrent?: number,
scheduler?: SchedulerLike,
) => Observable<OT[keyof O]> = (input, ...optionalArgs) =>
merge(
...(Object.entries(input)
.map(
([type, stream]) =>
from(stream).pipe(
map((payload) => ({ type, payload } as any)),
) as any,
)
.concat(optionalArgs) as any[]),
)
这是您需要的吗?
我想像使用“合并”运算符一样合并可观察对象,但我仍然希望能够知道发出了哪个输入可观察对象,有没有办法做到这一点?
例如:
private result$ = merge(this.obs1$, this.obs2$).pipe(
scan((result, change) => index + change, 0),
shareReplay(1)
);
只要任何输入可观察量发出,来自 obs1 和 obs2 的两个值都将进入扫描函数中的“更改”变量,但是如果我可以访问投影函数,我可以在其中标记来自输入的值具有不同名称的 observables 然后我可以在下面的扫描函数中做不同的事情,具体取决于发射的输入 observable。其他运算符(如 CombineLatest 或 ForkJoin)似乎不适用于此处,因为它们需要完成或从所有输入可观察量中发出。
如果您需要跟踪发出了哪个输入可观察对象,那么您可能需要将元数据添加到您的源可观察对象中。在不知道如何使用 result$
的上下文的情况下,这是给定信息的最佳解决方案。
我建议为您需要跟踪的每个可观察对象添加一个 id
属性。然后,您可以根据 ID 在扫描运算符中使用一些策略。
下面是一个简单示例,对每个可观察源使用 id
。在 scan
运算符中,您将看到我的策略如何根据 ID 发生变化。
import { interval, merge, of } from "rxjs";
import { map, scan, shareReplay } from "rxjs/operators";
const obs1$ = interval(1000).pipe(map(i => ({ i, id: "obs1" })));
const obs2$ = interval(3000).pipe(map(i => ({ i, id: "obs2" })));
let index = 0;
const result$ = merge(obs1$, obs2$).pipe(
scan((result, change) => {
if (change.id === "obs1") {
return index + change.i;
}
if (change.id === "obs2") {
return index + change.i * 2;
}
}, 0),
shareReplay(1)
);
result$.subscribe(console.log);
库 @react-rxjs/utils
有 a util named mergeWithKey
可以像这样使用:
import { Subject } from "rxjs"
import { scan, startWith } from 'rxjs/operators'
import { mergeWithKey } from '@react-rxjs/utils'
const inc$ = new Subject()
const dec$ = new Subject()
const resetTo$ = new Subject<number>()
const counter$ = mergeWithKey({
inc$,
dec$,
resetTo$,
}).pipe(
scan((acc, current) => {
switch (current.type) {
case "inc$":
return acc + 1
case "dec$":
return acc - 1
case "resetTo$":
return current.payload
default:
return acc
}
}, 0),
startWith(0),
)
实现非常简单:
import { merge, Observable, ObservableInput, from, SchedulerLike } from "rxjs"
import { map } from "rxjs/operators"
/**
* Emits the values from all the streams of the provided object, in a result
* which provides the key of the stream of that emission.
*
* @param input object of streams
*/
export const mergeWithKey: <
O extends { [P in keyof any]: ObservableInput<any> },
OT extends {
[K in keyof O]: O[K] extends ObservableInput<infer V>
? { type: K; payload: V }
: unknown
}
>(
x: O,
concurrent?: number,
scheduler?: SchedulerLike,
) => Observable<OT[keyof O]> = (input, ...optionalArgs) =>
merge(
...(Object.entries(input)
.map(
([type, stream]) =>
from(stream).pipe(
map((payload) => ({ type, payload } as any)),
) as any,
)
.concat(optionalArgs) as any[]),
)
这是您需要的吗?