在 rxJs 中与 flatMap 和 concatMap 作斗争
Struggling with flatMap vs concatMap in rxJs
我很难理解 rxJs 中 flatMap
和 concatMap
之间的区别。
我能理解的最清楚的答案是这里difference-between-concatmap-and-flatmap
所以我自己去尝试了。
import "./styles.css";
import { switchMap, flatMap, concatMap } from "rxjs/operators";
import { fromFetch } from "rxjs/fetch";
import { Observable } from "rxjs";
function createObs1() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(1);
subscriber.complete();
}, 900);
});
}
function createObs2() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(2);
//subscriber.next(22);
//subscriber.next(222);
subscriber.complete();
}, 800);
});
}
function createObs3() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(3);
//subscriber.next(33);
//subscriber.next(333);
subscriber.complete();
}, 700);
});
}
function createObs4() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 600);
});
}
function createObs5() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(5);
subscriber.complete();
}, 500);
});
}
createObs1()
.pipe(
flatMap((resp) => {
console.log(resp);
return createObs2();
}),
flatMap((resp) => {
console.log(resp);
return createObs3();
}),
flatMap((resp) => {
console.log(resp);
return createObs4();
}),
flatMap((resp) => {
console.log(resp);
return createObs5();
})
)
.subscribe((resp) => console.log(resp));
console.log("hellooo");
我用过这里的游乐场playground example
问题
1)
根据我的理解,flatMap 的使用应该混合输出,以便控制台日志类似于 (1,3,2,4,5)。我已经尝试了 30 多次并且总是出现在同一行 (1, 2, 3, 4, 5)
我做错了什么或没有理解错误?
2)
如果在 createObs2()
和 createObs3()
上删除注释并将代码包含在多个发出的事件中,那么事情就会变得一团糟。即使您更改为 concatMap,它也会把事情搞得一团糟,结果也好坏参半。我期望只有一次的多个数字会出现多次。结果可以是 (1, 2, 33, 3, 2, 22, 3, 33, 4, 5, 4, 3, 4, 5) 为什么会这样?
我如何在 playground 上测试示例。我只是从最后一个 console.log("hello") 中删除了 1 个字母。只有一个变化,例如 console.log("heloo") 然后被观察到,项目被再次编译并在控制台打印输出。
编辑:我使用 flatMap 和 concatMap 的原因是使用 http 库在 angular 中找到嵌套订阅的替代品。
createObs1().subscribe( (resp1) => {
console.log(resp1);
createObs2().subscribe( (resp2) => {
console.log(resp2);
createObs3().subscribe( (resp3) => {
console.log(resp3);
createObs4().subscribe( (resp4) => {
console.log(resp4);
createObs5().subscribe( (resp5) => {
console.log(resp5);
})
})
})
})
})
每次第一个 observable 发出时,都会在 flatMap
中创建第二个 observable 并开始发出。但是,第一个可观察值的值不会进一步传递。
每次第二个 Observable 发出时,下一个 flatMap
创建第三个 Observable,依此类推。同样,进入 flatMap
的原始值不再继续传递。
createObs1()
.pipe(
flatMap(() => createObs2()), // Merge this stream every time prev observable emits
flatMap(() => createObs3()), // Merge this stream every time prev observable emits
flatMap(() => createObs4()), // Merge this stream every time prev observable emits
flatMap(() => createObs5()), // Merge this stream every time prev observable emits
)
.subscribe((resp) => console.log(resp));
// OUTPUT:
// 5
因此,只有 createObs5()
发出的值才真正发送给观察者。从先前的可观察对象发出的值刚刚触发了新可观察对象的创建。
如果您使用 merge
,那么您会得到您可能一直期待的结果:
createObs1()
.pipe(
merge(createObs2()),
merge(createObs3()),
merge(createObs4()),
merge(createObs5()),
)
.subscribe((resp) => console.log(resp));
// OUTPUT:
// 5
// 4
// 3
// 2
// 1
您的测试场景不足以看出这两个运算符之间的差异。在您的测试用例中,每个可观察对象仅发出 1 次。如果一个 observable 只发出一个值,那么 concatMap
和 flatMap
(aka mergeMap)之间真的没有区别。只有在多次发射时才能看出差异。
所以,让我们使用不同的场景。让我们有一个 source$
observable,它只是每 1 秒发出一个递增的整数。然后,在我们的“Higher Order Mapping Operator”(concatMap
& mergeMap
)中,我们将 return 一个每 1 秒发出可变次数的可观察对象,然后完成。
// emit number every second
const source$ = interval(1000).pipe(map(n => n+1));
// helper to return observable that emits the provided number of times
function inner$(max: number, description: string): Observable<string> {
return interval(1000).pipe(
map(n => `[${description}: inner source ${max}] ${n+1}/${max}`),
take(max),
);
}
然后让我们根据 source$
和 inner$
定义两个独立的 observable;一种使用 concatMap
一种使用 flatMap
并观察输出。
const flatMap$ = source$.pipe(
flatMap(n => inner$(n, 'flatMap$'))
);
const concatMap$ = source$.pipe(
concatMap(n => inner$(n, 'concatMap$'))
);
在查看输出的差异之前,让我们先谈谈这些运算符的共同点。他们俩:
- 订阅由传入函数
编辑的可观察对象return
- 从这个“内部可观察到的”发出排放物
- 取消订阅内部 observable
不同之处在于他们创建和管理内部订阅的方式:
concatMap
- 一次只允许一个内部订阅。当它接收排放时,它一次只会订阅一个内部可观察对象。所以它最初会订阅“emission 1”创建的observable,只有在它完成后,它才会订阅“emission 2”创建的observable。这与 concat
静态方法的行为方式一致。
flatMap
(aka mergeMap
) - 允许许多内部订阅。因此,当收到新的发射时,它将订阅内部可观察量。这意味着发射不会有任何特定的顺序,因为它会在任何内部可观察量发射时发射。这与 merge
静态方法的行为方式一致( 这就是为什么我个人更喜欢名称“mergeMap”)的原因。
这是一个 StackBlitz,它显示了上述可观测值 concatMap$
和 mergeMap$
的输出:
希望以上的解释能帮助您解开疑惑!
#1 - “使用 flatMap 应该混合输出”
这没有像您预期的那样工作的原因是因为只有一个发射通过 flatMap
,这意味着您只有一个“内部可观察”发射值。如上例所示,一旦 flatMap 接收到多个发射,它就可以有多个独立发射的内部 observable。
#2 - “...并包含带有多个发出事件的代码然后事情变得混乱。”
“事情变得混乱”是由于有多个发出值的内部订阅。
对于您提到的关于使用 concatMap
并且仍然得到“混合”输出的部分,我不希望那样。当启用“自动保存”时,我在 StackBlitz 中看到了可观察到的排放的奇怪行为( 似乎有时它没有完全刷新并且旧订阅似乎在自动刷新后仍然存在,这给出了非常混乱的控制台输出).也许代码沙箱也有类似的问题。
#3 - “我使用 flatMap 和 concatMap 的原因是使用 http 库 在 angular 中找到嵌套订阅的替代品”
这是有道理的。你不想搞乱嵌套订阅,因为没有很好的方法来保证内部订阅将被清理。
在大多数使用 http 调用的情况下,我发现 switchMap
是理想的选择,因为它会减少您不再关心的内部可观察量的排放量。想象一下,您有一个从路由参数中读取 id
的组件。它使用此 id
进行 http 调用以获取数据。
itemId$ = this.activeRoute.params.pipe(
map(params => params['id']),
distinctUntilChanged()
);
item$ = this.itemId$.pipe(
switchMap(id => http.get(`${serverUrl}/items/${id}`)),
map(response => response.data)
);
我们希望item$
只发出“当前项目”(对应于url中的id)。假设我们的 UI 有一个按钮,用户可以通过 id
单击导航到下一个项目,并且您的应用发现自己有一个喜欢点击的用户不断点击该按钮,这改变了 url 参数甚至比 http 调用更快 return 数据。
如果我们选择 mergeMap
,我们最终会得到许多内部可观察对象,它们会发出所有这些 http 调用的结果。充其量,屏幕会随着所有这些不同的呼叫返回而闪烁。在最坏的情况下(如果调用无序返回),UI 将显示与 url 中的 ID 不同步的数据: -(
如果我们选择 concatMap
,用户将被迫等待所有 http 调用依次完成,即使我们只关心最近的那一个。
但是,对于 switchMap
,每当收到新的发射 (itemId
) 时,它将取消订阅先前的内部可观察对象并订阅新的。这意味着它永远不会发出不再相关的旧 http 调用的结果。 :-)
需要注意的一件事是,由于 http observables 仅发出一次,因此各种运算符(switchMap
、mergeMap
、concatMap
)之间的选择似乎没有什么不同,因为它们都为我们执行“内部可观察处理”。但是,如果您开始接收不止一次发射,最好让您的代码经得起未来考验,并选择真正为您提供所需行为的代码。
我很难理解 rxJs 中 flatMap
和 concatMap
之间的区别。
我能理解的最清楚的答案是这里difference-between-concatmap-and-flatmap
所以我自己去尝试了。
import "./styles.css";
import { switchMap, flatMap, concatMap } from "rxjs/operators";
import { fromFetch } from "rxjs/fetch";
import { Observable } from "rxjs";
function createObs1() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(1);
subscriber.complete();
}, 900);
});
}
function createObs2() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(2);
//subscriber.next(22);
//subscriber.next(222);
subscriber.complete();
}, 800);
});
}
function createObs3() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(3);
//subscriber.next(33);
//subscriber.next(333);
subscriber.complete();
}, 700);
});
}
function createObs4() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 600);
});
}
function createObs5() {
return new Observable<number>((subscriber) => {
setTimeout(() => {
subscriber.next(5);
subscriber.complete();
}, 500);
});
}
createObs1()
.pipe(
flatMap((resp) => {
console.log(resp);
return createObs2();
}),
flatMap((resp) => {
console.log(resp);
return createObs3();
}),
flatMap((resp) => {
console.log(resp);
return createObs4();
}),
flatMap((resp) => {
console.log(resp);
return createObs5();
})
)
.subscribe((resp) => console.log(resp));
console.log("hellooo");
我用过这里的游乐场playground example
问题
1) 根据我的理解,flatMap 的使用应该混合输出,以便控制台日志类似于 (1,3,2,4,5)。我已经尝试了 30 多次并且总是出现在同一行 (1, 2, 3, 4, 5)
我做错了什么或没有理解错误?
2)
如果在 createObs2()
和 createObs3()
上删除注释并将代码包含在多个发出的事件中,那么事情就会变得一团糟。即使您更改为 concatMap,它也会把事情搞得一团糟,结果也好坏参半。我期望只有一次的多个数字会出现多次。结果可以是 (1, 2, 33, 3, 2, 22, 3, 33, 4, 5, 4, 3, 4, 5) 为什么会这样?
我如何在 playground 上测试示例。我只是从最后一个 console.log("hello") 中删除了 1 个字母。只有一个变化,例如 console.log("heloo") 然后被观察到,项目被再次编译并在控制台打印输出。
编辑:我使用 flatMap 和 concatMap 的原因是使用 http 库在 angular 中找到嵌套订阅的替代品。
createObs1().subscribe( (resp1) => {
console.log(resp1);
createObs2().subscribe( (resp2) => {
console.log(resp2);
createObs3().subscribe( (resp3) => {
console.log(resp3);
createObs4().subscribe( (resp4) => {
console.log(resp4);
createObs5().subscribe( (resp5) => {
console.log(resp5);
})
})
})
})
})
每次第一个 observable 发出时,都会在 flatMap
中创建第二个 observable 并开始发出。但是,第一个可观察值的值不会进一步传递。
每次第二个 Observable 发出时,下一个 flatMap
创建第三个 Observable,依此类推。同样,进入 flatMap
的原始值不再继续传递。
createObs1()
.pipe(
flatMap(() => createObs2()), // Merge this stream every time prev observable emits
flatMap(() => createObs3()), // Merge this stream every time prev observable emits
flatMap(() => createObs4()), // Merge this stream every time prev observable emits
flatMap(() => createObs5()), // Merge this stream every time prev observable emits
)
.subscribe((resp) => console.log(resp));
// OUTPUT:
// 5
因此,只有 createObs5()
发出的值才真正发送给观察者。从先前的可观察对象发出的值刚刚触发了新可观察对象的创建。
如果您使用 merge
,那么您会得到您可能一直期待的结果:
createObs1()
.pipe(
merge(createObs2()),
merge(createObs3()),
merge(createObs4()),
merge(createObs5()),
)
.subscribe((resp) => console.log(resp));
// OUTPUT:
// 5
// 4
// 3
// 2
// 1
您的测试场景不足以看出这两个运算符之间的差异。在您的测试用例中,每个可观察对象仅发出 1 次。如果一个 observable 只发出一个值,那么 concatMap
和 flatMap
(aka mergeMap)之间真的没有区别。只有在多次发射时才能看出差异。
所以,让我们使用不同的场景。让我们有一个 source$
observable,它只是每 1 秒发出一个递增的整数。然后,在我们的“Higher Order Mapping Operator”(concatMap
& mergeMap
)中,我们将 return 一个每 1 秒发出可变次数的可观察对象,然后完成。
// emit number every second
const source$ = interval(1000).pipe(map(n => n+1));
// helper to return observable that emits the provided number of times
function inner$(max: number, description: string): Observable<string> {
return interval(1000).pipe(
map(n => `[${description}: inner source ${max}] ${n+1}/${max}`),
take(max),
);
}
然后让我们根据 source$
和 inner$
定义两个独立的 observable;一种使用 concatMap
一种使用 flatMap
并观察输出。
const flatMap$ = source$.pipe(
flatMap(n => inner$(n, 'flatMap$'))
);
const concatMap$ = source$.pipe(
concatMap(n => inner$(n, 'concatMap$'))
);
在查看输出的差异之前,让我们先谈谈这些运算符的共同点。他们俩:
- 订阅由传入函数 编辑的可观察对象return
- 从这个“内部可观察到的”发出排放物
- 取消订阅内部 observable
不同之处在于他们创建和管理内部订阅的方式:
concatMap
- 一次只允许一个内部订阅。当它接收排放时,它一次只会订阅一个内部可观察对象。所以它最初会订阅“emission 1”创建的observable,只有在它完成后,它才会订阅“emission 2”创建的observable。这与 concat
静态方法的行为方式一致。
flatMap
(aka mergeMap
) - 允许许多内部订阅。因此,当收到新的发射时,它将订阅内部可观察量。这意味着发射不会有任何特定的顺序,因为它会在任何内部可观察量发射时发射。这与 merge
静态方法的行为方式一致( 这就是为什么我个人更喜欢名称“mergeMap”)的原因。
这是一个 StackBlitz,它显示了上述可观测值 concatMap$
和 mergeMap$
的输出:
希望以上的解释能帮助您解开疑惑!
#1 - “使用 flatMap 应该混合输出”
这没有像您预期的那样工作的原因是因为只有一个发射通过 flatMap
,这意味着您只有一个“内部可观察”发射值。如上例所示,一旦 flatMap 接收到多个发射,它就可以有多个独立发射的内部 observable。
#2 - “...并包含带有多个发出事件的代码然后事情变得混乱。”
“事情变得混乱”是由于有多个发出值的内部订阅。
对于您提到的关于使用 concatMap
并且仍然得到“混合”输出的部分,我不希望那样。当启用“自动保存”时,我在 StackBlitz 中看到了可观察到的排放的奇怪行为( 似乎有时它没有完全刷新并且旧订阅似乎在自动刷新后仍然存在,这给出了非常混乱的控制台输出).也许代码沙箱也有类似的问题。
#3 - “我使用 flatMap 和 concatMap 的原因是使用 http 库 在 angular 中找到嵌套订阅的替代品”
这是有道理的。你不想搞乱嵌套订阅,因为没有很好的方法来保证内部订阅将被清理。
在大多数使用 http 调用的情况下,我发现 switchMap
是理想的选择,因为它会减少您不再关心的内部可观察量的排放量。想象一下,您有一个从路由参数中读取 id
的组件。它使用此 id
进行 http 调用以获取数据。
itemId$ = this.activeRoute.params.pipe(
map(params => params['id']),
distinctUntilChanged()
);
item$ = this.itemId$.pipe(
switchMap(id => http.get(`${serverUrl}/items/${id}`)),
map(response => response.data)
);
我们希望item$
只发出“当前项目”(对应于url中的id)。假设我们的 UI 有一个按钮,用户可以通过 id
单击导航到下一个项目,并且您的应用发现自己有一个喜欢点击的用户不断点击该按钮,这改变了 url 参数甚至比 http 调用更快 return 数据。
如果我们选择 mergeMap
,我们最终会得到许多内部可观察对象,它们会发出所有这些 http 调用的结果。充其量,屏幕会随着所有这些不同的呼叫返回而闪烁。在最坏的情况下(如果调用无序返回),UI 将显示与 url 中的 ID 不同步的数据: -(
如果我们选择 concatMap
,用户将被迫等待所有 http 调用依次完成,即使我们只关心最近的那一个。
但是,对于 switchMap
,每当收到新的发射 (itemId
) 时,它将取消订阅先前的内部可观察对象并订阅新的。这意味着它永远不会发出不再相关的旧 http 调用的结果。 :-)
需要注意的一件事是,由于 http observables 仅发出一次,因此各种运算符(switchMap
、mergeMap
、concatMap
)之间的选择似乎没有什么不同,因为它们都为我们执行“内部可观察处理”。但是,如果您开始接收不止一次发射,最好让您的代码经得起未来考验,并选择真正为您提供所需行为的代码。