RxJS 中是否存在与 `race` 运算符相反的运算符?
Is-there an opposite of the `race` operator in RxJS?
我有两个 observable,我想听最后一个发出第一个值的那个,有这个操作符吗?类似的东西:
let obs1 = Rx.Observable.timer(500,500);
let obs2 = Rx.Observable.timer(1000,1000); // I want the values from this one
let sloth = Rx.Observable.sloth(obs1,obs2);
其中 sloth
observable 会发出来自 obs2
的值,因为它是最后发出第一个值的那个。
如果不行,还有其他办法吗?
我暂时看到了这种可能性,但我很好奇是否有人发现了其他任何东西:
let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`);
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`);
let sloth = Rx.Observable.merge(
obs1.take(1).mapTo(obs1),
obs2.take(1).mapTo(obs2)
).takeLast(1).mergeAll()
sloth.subscribe(data=>console.log(data))
<script src="https://unpkg.com/@reactivex/rxjs@5.3.0/dist/global/Rx.js"></script>
编辑 正如@user3743222(非常好的昵称:-D)指出的那样,它不适用于热可观察对象,但这应该没问题:
let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`).publish();
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`).publish();
obs1.connect();
obs2.connect();
let sloth = Rx.Observable.merge(
obs1.take(1).map((val)=>obs1.startWith(val)),
obs2.take(1).map((val)=>obs2.startWith(val))
).takeLast(1).mergeAll();
sloth.subscribe(data=>console.log(data));
<script src="https://unpkg.com/@reactivex/rxjs@5.3.0/dist/global/Rx.js"></script>
我喜欢你的解决方案(尽管我怀疑如果你有一个热流你可能永远不会看到第一个发射值 - 如果源是冷的,一切看起来都很好)。你能做一个 jsfiddle
来检查一下吗?如果您没有错过任何价值,那么您的解决方案就是最好的。如果这样做,可能可以通过将跳过的值添加回源来更正它 (obs1.take(1).map(val => obs1.startWith(val))
.
否则,对于通用的冗长解决方案,这里的关键是您有状态,因此您还需要 scan
运算符。我们用索引标记源,并保持一个状态,该状态表示已经启动的源的索引。当除一个以外的所有都开始时,我们知道没有开始的那个的索引,我们只从那个开始中选择值。请注意,无论来源是热的还是冷的,这都应该独立工作,因为所有内容都是一次性完成的,即没有多个订阅。
Rx.Observable.merge(
obs1.map(val => {val, sourceId: 1})
obs2.map(val => {val, sourceId: 2})
obsn.map(val => {val, sourceId: n})
).scan(
(acc, valueStruct) => {
acc.valueStruct = valueStruct
acc.alreadyEmitted[valueStruct.sourceId - 1] = true
if (acc.alreadyEmitted.filter(Boolean).length === n - 1) {
acc.lastSourceId = 1 + acc.alreadyEmitted.findIndex(element => element === false)
}
return acc
}, {alreadyEmitted : new Array(n).fill(false), lastSourceId : 0, valueStruct: null}
)
.map (acc => acc.valueStruct.sourceId === acc.lastSourceId ? acc.valueStruct.val : null )
.filter(Boolean)
也许还有更短的,我不知道。我会尝试将它放在 fiddle 中,看看它是否真的有效,或者如果你这样做之前让我知道。
这个怎么样?
let obs1 = Rx.Observable.timer(500,500);
let obs2 = Rx.Observable.timer(1000,1000);
let sloth = Rx.Observable.race(
obs1.take(1).concat(obs2),
obs2.take(1).concat(obs1)
).skip(1);
并且作为具有多个 Observables 支持的函数:
let sloth = (...observables) =>
observables.length === 1 ?
observables[0] :
observables.length === 2 ?
Rx.Observable.race(
observables[0].take(1).concat(observables[1]),
observables[1].take(1).concat(observables[0])
).skip(1) :
observables.reduce((prev, current) => sloth(prev, current))[0];
使用 RxJS 6 和 ReplaySubject:
function lastOf(...observables) {
const replayable = observables
.map(o => {
let r = o.pipe(multicast(new ReplaySubject(1)));
r.connect();
return r;
});
const racing = replayable
.map((v, i) => v.pipe(
take(1),
mapTo(i),
))
;
return of(...racing).pipe(
mergeAll(),
reduce((_, val) => val),
switchMap(i => replayable[i]),
);
}
使用:
const fast = interval(500);
const medium = interval(1000);
const slow = interval(2000);
lastOf(fast, slow, medium).subscribe(console.log);
我遇到了同样的问题,并且能够使用 merge
和 skipUntil
的组合来解决它。如果两个结果同时完成,pipe(last())
会阻止您接收多个结果。
尝试将以下内容粘贴到 https://rxviz.com/ 中:
const { timer, merge } = Rx;
const { mapTo, skipUntil, last } = RxOperators;
let obs1 = timer(500).pipe(mapTo('1'));
let obs2 = timer(1000).pipe(mapTo('2')); // I want the values from this one
let sloth = merge(
obs1.pipe(skipUntil(obs2)),
obs2.pipe(skipUntil(obs1))
).pipe(last())
sloth
我有两个 observable,我想听最后一个发出第一个值的那个,有这个操作符吗?类似的东西:
let obs1 = Rx.Observable.timer(500,500);
let obs2 = Rx.Observable.timer(1000,1000); // I want the values from this one
let sloth = Rx.Observable.sloth(obs1,obs2);
其中 sloth
observable 会发出来自 obs2
的值,因为它是最后发出第一个值的那个。
如果不行,还有其他办法吗?
我暂时看到了这种可能性,但我很好奇是否有人发现了其他任何东西:
let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`);
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`);
let sloth = Rx.Observable.merge(
obs1.take(1).mapTo(obs1),
obs2.take(1).mapTo(obs2)
).takeLast(1).mergeAll()
sloth.subscribe(data=>console.log(data))
<script src="https://unpkg.com/@reactivex/rxjs@5.3.0/dist/global/Rx.js"></script>
编辑 正如@user3743222(非常好的昵称:-D)指出的那样,它不适用于热可观察对象,但这应该没问题:
let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`).publish();
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`).publish();
obs1.connect();
obs2.connect();
let sloth = Rx.Observable.merge(
obs1.take(1).map((val)=>obs1.startWith(val)),
obs2.take(1).map((val)=>obs2.startWith(val))
).takeLast(1).mergeAll();
sloth.subscribe(data=>console.log(data));
<script src="https://unpkg.com/@reactivex/rxjs@5.3.0/dist/global/Rx.js"></script>
我喜欢你的解决方案(尽管我怀疑如果你有一个热流你可能永远不会看到第一个发射值 - 如果源是冷的,一切看起来都很好)。你能做一个 jsfiddle
来检查一下吗?如果您没有错过任何价值,那么您的解决方案就是最好的。如果这样做,可能可以通过将跳过的值添加回源来更正它 (obs1.take(1).map(val => obs1.startWith(val))
.
否则,对于通用的冗长解决方案,这里的关键是您有状态,因此您还需要 scan
运算符。我们用索引标记源,并保持一个状态,该状态表示已经启动的源的索引。当除一个以外的所有都开始时,我们知道没有开始的那个的索引,我们只从那个开始中选择值。请注意,无论来源是热的还是冷的,这都应该独立工作,因为所有内容都是一次性完成的,即没有多个订阅。
Rx.Observable.merge(
obs1.map(val => {val, sourceId: 1})
obs2.map(val => {val, sourceId: 2})
obsn.map(val => {val, sourceId: n})
).scan(
(acc, valueStruct) => {
acc.valueStruct = valueStruct
acc.alreadyEmitted[valueStruct.sourceId - 1] = true
if (acc.alreadyEmitted.filter(Boolean).length === n - 1) {
acc.lastSourceId = 1 + acc.alreadyEmitted.findIndex(element => element === false)
}
return acc
}, {alreadyEmitted : new Array(n).fill(false), lastSourceId : 0, valueStruct: null}
)
.map (acc => acc.valueStruct.sourceId === acc.lastSourceId ? acc.valueStruct.val : null )
.filter(Boolean)
也许还有更短的,我不知道。我会尝试将它放在 fiddle 中,看看它是否真的有效,或者如果你这样做之前让我知道。
这个怎么样?
let obs1 = Rx.Observable.timer(500,500);
let obs2 = Rx.Observable.timer(1000,1000);
let sloth = Rx.Observable.race(
obs1.take(1).concat(obs2),
obs2.take(1).concat(obs1)
).skip(1);
并且作为具有多个 Observables 支持的函数:
let sloth = (...observables) =>
observables.length === 1 ?
observables[0] :
observables.length === 2 ?
Rx.Observable.race(
observables[0].take(1).concat(observables[1]),
observables[1].take(1).concat(observables[0])
).skip(1) :
observables.reduce((prev, current) => sloth(prev, current))[0];
使用 RxJS 6 和 ReplaySubject:
function lastOf(...observables) {
const replayable = observables
.map(o => {
let r = o.pipe(multicast(new ReplaySubject(1)));
r.connect();
return r;
});
const racing = replayable
.map((v, i) => v.pipe(
take(1),
mapTo(i),
))
;
return of(...racing).pipe(
mergeAll(),
reduce((_, val) => val),
switchMap(i => replayable[i]),
);
}
使用:
const fast = interval(500);
const medium = interval(1000);
const slow = interval(2000);
lastOf(fast, slow, medium).subscribe(console.log);
我遇到了同样的问题,并且能够使用 merge
和 skipUntil
的组合来解决它。如果两个结果同时完成,pipe(last())
会阻止您接收多个结果。
尝试将以下内容粘贴到 https://rxviz.com/ 中:
const { timer, merge } = Rx;
const { mapTo, skipUntil, last } = RxOperators;
let obs1 = timer(500).pipe(mapTo('1'));
let obs2 = timer(1000).pipe(mapTo('2')); // I want the values from this one
let sloth = merge(
obs1.pipe(skipUntil(obs2)),
obs2.pipe(skipUntil(obs1))
).pipe(last())
sloth