在管道中两次使用 concatMap 的更好选择?
Better alternative to using concatMap twice within pipe?
我想在另一个承诺(承诺 1)解决后递归地执行一个预定义的承诺(承诺 2)以查看我与外围设备的连接何时 returns false,之后我可以做其他事情.我希望能够使用可观察对象来做到这一点。这是我到目前为止提出的代码,有 2 个问题:
1) 在 reprovision() 函数 ,
和
2) 是否有更好的方法来实现我的代码来完成我上面描述的事情?
这是使用可观察对象递归执行 promise 1(仅一次)和 promise 2 然后对结果执行某些操作的代码:
reprovision(){
this.bleMessage = this.prepareMessageService.prepareMessage(
MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
);
from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
concatMap(x => interval(1000)),
concatMap(x => this.bleService.checkBleConnection1(this.deviceId)),
takeWhile(x => x, true)).subscribe(
resp => console.log('still connected to peripheral'),
err => {
console.log('disconnected from peripheral, do something else');
}
);
}
这是 promise 2 (this.bleService.checkBleConnection1) 的代码,我希望递归执行该代码,直到外围设备与我的移动设备断开连接。这个承诺背后的想法是,只要我的移动设备仍然连接到外围设备,它就应该解决,一旦连接失败就会抛出错误。
checkBleConnection1(deviceId: string){
return BleClient.getConnectedDevices([]) //BleClient.getConnectedDevices([]) returns a promise
.then(resp => {
if (!resp.some(elem => deviceId === elem.deviceId)){
throw new Error('Peripheral connection failed');
}
return true;
})
.catch(error => {
throw error;
});
}
根据你的post我的理解是
- 如果找到 device
,你想通过无限调用 this.bleService.checkBleConnection1(this.deviceId)
以 1 秒的间隔递归检查
- 如果未找到 设备,您不想检查更多
所以对于递归调用,Rxjs 中有一个运算符叫做 expand
link
为了实现 1 秒间隔添加 delay(1000)
返回递归调用
所以这是我使用 expand
代替 concatMap(x => interval(1000))
的版本
使用 expand
不需要使用 takeWhile
,所以我删除它
reprovision(){
this.bleMessage = this.prepareMessageService.prepareMessage(
MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
);
from(this.bleService.bleWrite('custom', this.bleMessage))
.pipe(
expand((x) => from(this.bleService.checkBleConnection1(this.deviceId)).pipe(delay(1000)))
)
.subscribe({
next: (resp) => console.log('still connected to peripheral'),
error: (err) =>
console.error('disconnected from peripheral, do something else'),
complete: () => console.log('Completed !'),
});
}
我相信这里的目标是在你完成第一个承诺后每 1 秒检查一次连接(旁注:这称为轮询,而不是递归,如果你调用 reprovision()
从内部 reprovision()
或类似的东西,你可以递归地轮询,但你不在这里,除非你必须,否则通常不想这样做。
你不能真正摆脱第二个concatMap
因为你必须根据间隔切换到它,你可以像这样分离和清理流:
const pollConnection$ = interval(1000).pipe(
concatMap(i => this.bleService.checkBleConnection1(this.deviceId)),
takeWhile(x => x, true)
);
from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
concatMap(x => pollConnection$),
).subscribe(
resp => console.log('still connected to peripheral'),
err => {
console.log('disconnected from peripheral, do something else');
}
);
但您需要小心使用像 concatMap
这样的运算符,因为它会产生对应用不利的背压。如果这些请求的响应时间可能超过 1 秒,那么您将构建一个请求队列,这会降低您的应用程序性能。更安全的替代方案是 switchMap
和 exhaustMap
,具体取决于所需的行为
我想在另一个承诺(承诺 1)解决后递归地执行一个预定义的承诺(承诺 2)以查看我与外围设备的连接何时 returns false,之后我可以做其他事情.我希望能够使用可观察对象来做到这一点。这是我到目前为止提出的代码,有 2 个问题:
1) 在 reprovision() 函数 ,
和
2) 是否有更好的方法来实现我的代码来完成我上面描述的事情?
这是使用可观察对象递归执行 promise 1(仅一次)和 promise 2 然后对结果执行某些操作的代码:
reprovision(){
this.bleMessage = this.prepareMessageService.prepareMessage(
MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
);
from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
concatMap(x => interval(1000)),
concatMap(x => this.bleService.checkBleConnection1(this.deviceId)),
takeWhile(x => x, true)).subscribe(
resp => console.log('still connected to peripheral'),
err => {
console.log('disconnected from peripheral, do something else');
}
);
}
这是 promise 2 (this.bleService.checkBleConnection1) 的代码,我希望递归执行该代码,直到外围设备与我的移动设备断开连接。这个承诺背后的想法是,只要我的移动设备仍然连接到外围设备,它就应该解决,一旦连接失败就会抛出错误。
checkBleConnection1(deviceId: string){
return BleClient.getConnectedDevices([]) //BleClient.getConnectedDevices([]) returns a promise
.then(resp => {
if (!resp.some(elem => deviceId === elem.deviceId)){
throw new Error('Peripheral connection failed');
}
return true;
})
.catch(error => {
throw error;
});
}
根据你的post我的理解是
- 如果找到 device ,你想通过无限调用
- 如果未找到 设备,您不想检查更多
this.bleService.checkBleConnection1(this.deviceId)
以 1 秒的间隔递归检查
所以对于递归调用,Rxjs 中有一个运算符叫做 expand
link
为了实现 1 秒间隔添加 delay(1000)
返回递归调用
所以这是我使用 expand
代替 concatMap(x => interval(1000))
使用 expand
不需要使用 takeWhile
,所以我删除它
reprovision(){
this.bleMessage = this.prepareMessageService.prepareMessage(
MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
);
from(this.bleService.bleWrite('custom', this.bleMessage))
.pipe(
expand((x) => from(this.bleService.checkBleConnection1(this.deviceId)).pipe(delay(1000)))
)
.subscribe({
next: (resp) => console.log('still connected to peripheral'),
error: (err) =>
console.error('disconnected from peripheral, do something else'),
complete: () => console.log('Completed !'),
});
}
我相信这里的目标是在你完成第一个承诺后每 1 秒检查一次连接(旁注:这称为轮询,而不是递归,如果你调用 reprovision()
从内部 reprovision()
或类似的东西,你可以递归地轮询,但你不在这里,除非你必须,否则通常不想这样做。
你不能真正摆脱第二个concatMap
因为你必须根据间隔切换到它,你可以像这样分离和清理流:
const pollConnection$ = interval(1000).pipe(
concatMap(i => this.bleService.checkBleConnection1(this.deviceId)),
takeWhile(x => x, true)
);
from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
concatMap(x => pollConnection$),
).subscribe(
resp => console.log('still connected to peripheral'),
err => {
console.log('disconnected from peripheral, do something else');
}
);
但您需要小心使用像 concatMap
这样的运算符,因为它会产生对应用不利的背压。如果这些请求的响应时间可能超过 1 秒,那么您将构建一个请求队列,这会降低您的应用程序性能。更安全的替代方案是 switchMap
和 exhaustMap
,具体取决于所需的行为