RxJS 在通过 UDP 发送下一个命令之前等待响应
RxJS waiting for response before sending next command over UDP
我目前正在进行一个项目,我将 UDP 命令发送到 Tello 无人机。
问题是它使用 UDP,当我在前一个命令尚未完成之前发送命令太快时,第二个 command/action 不会发生。我在这个项目中使用 RxJS,我想创建一种机制来等待无人机的响应(“ok”或“error”)。
我的想法是有 2 个不同的 observables。 1 个 observable 是来自无人机响应的输入流和一个我用作 commandQueue
的 observable 的 observable。这个 commandQueue
上面有简单的可观察对象,其中包含我要发送的 1 个命令。当我收到来自其他可观察对象的“确定”消息时,我只想发送下一个命令。当我得到“确定”时,我将完成可观察到的简单命令,它会自动接收 commandQueue
上的下一个值,即下一个命令。
我的代码仅在我发送一组命令时有效,但我想多次调用该函数,所以一个一个地发送它们。
以下代码是有问题的函数,testsubject 是一个可观察对象,用于向无人机发送下一个命令。
async send_command_with_return(msg) {
let parentobject = this;
let zeroTime = timestamp();
const now = () => numeral((timestamp() - zeroTime) / 10e3).format("0.0000");
const asyncTask = data =>
new Observable(obs => {
console.log(`${now()}: starting async task ${data}`);
parentobject.Client.pipe(take(1)).subscribe(
dataa => {
console.log("loool")
obs.next(data);
this.testSubject.next(data);
console.log(`${now()}: end of async task ${data}`);
obs.complete();
},
err => console.error("Observer got an error: " + err),
() => console.log("observer asynctask finished with " + data + "\n")
);
});
let p = this.commandQueue.pipe(concatMap(asyncTask)).toPromise(P); //commandQueue is a subject in the constructor
console.log("start filling queue with " + msg);
zeroTime = timestamp();
this.commandQueue.next(msg);
//["streamon", "streamoff", "height?", "temp?"].forEach(a => this.commandQueue.next(a));
await p;
// this.testSubject.next(msg);
}
streamon() {
this.send_command_with_return("streamon");
}
streamoff() {
this.send_command_with_return("streamoff");
}
get_speed() {
this.send_command_with_return("speed?");
}
get_battery() {
this.send_command_with_return("battery?");
}
}
let tello = new Tello();
tello.init();
tello.streamon();
tello.streamoff();
您可以一次发送一个命令,方法是使用一个简单的主题来推送命令,并通过 concatMap
一次执行一个命令。
与其尝试将所有逻辑放在一个函数中,不如制作一个简单的 class 可能更容易,也许将其称为 TelloService
或其他名称:
class TelloService {
private commandQueue$ = new Subject<Command>();
constructor(private telloClient: FakeTelloClient) {
this.commandQueue$
.pipe(
concatMap(command => this.telloClient.sendCommand(command))
)
.subscribe()
}
sendCommand(command: Command) {
this.commandQueue$.next(command);
}
}
实例化服务时,它会订阅 commandQueue$
并且对于收到的每个命令,它将“完成”进行异步调用的工作。 concatMap
用于一次处理一个命令。
消费者只需调用 service.sendCommand()
即可向队列提交命令。注意命令一次提交一个,没有必要提交一组命令。
这是一个 working StackBlitz 示例。
要解决您在继续之前等待直到收到 ok
或 error
响应的情况,您可以使用 takeWhile()
,这意味着它不会完成可观察到的条件满足了。
要引入最长等待时间,您可以使用 takeUntil()
with timer()
在计时器发出时结束流:
this.commandQueue$
.pipe(
concatMap(command => this.telloClient.sendCommand(command).pipe(
takeWhile(status => !['ok', 'error'].includes(status), true),
takeUntil(timer(3000))
))
)
.subscribe()
这是更新后的 StackBlitz。
我目前正在进行一个项目,我将 UDP 命令发送到 Tello 无人机。 问题是它使用 UDP,当我在前一个命令尚未完成之前发送命令太快时,第二个 command/action 不会发生。我在这个项目中使用 RxJS,我想创建一种机制来等待无人机的响应(“ok”或“error”)。
我的想法是有 2 个不同的 observables。 1 个 observable 是来自无人机响应的输入流和一个我用作 commandQueue
的 observable 的 observable。这个 commandQueue
上面有简单的可观察对象,其中包含我要发送的 1 个命令。当我收到来自其他可观察对象的“确定”消息时,我只想发送下一个命令。当我得到“确定”时,我将完成可观察到的简单命令,它会自动接收 commandQueue
上的下一个值,即下一个命令。
我的代码仅在我发送一组命令时有效,但我想多次调用该函数,所以一个一个地发送它们。
以下代码是有问题的函数,testsubject 是一个可观察对象,用于向无人机发送下一个命令。
async send_command_with_return(msg) {
let parentobject = this;
let zeroTime = timestamp();
const now = () => numeral((timestamp() - zeroTime) / 10e3).format("0.0000");
const asyncTask = data =>
new Observable(obs => {
console.log(`${now()}: starting async task ${data}`);
parentobject.Client.pipe(take(1)).subscribe(
dataa => {
console.log("loool")
obs.next(data);
this.testSubject.next(data);
console.log(`${now()}: end of async task ${data}`);
obs.complete();
},
err => console.error("Observer got an error: " + err),
() => console.log("observer asynctask finished with " + data + "\n")
);
});
let p = this.commandQueue.pipe(concatMap(asyncTask)).toPromise(P); //commandQueue is a subject in the constructor
console.log("start filling queue with " + msg);
zeroTime = timestamp();
this.commandQueue.next(msg);
//["streamon", "streamoff", "height?", "temp?"].forEach(a => this.commandQueue.next(a));
await p;
// this.testSubject.next(msg);
}
streamon() {
this.send_command_with_return("streamon");
}
streamoff() {
this.send_command_with_return("streamoff");
}
get_speed() {
this.send_command_with_return("speed?");
}
get_battery() {
this.send_command_with_return("battery?");
}
}
let tello = new Tello();
tello.init();
tello.streamon();
tello.streamoff();
您可以一次发送一个命令,方法是使用一个简单的主题来推送命令,并通过 concatMap
一次执行一个命令。
与其尝试将所有逻辑放在一个函数中,不如制作一个简单的 class 可能更容易,也许将其称为 TelloService
或其他名称:
class TelloService {
private commandQueue$ = new Subject<Command>();
constructor(private telloClient: FakeTelloClient) {
this.commandQueue$
.pipe(
concatMap(command => this.telloClient.sendCommand(command))
)
.subscribe()
}
sendCommand(command: Command) {
this.commandQueue$.next(command);
}
}
实例化服务时,它会订阅 commandQueue$
并且对于收到的每个命令,它将“完成”进行异步调用的工作。 concatMap
用于一次处理一个命令。
消费者只需调用 service.sendCommand()
即可向队列提交命令。注意命令一次提交一个,没有必要提交一组命令。
这是一个 working StackBlitz 示例。
要解决您在继续之前等待直到收到 ok
或 error
响应的情况,您可以使用 takeWhile()
,这意味着它不会完成可观察到的条件满足了。
要引入最长等待时间,您可以使用 takeUntil()
with timer()
在计时器发出时结束流:
this.commandQueue$
.pipe(
concatMap(command => this.telloClient.sendCommand(command).pipe(
takeWhile(status => !['ok', 'error'].includes(status), true),
takeUntil(timer(3000))
))
)
.subscribe()
这是更新后的 StackBlitz。