在管道中两次使用 concatMap 的更好选择?

Better alternative to using concatMap twice within pipe?

我想在另一个承诺(承诺 1)解决后递归地执行一个预定义的承诺(承诺 2)以查看我与外围设备的连接何时 returns false,之后我可以做其他事情.我希望能够使用可观察对象来做到这一点。这是我到目前为止提出的代码,有 2 个问题:

1) 在 reprovision() 函数 ,

2) 是否有更好的方法来实现我的代码来完成我上面描述的事情

这是使用可观察对象递归执行 promise 1(仅一次)和 promise 2 然后对结果执行某些操作的代码:

reprovision(){
    this.bleMessage = this.prepareMessageService.prepareMessage(
      MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
    );

    from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
      concatMap(x => interval(1000)),
      concatMap(x => this.bleService.checkBleConnection1(this.deviceId)),
      takeWhile(x => x, true)).subscribe(
        resp => console.log('still connected to peripheral'),
        err =>  {
          console.log('disconnected from peripheral, do something else');
        }
      );
}

这是 promise 2 (this.bleService.checkBleConnection1) 的代码,我希望递归执行该代码,直到外围设备与我的移动设备断开连接。这个承诺背后的想法是,只要我的移动设备仍然连接到外围设备,它就应该解决,一旦连接失败就会抛出错误。

checkBleConnection1(deviceId: string){
  return BleClient.getConnectedDevices([]) //BleClient.getConnectedDevices([]) returns a promise
  .then(resp => {
    if (!resp.some(elem => deviceId === elem.deviceId)){
      throw new Error('Peripheral connection failed');
    }
    return true;
  })
  .catch(error => {
    throw error;
  });
}

根据你的post我的理解是

  • 如果找到 device
  • ,你想通过无限调用 this.bleService.checkBleConnection1(this.deviceId) 以 1 秒的间隔递归检查
  • 如果未找到 设备,您不想检查更多

所以对于递归调用,Rxjs 中有一个运算符叫做 expand link

为了实现 1 秒间隔添加 delay(1000) 返回递归调用

所以这是我使用 expand 代替 concatMap(x => interval(1000))

的版本

使用 expand 不需要使用 takeWhile,所以我删除它

reprovision(){
  this.bleMessage = this.prepareMessageService.prepareMessage(
    MessageCategory.customAction, CustomActionConstants.reboot, this.customPayload
  );

  from(this.bleService.bleWrite('custom', this.bleMessage))
    .pipe(
      expand((x) => from(this.bleService.checkBleConnection1(this.deviceId)).pipe(delay(1000)))
    )
    .subscribe({
      next: (resp) => console.log('still connected to peripheral'),
      error: (err) =>
        console.error('disconnected from peripheral, do something else'),
      complete: () => console.log('Completed !'),
    });
}

我相信这里的目标是在你完成第一个承诺后每 1 秒检查一次连接(旁注:这称为轮询,而不是递归,如果你调用 reprovision() 从内部 reprovision() 或类似的东西,你可以递归地轮询,但你不在这里,除非你必须,否则通常不想这样做。

你不能真正摆脱第二个concatMap因为你必须根据间隔切换到它,你可以像这样分离和清理流:

const pollConnection$ = interval(1000).pipe(
  concatMap(i => this.bleService.checkBleConnection1(this.deviceId)),
  takeWhile(x => x, true)
);

from(this.bleService.bleWrite('custom', this.bleMessage)).pipe(
  concatMap(x => pollConnection$),
).subscribe(
    resp => console.log('still connected to peripheral'),
    err =>  {
      console.log('disconnected from peripheral, do something else');
    }
  );

但您需要小心使用像 concatMap 这样的运算符,因为它会产生对应用不利的背压。如果这些请求的响应时间可能超过 1 秒,那么您将构建一个请求队列,这会降低您的应用程序性能。更安全的替代方案是 switchMapexhaustMap,具体取决于所需的行为