ReactiveX 多次过滤 observable 并合并

ReactiveX filtering on observable multiple times and merging

我在创建以下可观察对象时遇到问题。
我希望它接收一个预定义的值数组
我想过滤不同的东西,并能够将它们作为单独的可观察对象来处理。
然后当需要合并这些过滤后的可观察对象时,我想保留原始顺序

//Not sure the share is necessary, just thought it would tie it all together
const input$ = Observable.from([0,1,0,1]).share();
const ones$ = input$.filter(n => n == 1);
const zeroes$ = input$.filter(n => n == 0);

const zeroesChanged$ = zeroes$.mapTo(2);
const onesChanged$ = ones$.mapTo(3);
const allValues$ = Observable.merge(onesChanged$,zeroesChanged$);

 allValues$.subscribe(n => console.log(n));
//Outputs 3,3,2,2
//Expected output 3,2,3,2

编辑:很抱歉我的问题不够具体。 我正在使用一个名为 cycleJS 的库,它将副作用分离到驱动程序中。 所以我在我的周期中做的是这个

export function socketCycle({ SOCKETIO }) {
  const serverConnect$ = SOCKETIO.get('connect').map(serverDidConnect);
  const serverDisconnect$ = SOCKETIO.get('disconnect').map(serverDidDisconnect);
  const serverFailedToConnect$ = SOCKETIO.get('connect_failed').map(serverFailedToConnect);
  return { ACTION: Observable.merge(serverConnect$, serverDisconnect$, serverFailedToConnect$) };
}

现在,当我想为其编写测试时,我的问题出现了。我尝试了以下错误的方法(使用玩笑)

const inputConnect$ = Observable.from(['connect', 'disconnect', 'connect', 'disconnect']).share();
const expectedOutput$ = Observable.from([
  serverDidConnect(),
  serverDidDisconnect(),
  serverDidConnect(),
  serverDidDisconnect(),
]);
const socketIOMock = {
  get: (evt) => {
    if (evt === 'connect') {
      return inputConnect$.filter(s => s === 'connect');
    } else if (evt === 'disconnect') {
      return inputConnect$.filter(s => s === 'disconnect');
    }
    return Observable.empty();
  },
};
const { ACTION } = socketCycle({ SOCKETIO: socketIOMock });
Observable.zip(ACTION, expectedOutput$).subscribe(
  ([output, expectedOutput]) => { expect(output).toEqual(expectedOutput); },
  (error) => { expect(true).toBe(false) },
  () => { done(); },
);

也许我可以用另一种方法来测试它?

下面的代码可能会给你想要的结果,但是不需要使用 rxjs 来操作数组恕我直言

Rx.Observable.combineLatest(
   Rx.Observable.from([0,0,0]),
   Rx.Observable.from([1,1,1])
).flatMap(value=>Rx.Observable.from(value))
.subscribe(console.log)

当流被分区时,不同子流中元素之间的时间保证实际上被破坏了。特别是,即使 connect 事件总是在事件源的 disconnect 事件之前发生,connect Observable 的事件也不会总是在 [=12] 中的相应事件项之前发生=] 可观察。在正常的时间尺度上,这种竞争条件可能非常罕见但仍然很危险,这个测试显示了最坏的情况。

好消息是,所示的函数只是事件和处理程序结果之间的映射器。如果你可以在事件类型上继续这个模型,那么你甚至可以在一个普通的数据结构中编码映射,这有利于表现力:

const event_handlers = new Map({
 'connect': serverDidConnect,
 'disconnect': serverDidDisconnect,
 'connect_failed': serverFailedToConnect
});
const ACTION = input$.map(event_handlers.get.bind(event_handlers));

警告:如果您正在减少子流(或以其他方式考虑以前的值,例如 debounceTime),重构就不是那么简单,并且会也取决于"preserve order"的新定义。大多数时候,使用 reduce + 更复杂的累加器进行重现仍然是可行的。