为什么这些 RxJS.Observable 序列中的第二项被跳过?
why is the second item in these RxJS.Observable sequences getting skipped?
我有以下 RxJS 代码:
let items = Observable.of({
val: "one",
type: "string"
}, {
val: "five",
type: "string"
}, {
val: 2,
type: "integer"
}, {
val: 10,
type: "integer"
}, {
val: 20,
type: "integer"
});
items
.groupBy(x => x.type)
.subscribe(obs => {
let head = obs.first();
let tail = obs.skip(1);
head
.do(x => console.log('head == ', x))
.concat(tail)
.do(x => console.log('tail == ', x))
.subscribe(x => console.log('sub == ', x));
});
这是输出:
head == Object {val: "one", type: "string"}
tail == Object {val: "one", type: "string"}
sub == Object {val: "one", type: "string"}
head == Object {val: 2, type: "integer"}
tail == Object {val: 2, type: "integer"}
sub == Object {val: 2, type: "integer"}
tail == Object {val: 20, type: "integer"}
sub == Object {val: 20, type: "integer"}
为什么第二项没有发出?
有点难以解释这里发生了什么,但没有魔法所以让我们看看我是否能找到这些词(见下文)。在冗长的解释之后是一个建议的解决方法。
假设 observer
是 obs => { let head = obs.first();....
Observable.of(...)
是一个冷可观察对象,它将一个接一个地发出项目...只要观察者订阅(冷可观察对象的定义)。
groupBy
也是一个冷可观察对象,但它创建了热可观察对象,这是您的类型组。这意味着当一个项目进入时,它与一个可观察组相关联,并立即发送给那里的任何观察者(热可观察的定义)。如果没有观察者,则该值丢失。
- 您的第一个
subscribe(observer)
导致以下结果:
observer
订阅 groupBy
,后者订阅 Observable.of
,因此从项目 (val: "one", type: "string"
) 发出第一个项目。 Let-s 称之为 item_string_1
.
groupBy
创建第一组热可观察对象(称之为 obs_string
),将其发送到 observer
,然后在第一组中发送第一项。
- 当
obs_string
被发出时,observer
被执行(记住观察者每次相关的可观察量发出时都会被执行)。这意味着
item_string_1
将在 observer
执行后发出。
- 第一次观察者执行:观察者中的
subscribe
导致以下结果:
observer2
(x => console.log('sub == ', x)
)订阅 concat(tail)
concat(tail)
立即订阅 head
并且此时不订阅 tail
。它只会在 head
完成后订阅它。这是重点之一。
head
导致对 first
的订阅,进而导致对 obs
的订阅,这导致......什么都没有,因为 obs
没有发出任何值。
- 观察者执行结束
- 然后
item_string_1
在 obs_string
上发射。这导致:
head
发出 item_string_1
,concat(tail)
发出 item_string1
并且你所有的 console.log
显示 Object {val: "one", type: "string"}
.
head
完成
- 然后
item_string_2
在 obs_string
上发射。这导致:
concat(tail)
订阅 tail
,tail
订阅 skip(1)
,skip(1)
订阅 obs
(目前为 obs_string
)。这意味着 concat(tail)
发出……什么都没有。它跳过 obs
的第一个值,从它订阅的那一刻起计算的第一个值。第一个值是 item_string_2
,这是发出的第二个值。结论:您的第二个值永远不会发出。
- 然后
item_integer_1
被发射。与 item_string_1
相同的行为
- 然后
item_integer_2
被发射。与 item_string_2
相同的行为(已跳过)
- 然后
item_integer_3
被发射。现在 skip(1)
发出 val: 20, type: "integer"
,这使得它成为 console.log
中的消息
建议的解决方法:
不确定我是否理解得很好,但是如果你想根据元素的索引做一些特定的事情,只需使用 map(selector(value, index))
如下:
items
.groupBy(x => x.type)
.subscribe(obs => obs.map(function(value, index){
return index == 0 ? process_first(value)
: process_others(value)
})
.subscribe(x => console.log('sub == ', x))
我有以下 RxJS 代码:
let items = Observable.of({
val: "one",
type: "string"
}, {
val: "five",
type: "string"
}, {
val: 2,
type: "integer"
}, {
val: 10,
type: "integer"
}, {
val: 20,
type: "integer"
});
items
.groupBy(x => x.type)
.subscribe(obs => {
let head = obs.first();
let tail = obs.skip(1);
head
.do(x => console.log('head == ', x))
.concat(tail)
.do(x => console.log('tail == ', x))
.subscribe(x => console.log('sub == ', x));
});
这是输出:
head == Object {val: "one", type: "string"}
tail == Object {val: "one", type: "string"}
sub == Object {val: "one", type: "string"}
head == Object {val: 2, type: "integer"}
tail == Object {val: 2, type: "integer"}
sub == Object {val: 2, type: "integer"}
tail == Object {val: 20, type: "integer"}
sub == Object {val: 20, type: "integer"}
为什么第二项没有发出?
有点难以解释这里发生了什么,但没有魔法所以让我们看看我是否能找到这些词(见下文)。在冗长的解释之后是一个建议的解决方法。
假设 observer
是 obs => { let head = obs.first();....
Observable.of(...)
是一个冷可观察对象,它将一个接一个地发出项目...只要观察者订阅(冷可观察对象的定义)。groupBy
也是一个冷可观察对象,但它创建了热可观察对象,这是您的类型组。这意味着当一个项目进入时,它与一个可观察组相关联,并立即发送给那里的任何观察者(热可观察的定义)。如果没有观察者,则该值丢失。- 您的第一个
subscribe(observer)
导致以下结果:observer
订阅groupBy
,后者订阅Observable.of
,因此从项目 (val: "one", type: "string"
) 发出第一个项目。 Let-s 称之为item_string_1
.groupBy
创建第一组热可观察对象(称之为obs_string
),将其发送到observer
,然后在第一组中发送第一项。- 当
obs_string
被发出时,observer
被执行(记住观察者每次相关的可观察量发出时都会被执行)。这意味着item_string_1
将在observer
执行后发出。 - 第一次观察者执行:观察者中的
subscribe
导致以下结果:observer2
(x => console.log('sub == ', x)
)订阅concat(tail)
concat(tail)
立即订阅head
并且此时不订阅tail
。它只会在head
完成后订阅它。这是重点之一。head
导致对first
的订阅,进而导致对obs
的订阅,这导致......什么都没有,因为obs
没有发出任何值。- 观察者执行结束
- 然后
item_string_1
在obs_string
上发射。这导致:head
发出item_string_1
,concat(tail)
发出item_string1
并且你所有的console.log
显示Object {val: "one", type: "string"}
.head
完成
- 然后
item_string_2
在obs_string
上发射。这导致:concat(tail)
订阅tail
,tail
订阅skip(1)
,skip(1)
订阅obs
(目前为obs_string
)。这意味着concat(tail)
发出……什么都没有。它跳过obs
的第一个值,从它订阅的那一刻起计算的第一个值。第一个值是item_string_2
,这是发出的第二个值。结论:您的第二个值永远不会发出。
- 然后
item_integer_1
被发射。与item_string_1
相同的行为
- 然后
item_integer_2
被发射。与item_string_2
相同的行为(已跳过) - 然后
item_integer_3
被发射。现在skip(1)
发出val: 20, type: "integer"
,这使得它成为console.log
中的消息
建议的解决方法:
不确定我是否理解得很好,但是如果你想根据元素的索引做一些特定的事情,只需使用 map(selector(value, index))
如下:
items
.groupBy(x => x.type)
.subscribe(obs => obs.map(function(value, index){
return index == 0 ? process_first(value)
: process_others(value)
})
.subscribe(x => console.log('sub == ', x))