RxJS 连接两个可观察对象
RxJS concat two observables
我正在尝试连接两个 Obs1 和 Obs2。 Obs1 发出 5,6,7。 Obs2 发出 1,2,3,4。我正在连接 -> concat(Obs2, Obs1)
我的订阅者期望 1,2,3,4,5,6,7,但只得到 1,2,3,4。我做错了什么?
let Obs1 = new rxjs.Subject();
let Obs2 = new rxjs.Subject();
function sendToObs1(x){
Obs1.next(x)
}
async function sendToObs2(){
let trns = await getValues();
for(let i = 0; i < trns.length; i++){
Obs2.next(trns[i])
}
Obs2.complete()
}
function getValues(){
return new Promise((resolve, reject) => {
setTimeout(() => resolve([1,2,3,4]), 10)
})
};
rxjs.concat(Obs2, Obs1).subscribe({
next: x=> console.log("Received: " + x),
complete: () => console.log("Done")}
)
sendToObs2()
sendToObs1(5)
sendToObs1(6)
sendToObs1(7)
//Output
// Received: 1
// Received: 2
// Received: 3
// Received: 4
//Expected
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5
// Received: 6
// Received: 7
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.1.0/rxjs.umd.js"></script>
Concat只在Obs2的onComplete之后订阅
let Obs1 = new rxjs.ReplaySubject();
let Obs2 = new rxjs.Subject();
function sendToObs1(x){
Obs1.next(x)
}
async function sendToObs2(){
let trns = await getValues();
for(let i = 0; i < trns.length; i++){
Obs2.next(trns[i])
}
Obs2.complete()
}
function getValues(){
return new Promise((resolve, reject) => {
setTimeout(() => resolve([1,2,3,4]), 10)
})
};
rxjs.concat(Obs2, Obs1).subscribe({
next: x=> console.log("Received: " + x),
complete: () => console.log("Done")}
)
sendToObs2()
sendToObs1(5)
sendToObs1(6)
sendToObs1(7)
//Output
// Received: 1
// Received: 2
// Received: 3
// Received: 4
//Expected
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5
// Received: 6
// Received: 7
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.1.0/rxjs.umd.js"></script>
我正在尝试连接两个 Obs1 和 Obs2。 Obs1 发出 5,6,7。 Obs2 发出 1,2,3,4。我正在连接 -> concat(Obs2, Obs1)
我的订阅者期望 1,2,3,4,5,6,7,但只得到 1,2,3,4。我做错了什么?
let Obs1 = new rxjs.Subject();
let Obs2 = new rxjs.Subject();
function sendToObs1(x){
Obs1.next(x)
}
async function sendToObs2(){
let trns = await getValues();
for(let i = 0; i < trns.length; i++){
Obs2.next(trns[i])
}
Obs2.complete()
}
function getValues(){
return new Promise((resolve, reject) => {
setTimeout(() => resolve([1,2,3,4]), 10)
})
};
rxjs.concat(Obs2, Obs1).subscribe({
next: x=> console.log("Received: " + x),
complete: () => console.log("Done")}
)
sendToObs2()
sendToObs1(5)
sendToObs1(6)
sendToObs1(7)
//Output
// Received: 1
// Received: 2
// Received: 3
// Received: 4
//Expected
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5
// Received: 6
// Received: 7
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.1.0/rxjs.umd.js"></script>
Concat只在Obs2的onComplete之后订阅
let Obs1 = new rxjs.ReplaySubject();
let Obs2 = new rxjs.Subject();
function sendToObs1(x){
Obs1.next(x)
}
async function sendToObs2(){
let trns = await getValues();
for(let i = 0; i < trns.length; i++){
Obs2.next(trns[i])
}
Obs2.complete()
}
function getValues(){
return new Promise((resolve, reject) => {
setTimeout(() => resolve([1,2,3,4]), 10)
})
};
rxjs.concat(Obs2, Obs1).subscribe({
next: x=> console.log("Received: " + x),
complete: () => console.log("Done")}
)
sendToObs2()
sendToObs1(5)
sendToObs1(6)
sendToObs1(7)
//Output
// Received: 1
// Received: 2
// Received: 3
// Received: 4
//Expected
// Received: 1
// Received: 2
// Received: 3
// Received: 4
// Received: 5
// Received: 6
// Received: 7
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.1.0/rxjs.umd.js"></script>