为什么这些 RxJS.Observable 序列中的第二项被跳过?

why is the second item in these RxJS.Observable sequences getting skipped?

我有以下 RxJS 代码:

let items = Observable.of({
  val: "one",
  type: "string"
}, {
  val: "five",
  type: "string"
}, {
  val: 2,
  type: "integer"
}, {
  val: 10,
  type: "integer"
}, {
  val: 20,
  type: "integer"
});

items
  .groupBy(x => x.type)
  .subscribe(obs => {
    let head = obs.first();
    let tail = obs.skip(1);
    head
      .do(x => console.log('head == ', x))
      .concat(tail)
      .do(x => console.log('tail == ', x))
      .subscribe(x => console.log('sub == ', x));

  });

这是输出:

head ==  Object {val: "one", type: "string"}
tail ==  Object {val: "one", type: "string"}
sub ==  Object {val: "one", type: "string"}

head ==  Object {val: 2, type: "integer"}
tail ==  Object {val: 2, type: "integer"}
sub ==  Object {val: 2, type: "integer"}
tail ==  Object {val: 20, type: "integer"}
sub ==  Object {val: 20, type: "integer"}

为什么第二项没有发出?

这里是 Plunk that demonstrates my problem

有点难以解释这里发生了什么,但没有魔法所以让我们看看我是否能找到这些词(见下文)。在冗长的解释之后是一个建议的解决方法。

假设 observerobs => { let head = obs.first();....

  • Observable.of(...) 是一个冷可观察对象,它将一个接一个地发出项目...只要观察者订阅(冷可观察对象的定义)。
  • groupBy 也是一个冷可观察对象,但它创建了热可观察对象,这是您的类型组。这意味着当一个项目进入时,它与一个可观察组相关联,并立即发送给那里的任何观察者(热可观察的定义)。如果没有观察者,则该值丢失。
  • 您的第一个 subscribe(observer) 导致以下结果:
    • observer 订阅 groupBy,后者订阅 Observable.of,因此从项目 (val: "one", type: "string") 发出第一个项目。 Let-s 称之为 item_string_1.
    • groupBy 创建第一组热可观察对象(称之为 obs_string),将其发送到 observer,然后在第一组中发送第一项。
    • obs_string 被发出时,observer 被执行(记住观察者每次相关的可观察量发出时都会被执行)。这意味着 item_string_1 将在 observer 执行后发出。
    • 第一次观察者执行:观察者中的 subscribe 导致以下结果:
      • observer2x => console.log('sub == ', x))订阅 concat(tail)
      • concat(tail) 立即订阅 head 并且此时不订阅 tail。它只会在 head 完成后订阅它。这是重点之一。
      • head 导致对 first 的订阅,进而导致对 obs 的订阅,这导致......什么都没有,因为 obs 没有发出任何值。
      • 观察者执行结束
    • 然后 item_string_1obs_string 上发射。这导致:
      • head 发出 item_string_1concat(tail) 发出 item_string1 并且你所有的 console.log 显示 Object {val: "one", type: "string"}.
      • head 完成
    • 然后 item_string_2obs_string 上发射。这导致:
      • concat(tail) 订阅 tailtail 订阅 skip(1)skip(1) 订阅 obs(目前为 obs_string)。这意味着 concat(tail) 发出……什么都没有。它跳过 obs 的第一个值,从它订阅的那一刻起计算的第一个值。第一个值是 item_string_2 ,这是发出的第二个值。结论:您的第二个值永远不会发出。
    • 然后 item_integer_1 被发射。与 item_string_1
    • 相同的行为
    • 然后 item_integer_2 被发射。与 item_string_2 相同的行为(已跳过)
    • 然后 item_integer_3 被发射。现在 skip(1) 发出 val: 20, type: "integer" ,这使得它成为 console.log
    • 中的消息

建议的解决方法:

不确定我是否理解得很好,但是如果你想根据元素的索引做一些特定的事情,只需使用 map(selector(value, index)) 如下:

items
  .groupBy(x => x.type)
  .subscribe(obs => obs.map(function(value, index){
                               return index == 0 ? process_first(value)
                                                 : process_others(value)
                            })
                       .subscribe(x => console.log('sub == ', x))