使用 #groupBy 创建的 RxJS #zip 组
RxJS #zip groups created using #groupBy
我需要压缩分组的可观察量(以形成相关组的笛卡尔积,但这与问题无关)。
当运行使用下面的代码时,实际上只有子可观察组在#zip 中发出值 - 这是为什么?
https://jsbin.com/coqeqaxoci/edit?js,console
var parent = Rx.Observable.from([1,2,3]).publish();
var child = parent.map(x => x).publish();
var groupedParent = parent.groupBy(x => x);
var groupedChild = child.groupBy(x => x);
Rx.Observable.zip([groupedChild, groupedParent])
.map(groups => {
groups[0].subscribe(x => console.log('zipped child ' + x)); // -> emitting
groups[1].subscribe(x => console.log('zipped parent ' + x)); // -> not emitting
})
.subscribe();
groupedChild.subscribe(group => {
group.subscribe(value => console.log('child ' + value)); // -> emitting
});
groupedParent.subscribe(group => {
group.subscribe(value => console.log('parent ' + value)); // -> emitting
});
child.connect();
parent.connect();
编辑:
正如 user3743222 的回答中所解释的那样,groupBy 发出的组是 hot 并且对父组 (groups[1]) 的订阅发生在第一个值已经发出之后。这是因为 #zip 等待 groupedChild 和 groupedParent 发出,后者发出得更快(意味着它的组在 #zip 函数 运行 之前发出值)。
我修改了你的代码如下:
var countChild = 0, countParent = 0;
function emits ( who ) {
return function ( x ) {console.log(who + " emits : " + x);};
}
function checkCount ( who ) {
return function ( ) {
if (who === "parent") {
countParent++;
}
else {
countChild++;
}
console.log("Check : Parent groups = " + countParent + ", Child groups = " + countChild );
};
}
function check ( who, where ) {
return function ( x ) {
console.log("Check : " + who + " : " + where + " :" + x);
};
}
function completed ( who ) {
return function () { console.log(who + " completed!");};
}
function zipped ( who ) {
return function ( x ) { console.log('zipped ' + who + ' ' + x); };
}
function plus1 ( x ) {
return x + 1;
}
function err () {
console.log('error');
}
var parent = Rx.Observable.from([1, 2, 3, 4, 5, 6])
.do(emits("parent"))
.publish();
var child = parent
.map(function ( x ) {return x;})
.do(emits("child"))
// .publish();
var groupedParent = parent
.groupBy(function ( x ) { return x % 2;}, function ( x ) {return "P" + x;})
.do(checkCount("parent"))
.share();
var groupedChild = child
.groupBy(function ( x ) { return x % 3;}, function (x) {return "C" + x;})
.do(checkCount("child"))
.share();
Rx.Observable.zip([groupedChild, groupedParent])
// .do(function ( x ) { console.log("zip args : " + x);})
.subscribe(function ( groups ) {
groups[0]
.do(function ( x ) { console.log("Child group observable emits : " + x);})
.subscribe(zipped('child'), err, completed('Child Group Observable'));
groups[1]
.do(function ( x ) { console.log("Parent group observable emits : " + x);})
.subscribe(zipped('parent'), err, completed('Parent Group Observable'));
}, err, completed('zip'));
//child.connect();
parent.connect();
这是输出:
"parent emits : 1"
"child emits : 1"
"Check : Parent groups = 0, Child groups = 1"
"Check : Parent groups = 1, Child groups = 1"
"Parent group observable emits : P1"
"zipped parent P1"
"parent emits : 2"
"child emits : 2"
"Check : Parent groups = 1, Child groups = 2"
"Check : Parent groups = 2, Child groups = 2"
"Parent group observable emits : P2"
"zipped parent P2"
"parent emits : 3"
"child emits : 3"
"Check : Parent groups = 2, Child groups = 3"
"Parent group observable emits : P3"
"zipped parent P3"
"parent emits : 4"
"child emits : 4"
"Child group observable emits : C4"
"zipped child C4"
"Parent group observable emits : P4"
"zipped parent P4"
"parent emits : 5"
"child emits : 5"
"Child group observable emits : C5"
"zipped child C5"
"Parent group observable emits : P5"
"zipped parent P5"
"parent emits : 6"
"child emits : 6"
"Parent group observable emits : P6"
"zipped parent P6"
"Child Group Observable completed!"
"Child Group Observable completed!"
"Parent Group Observable completed!"
"Parent Group Observable completed!"
"zip completed!"
这里有两点需要说明:
zip 和分组依据与订阅时刻的行为
- groupBy 在 parent 和 child
中按预期创建可观察对象
使用这些值,您可以在日志中查看 Child
创建三个组,Parent
创建两个
Zip 将等待您作为参数传递的每个来源中的一个值。在您的情况下,这意味着您将订阅 child 和 parent grouped-by 可观察对象,当它们都已发布时。在日志中,只有在 "Check : Parent groups = 1, Child groups = 1"
上匹配数字后,您才会看到 "Parent group observable emits : P1"
。
然后您订阅两个 grouped-by 可观察对象,并记录其中的任何内容。这里的问题是 parent grouped-by observable 有一个值要传递,但是 child 'group-by' observable 是之前创建的并且已经传递了它的值,所以当你事后订阅,您看不到那个值 - 但您会看到下一个值。
因此,[1-3]
中的值将生成 3 个新的 child grouped-by 可观察对象,您将看不到其中任何一个,因为您订阅得太晚了。但是您会在 [4-6]
中看到值。您可以查看日志:"zipped child C4"
等
您将在 parent grouped-by 可观察对象中看到所有值,因为您在它们创建后立即订阅了它们。
连接并发布
我对连接和发布没有完全清楚的了解,但由于您的 child 有 parent 作为来源,您不需要延迟连接到它。如果您连接到 parent,child 将自动开始发出其值。因此我修改了你的代码。
这应该可以回答您的直接问题,但不能回答您最初的笛卡尔积目标。也许您应该将其表述为一个问题,看看人们会给出什么样的答案。
我需要压缩分组的可观察量(以形成相关组的笛卡尔积,但这与问题无关)。
当运行使用下面的代码时,实际上只有子可观察组在#zip 中发出值 - 这是为什么?
https://jsbin.com/coqeqaxoci/edit?js,console
var parent = Rx.Observable.from([1,2,3]).publish();
var child = parent.map(x => x).publish();
var groupedParent = parent.groupBy(x => x);
var groupedChild = child.groupBy(x => x);
Rx.Observable.zip([groupedChild, groupedParent])
.map(groups => {
groups[0].subscribe(x => console.log('zipped child ' + x)); // -> emitting
groups[1].subscribe(x => console.log('zipped parent ' + x)); // -> not emitting
})
.subscribe();
groupedChild.subscribe(group => {
group.subscribe(value => console.log('child ' + value)); // -> emitting
});
groupedParent.subscribe(group => {
group.subscribe(value => console.log('parent ' + value)); // -> emitting
});
child.connect();
parent.connect();
编辑: 正如 user3743222 的回答中所解释的那样,groupBy 发出的组是 hot 并且对父组 (groups[1]) 的订阅发生在第一个值已经发出之后。这是因为 #zip 等待 groupedChild 和 groupedParent 发出,后者发出得更快(意味着它的组在 #zip 函数 运行 之前发出值)。
我修改了你的代码如下:
var countChild = 0, countParent = 0;
function emits ( who ) {
return function ( x ) {console.log(who + " emits : " + x);};
}
function checkCount ( who ) {
return function ( ) {
if (who === "parent") {
countParent++;
}
else {
countChild++;
}
console.log("Check : Parent groups = " + countParent + ", Child groups = " + countChild );
};
}
function check ( who, where ) {
return function ( x ) {
console.log("Check : " + who + " : " + where + " :" + x);
};
}
function completed ( who ) {
return function () { console.log(who + " completed!");};
}
function zipped ( who ) {
return function ( x ) { console.log('zipped ' + who + ' ' + x); };
}
function plus1 ( x ) {
return x + 1;
}
function err () {
console.log('error');
}
var parent = Rx.Observable.from([1, 2, 3, 4, 5, 6])
.do(emits("parent"))
.publish();
var child = parent
.map(function ( x ) {return x;})
.do(emits("child"))
// .publish();
var groupedParent = parent
.groupBy(function ( x ) { return x % 2;}, function ( x ) {return "P" + x;})
.do(checkCount("parent"))
.share();
var groupedChild = child
.groupBy(function ( x ) { return x % 3;}, function (x) {return "C" + x;})
.do(checkCount("child"))
.share();
Rx.Observable.zip([groupedChild, groupedParent])
// .do(function ( x ) { console.log("zip args : " + x);})
.subscribe(function ( groups ) {
groups[0]
.do(function ( x ) { console.log("Child group observable emits : " + x);})
.subscribe(zipped('child'), err, completed('Child Group Observable'));
groups[1]
.do(function ( x ) { console.log("Parent group observable emits : " + x);})
.subscribe(zipped('parent'), err, completed('Parent Group Observable'));
}, err, completed('zip'));
//child.connect();
parent.connect();
这是输出:
"parent emits : 1"
"child emits : 1"
"Check : Parent groups = 0, Child groups = 1"
"Check : Parent groups = 1, Child groups = 1"
"Parent group observable emits : P1"
"zipped parent P1"
"parent emits : 2"
"child emits : 2"
"Check : Parent groups = 1, Child groups = 2"
"Check : Parent groups = 2, Child groups = 2"
"Parent group observable emits : P2"
"zipped parent P2"
"parent emits : 3"
"child emits : 3"
"Check : Parent groups = 2, Child groups = 3"
"Parent group observable emits : P3"
"zipped parent P3"
"parent emits : 4"
"child emits : 4"
"Child group observable emits : C4"
"zipped child C4"
"Parent group observable emits : P4"
"zipped parent P4"
"parent emits : 5"
"child emits : 5"
"Child group observable emits : C5"
"zipped child C5"
"Parent group observable emits : P5"
"zipped parent P5"
"parent emits : 6"
"child emits : 6"
"Parent group observable emits : P6"
"zipped parent P6"
"Child Group Observable completed!"
"Child Group Observable completed!"
"Parent Group Observable completed!"
"Parent Group Observable completed!"
"zip completed!"
这里有两点需要说明:
zip 和分组依据与订阅时刻的行为
- groupBy 在 parent 和 child 中按预期创建可观察对象
使用这些值,您可以在日志中查看
Child
创建三个组,Parent
创建两个Zip 将等待您作为参数传递的每个来源中的一个值。在您的情况下,这意味着您将订阅 child 和 parent grouped-by 可观察对象,当它们都已发布时。在日志中,只有在
"Check : Parent groups = 1, Child groups = 1"
上匹配数字后,您才会看到"Parent group observable emits : P1"
。然后您订阅两个 grouped-by 可观察对象,并记录其中的任何内容。这里的问题是 parent grouped-by observable 有一个值要传递,但是 child 'group-by' observable 是之前创建的并且已经传递了它的值,所以当你事后订阅,您看不到那个值 - 但您会看到下一个值。
因此,
[1-3]
中的值将生成 3 个新的 child grouped-by 可观察对象,您将看不到其中任何一个,因为您订阅得太晚了。但是您会在[4-6]
中看到值。您可以查看日志:"zipped child C4"
等您将在 parent grouped-by 可观察对象中看到所有值,因为您在它们创建后立即订阅了它们。
连接并发布
我对连接和发布没有完全清楚的了解,但由于您的 child 有 parent 作为来源,您不需要延迟连接到它。如果您连接到 parent,child 将自动开始发出其值。因此我修改了你的代码。
这应该可以回答您的直接问题,但不能回答您最初的笛卡尔积目标。也许您应该将其表述为一个问题,看看人们会给出什么样的答案。