RxJS 中是否存在与 `race` 运算符相反的运算符?

Is-there an opposite of the `race` operator in RxJS?

我有两个 observable,我想听最后一个发出第一个值的那个,有这个操作符吗?类似的东西:

let obs1 = Rx.Observable.timer(500,500);
let obs2 = Rx.Observable.timer(1000,1000); // I want the values from this one
let sloth = Rx.Observable.sloth(obs1,obs2);

其中 sloth observable 会发出来自 obs2 的值,因为它是最后发出第一个值的那个。

如果不行,还有其他办法吗?

我暂时看到了这种可能性,但我很好奇是否有人发现了其他任何东西:

let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`);
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`);
let sloth = Rx.Observable.merge(
  obs1.take(1).mapTo(obs1),
  obs2.take(1).mapTo(obs2)
).takeLast(1).mergeAll()

sloth.subscribe(data=>console.log(data))
<script src="https://unpkg.com/@reactivex/rxjs@5.3.0/dist/global/Rx.js"></script>

编辑 正如@user3743222(非常好的昵称:-D)指出的那样,它不适用于热可观察对象,但这应该没问题:

let obs1 = Rx.Observable.timer(500,500).map(i=>`cheetah ${i}`).publish();
let obs2 = Rx.Observable.timer(1000,1000).map(i=>`sloth ${i}`).publish();
obs1.connect();
obs2.connect();
let sloth = Rx.Observable.merge(
  obs1.take(1).map((val)=>obs1.startWith(val)),
  obs2.take(1).map((val)=>obs2.startWith(val))
).takeLast(1).mergeAll();
    
sloth.subscribe(data=>console.log(data));
<script src="https://unpkg.com/@reactivex/rxjs@5.3.0/dist/global/Rx.js"></script>

我喜欢你的解决方案(尽管我怀疑如果你有一个热流你可能永远不会看到第一个发射值 - 如果源是冷的,一切看起来都很好)。你能做一个 jsfiddle 来检查一下吗?如果您没有错过任何价值,那么您的解决方案就是最好的。如果这样做,可能可以通过将跳过的值添加回源来更正它 (obs1.take(1).map(val => obs1.startWith(val)).

否则,对于通用的冗长解决方案,这里的关键是您有状态,因此您还需要 scan 运算符。我们用索引标记源,并保持一个状态,该状态表示已经启动的源的索引。当除一个以外的所有都开始时,我们知道没有开始的那个的索引,我们只从那个开始中选择值。请注意,无论来源是热的还是冷的,这都应该独立工作,因为所有内容都是一次性完成的,即没有多个订阅。

Rx.Observable.merge(
  obs1.map(val => {val, sourceId: 1})
  obs2.map(val => {val, sourceId: 2})
  obsn.map(val => {val, sourceId: n})
).scan( 
(acc, valueStruct) => {
  acc.valueStruct = valueStruct
  acc.alreadyEmitted[valueStruct.sourceId - 1] = true
  if (acc.alreadyEmitted.filter(Boolean).length === n - 1) {
    acc.lastSourceId = 1 + acc.alreadyEmitted.findIndex(element => element === false)
  }
  return acc
}, {alreadyEmitted : new Array(n).fill(false), lastSourceId : 0, valueStruct: null}
)
.map (acc => acc.valueStruct.sourceId === acc.lastSourceId ? acc.valueStruct.val : null )
.filter(Boolean)

也许还有更短的,我不知道。我会尝试将它放在 fiddle 中,看看它是否真的有效,或者如果你这样做之前让我知道。

这个怎么样?

let obs1 = Rx.Observable.timer(500,500);
let obs2 = Rx.Observable.timer(1000,1000);

let sloth = Rx.Observable.race(
    obs1.take(1).concat(obs2),
    obs2.take(1).concat(obs1)
).skip(1);

并且作为具有多个 Observables 支持的函数:

let sloth = (...observables) => 
    observables.length === 1 ?
        observables[0] :
    observables.length === 2 ?
        Rx.Observable.race(
            observables[0].take(1).concat(observables[1]),
            observables[1].take(1).concat(observables[0])
        ).skip(1) :
    observables.reduce((prev, current) => sloth(prev, current))[0];

使用 RxJS 6 和 ReplaySubject:

function lastOf(...observables) {
  const replayable = observables
    .map(o => {
      let r = o.pipe(multicast(new ReplaySubject(1)));
      r.connect();
      return r;
    });
  const racing = replayable
    .map((v, i) => v.pipe(
      take(1),
      mapTo(i),
    ))
    ;
  return of(...racing).pipe(
    mergeAll(),
    reduce((_, val) => val),
    switchMap(i => replayable[i]),
  );
}

使用:

const fast = interval(500);
const medium = interval(1000);
const slow = interval(2000);

lastOf(fast, slow, medium).subscribe(console.log);

我遇到了同样的问题,并且能够使用 mergeskipUntil 的组合来解决它。如果两个结果同时完成,pipe(last()) 会阻止您接收多个结果。

尝试将以下内容粘贴到 https://rxviz.com/ 中:

const { timer, merge } = Rx;
const { mapTo, skipUntil, last } = RxOperators;

let obs1 = timer(500).pipe(mapTo('1'));
let obs2 = timer(1000).pipe(mapTo('2')); // I want the values from this one
let sloth = merge(
    obs1.pipe(skipUntil(obs2)),
    obs2.pipe(skipUntil(obs1))
).pipe(last())

sloth