RxJS 在通过 UDP 发送下一个命令之前等待响应

RxJS waiting for response before sending next command over UDP

我目前正在进行一个项目,我将 UDP 命令发送到 Tello 无人机。 问题是它使用 UDP,当我在前一个命令尚未完成之前发送命令太快时,第二个 command/action 不会发生。我在这个项目中使用 RxJS,我想创建一种机制来等待无人机的响应(“ok”或“error”)。

我的想法是有 2 个不同的 observables。 1 个 observable 是来自无人机响应的输入流和一个我用作 commandQueue 的 observable 的 observable。这个 commandQueue 上面有简单的可观察对象,其中包含我要发送的 1 个命令。当我收到来自其他可观察对象的“确定”消息时,我只想发送下一个命令。当我得到“确定”时,我将完成可观察到的简单命令,它会自动接收 commandQueue 上的下一个值,即下一个命令。

我的代码仅在我发送一组命令时有效,但我想多次调用该函数,所以一个一个地发送它们。

以下代码是有问题的函数,testsubject 是一个可观察对象,用于向无人机发送下一个命令。

async send_command_with_return(msg) {
    let parentobject = this;

    let zeroTime = timestamp();
    const now = () => numeral((timestamp() - zeroTime) / 10e3).format("0.0000");

    const asyncTask = data =>
      new Observable(obs => {
        console.log(`${now()}: starting async task ${data}`);

        
        parentobject.Client.pipe(take(1)).subscribe(
          dataa => {
            console.log("loool")
            obs.next(data);
            this.testSubject.next(data);
            console.log(`${now()}: end of async task ${data}`);
            obs.complete();
          },
          err => console.error("Observer got an error: " + err),
          () => console.log("observer asynctask finished with " + data + "\n")
        );
      });

    let p = this.commandQueue.pipe(concatMap(asyncTask)).toPromise(P); //commandQueue is a subject in the constructor

    console.log("start filling queue with " + msg);
    zeroTime = timestamp();
    this.commandQueue.next(msg);
    //["streamon", "streamoff", "height?", "temp?"].forEach(a => this.commandQueue.next(a));

    await p;

    // this.testSubject.next(msg);
  }

  streamon() {
    this.send_command_with_return("streamon");
  }

  streamoff() {
    this.send_command_with_return("streamoff");
  }

  get_speed() {
    this.send_command_with_return("speed?");
  }

  get_battery() {
    this.send_command_with_return("battery?");
  }
}

let tello = new Tello();
tello.init();

 tello.streamon();
 tello.streamoff();

您可以一次发送一个命令,方法是使用一个简单的主题来推送命令,并通过 concatMap 一次执行一个命令。

与其尝试将所有逻辑放在一个函数中,不如制作一个简单的 class 可能更容易,也许将其称为 TelloService 或其他名称:

class TelloService {
  private commandQueue$ = new Subject<Command>();
  
  constructor(private telloClient: FakeTelloClient) {
    this.commandQueue$
      .pipe(
        concatMap(command => this.telloClient.sendCommand(command))
      )
      .subscribe()
  }

  sendCommand(command: Command) {
    this.commandQueue$.next(command);
  }
}

实例化服务时,它会订阅 commandQueue$ 并且对于收到的每个命令,它将“完成”进行异步调用的工作。 concatMap 用于一次处理一个命令。

消费者只需调用 service.sendCommand() 即可向队列提交命令。注意命令一次提交一个,没有必要提交一组命令。

这是一个 working StackBlitz 示例。


要解决您在继续之前等待直到收到 okerror 响应的情况,您可以使用 takeWhile(),这意味着它不会完成可观察到的条件满足了。

要引入最长等待时间,您可以使用 takeUntil() with timer() 在计时器发出时结束流:

    this.commandQueue$
      .pipe(
        concatMap(command => this.telloClient.sendCommand(command).pipe(
          takeWhile(status => !['ok', 'error'].includes(status), true),
          takeUntil(timer(3000))
        ))
      )
      .subscribe()

这是更新后的 StackBlitz