在 Node.JS 和 TypeScript 中实现 UDP Command-Acknowledge 通信协议
Implementing UDP Command-Acknowledge communication protocol in Node.JS and TypeScript
我正在尝试通过 Node.JS 和 TypeScript 中的 UDP 数据报实现自定义通信协议。在这个协议中,我有一些命令必须以特定顺序发送到微控制器,每个命令都必须等待微控制器对前一个命令的确认,然后才能发送。然而,考虑到 Node.JS' dgram
模块的异步和 "socket-centered" 理念,我很难找到实现它的正确方法。
截至目前,我创建了 abstract class ProtocolCommand
,以及各种具体的子项(StartFirmwareUpgradeCommand
、WriteCommand
、EndFirmwareUpgradeCommand
)。所有 classes 都被 Protocol
class 消耗,它应该协调所有要执行的命令。我在下面附上摘要和一个示例 classes。此外,命令的数量是可变的(更具体地说,在 StartFirmwareUpgrade 之后,我有可变数量的 Write 命令,我在其中将固件字节发送到微控制器)。
协议命令:
import q = require('q');
export abstract class ProtocolCommand {
protected socket:dgram.Socket;
protected ip:string;
protected port:number;
protected deferred;
constructor(socket:dgram.Socket, ip:string, port:number, deferred) {
this.socket = socket;
this.ip = ip;
this.port = port;
this.deferred = deferred;
}
protected callback(data, sender) {
this.socket.removeListener('message', this.callback);
this.deferred.resolve(data);
}
abstract executeCommand():void;
}
启动固件升级命令:
import dgram = require('dgram');
import {ProtocolCommand} from "./ProtocolCommand";
import CRC = require('./CRC');
import q = require('q');
export class StartFirmwareUpgradeCommand extends ProtocolCommand {
private header = [0x00, 0x0C, 0x00, 0x19, 0x00, 0x00];
private data = [0x31, 0x32, 0x33, 0x34, 0x35, 0x36];
constructor(socket:dgram.Socket, ip:string, port:number, deferred) {
super(socket, ip, port, deferred);
}
executeCommand() {
let commandBytes = this.header.concat(this.data);
let crcBytes = CRC.CRC16(commandBytes);
commandBytes = commandBytes.concat(Math.floor(crcBytes / 0x100), crcBytes % 0x100);
this.socket.on('message', (data, sender) => {
this.callback(data, sender);
});
this.socket.send(new Buffer(commandBytes), 0, commandBytes.length, this.port, this.ip);
return this.deferred.promise;
}
}
协议:
import dgram = require('dgram');
import {StartFirmwareUpgradeCommand} from "./StartFirmwareUpgradeCommand";
import {EndFirmwareUpgradeCommand} from "./EndFirmwareUpgradeCommand";
import {DiscoveryCommand} from "./DiscoveryCommand";
import q = require('q');
export class Protocol {
private socket;
private ip:string;
private port:number;
constructor(ip:string, port:number) {
this.ip = ip;
this.port = port;
this.socket = dgram.createSocket('udp4');
this.socket.bind();
}
upgradeFirmware(data:Uint8Array) {
let globalDeferred = q.defer();
//FIXME UGLY AS HELL!
new StartFirmwareUpgradeCommand(this.socket, this.ip, this.port, globalDeferred).executeCommand()
.then((data) => {
})
.then((data) => {
});
//TODO send n*write firmware command, wait for every ack
for (let i = 0; i < data.length / 128; i++) {
}
//new EndFirmwareUpgradeCommand(this.socket, this.ip, this.port).executeCommand();
//TODO send end firmware command, wait for ack
}
}
如您所见,我目前正在使用 q
来使用 promises 并尽量避免回调,但我真的很难找到一种合适的方法来对所有内容进行编码。任何帮助将不胜感激。
在这里使用 q
不会很好地为您服务,这就是为什么:
q
旨在帮助您的图书馆用户以异步方式使用您的图书馆。它是否更多地是作为一种 "nice" 方式让您的用户知道某事何时完成。然后做他们想做的其他事情(在他们的代码中)。
但是在构建协议时,您不想在任何地方都执行 .then,尤其是当您不知道接下来会发生什么时。
用什么?
我真的推荐 rxjs
在这里,它使用可观察对象并且实现每个命令会更加直观。从所有命令中创建队列并以(异步)循环方式 运行 对所有命令创建队列也将非常容易。
你可以看几分钟here。 (不用担心那里所有 Angular 的事情)
希望对您有所帮助。
根据@gilamran 的推荐,这里是一个RxJS 实现。它只等到收到响应才发送下一个请求。它不处理错误要求您刷新队列的情况。
import dgram = require('dgram');
import Rx = require('rx'); // or use rx.lite if you need something smaller.
let commandQueue = Rx.Subject();
let socket = dgram.createSocket('udp4');
let ip = '127.0.0.1';
let port = '10000';
// let `req` be an object with { ip, port, header, data }.
// sendCommand :: Request -> Observable of Responses
function sendCommand (req) {
// return this Observable so we can use Rx.Observable.concat later to
// block while waiting for a response.
return Rx.Observable.create(obs => {
let commandBytes = req.header.concat(req.data);
const crcBytes = CRC.CRC16(commandBytes);
commandBytes = commandBytes.concat(Math.floor(crcBytes / 0x100), crcBytes % 0x100);
this.socket.on('message', (data, sender) => {
// pass this information on for further processing?
obs.onNext({
data,
sender,
});
obs.onCompleted(); // close this observable so `.concat` switches to next request.
});
socket.send(new Buffer(commandBytes), 0, commandBytes.length, req.port, req.ip)
});
}
// ok. let's setup the downstream side of our queue.
// 1. take a request, send a packet, return an observable of one response.
// 2. wait for the observable of one response to complete.
let responses = commandQueue
.concatMap(sendCommand); // takes command requests and turns them into data/sender responses.
// we must subscribe to pull values through our observable chain.
responses.subscribe();
let cmdStartFirmwareUpgrade = (ip, port) => {
return {
ip,
port,
header = [0x00, 0x0C, 0x00, 0x19, 0x00, 0x00],
data = [0x31, 0x32, 0x33, 0x34, 0x35, 0x36],
};
};
let cmdDiscovery = (ip, port) => { /* ... */ };
let cmdEndFirmwareUpgrade = (ip, port) => { /* ... */ };
// now let's put some commands in the queue.
commandQueue.onNext(cmdStartFirmwareUpgrade(ip, port));
commandQueue.onNext(cmdDiscovery(ip, port));
commandQueue.onNext(cmdEndFirmwareUpgrade(ip, port));
这个例子当然可以是 DRY'er,但我更喜欢使用 Ramda curried 函数而不是 类,所以我忽略了这一点。
您可以在 https://runkit.com/boxofrox/rxjs-queue 找到此设计的顺序性质的演示。
我正在尝试通过 Node.JS 和 TypeScript 中的 UDP 数据报实现自定义通信协议。在这个协议中,我有一些命令必须以特定顺序发送到微控制器,每个命令都必须等待微控制器对前一个命令的确认,然后才能发送。然而,考虑到 Node.JS' dgram
模块的异步和 "socket-centered" 理念,我很难找到实现它的正确方法。
截至目前,我创建了 abstract class ProtocolCommand
,以及各种具体的子项(StartFirmwareUpgradeCommand
、WriteCommand
、EndFirmwareUpgradeCommand
)。所有 classes 都被 Protocol
class 消耗,它应该协调所有要执行的命令。我在下面附上摘要和一个示例 classes。此外,命令的数量是可变的(更具体地说,在 StartFirmwareUpgrade 之后,我有可变数量的 Write 命令,我在其中将固件字节发送到微控制器)。
协议命令:
import q = require('q');
export abstract class ProtocolCommand {
protected socket:dgram.Socket;
protected ip:string;
protected port:number;
protected deferred;
constructor(socket:dgram.Socket, ip:string, port:number, deferred) {
this.socket = socket;
this.ip = ip;
this.port = port;
this.deferred = deferred;
}
protected callback(data, sender) {
this.socket.removeListener('message', this.callback);
this.deferred.resolve(data);
}
abstract executeCommand():void;
}
启动固件升级命令:
import dgram = require('dgram');
import {ProtocolCommand} from "./ProtocolCommand";
import CRC = require('./CRC');
import q = require('q');
export class StartFirmwareUpgradeCommand extends ProtocolCommand {
private header = [0x00, 0x0C, 0x00, 0x19, 0x00, 0x00];
private data = [0x31, 0x32, 0x33, 0x34, 0x35, 0x36];
constructor(socket:dgram.Socket, ip:string, port:number, deferred) {
super(socket, ip, port, deferred);
}
executeCommand() {
let commandBytes = this.header.concat(this.data);
let crcBytes = CRC.CRC16(commandBytes);
commandBytes = commandBytes.concat(Math.floor(crcBytes / 0x100), crcBytes % 0x100);
this.socket.on('message', (data, sender) => {
this.callback(data, sender);
});
this.socket.send(new Buffer(commandBytes), 0, commandBytes.length, this.port, this.ip);
return this.deferred.promise;
}
}
协议:
import dgram = require('dgram');
import {StartFirmwareUpgradeCommand} from "./StartFirmwareUpgradeCommand";
import {EndFirmwareUpgradeCommand} from "./EndFirmwareUpgradeCommand";
import {DiscoveryCommand} from "./DiscoveryCommand";
import q = require('q');
export class Protocol {
private socket;
private ip:string;
private port:number;
constructor(ip:string, port:number) {
this.ip = ip;
this.port = port;
this.socket = dgram.createSocket('udp4');
this.socket.bind();
}
upgradeFirmware(data:Uint8Array) {
let globalDeferred = q.defer();
//FIXME UGLY AS HELL!
new StartFirmwareUpgradeCommand(this.socket, this.ip, this.port, globalDeferred).executeCommand()
.then((data) => {
})
.then((data) => {
});
//TODO send n*write firmware command, wait for every ack
for (let i = 0; i < data.length / 128; i++) {
}
//new EndFirmwareUpgradeCommand(this.socket, this.ip, this.port).executeCommand();
//TODO send end firmware command, wait for ack
}
}
如您所见,我目前正在使用 q
来使用 promises 并尽量避免回调,但我真的很难找到一种合适的方法来对所有内容进行编码。任何帮助将不胜感激。
在这里使用 q
不会很好地为您服务,这就是为什么:
q
旨在帮助您的图书馆用户以异步方式使用您的图书馆。它是否更多地是作为一种 "nice" 方式让您的用户知道某事何时完成。然后做他们想做的其他事情(在他们的代码中)。
但是在构建协议时,您不想在任何地方都执行 .then,尤其是当您不知道接下来会发生什么时。
用什么?
我真的推荐 rxjs
在这里,它使用可观察对象并且实现每个命令会更加直观。从所有命令中创建队列并以(异步)循环方式 运行 对所有命令创建队列也将非常容易。
你可以看几分钟here。 (不用担心那里所有 Angular 的事情)
希望对您有所帮助。
根据@gilamran 的推荐,这里是一个RxJS 实现。它只等到收到响应才发送下一个请求。它不处理错误要求您刷新队列的情况。
import dgram = require('dgram');
import Rx = require('rx'); // or use rx.lite if you need something smaller.
let commandQueue = Rx.Subject();
let socket = dgram.createSocket('udp4');
let ip = '127.0.0.1';
let port = '10000';
// let `req` be an object with { ip, port, header, data }.
// sendCommand :: Request -> Observable of Responses
function sendCommand (req) {
// return this Observable so we can use Rx.Observable.concat later to
// block while waiting for a response.
return Rx.Observable.create(obs => {
let commandBytes = req.header.concat(req.data);
const crcBytes = CRC.CRC16(commandBytes);
commandBytes = commandBytes.concat(Math.floor(crcBytes / 0x100), crcBytes % 0x100);
this.socket.on('message', (data, sender) => {
// pass this information on for further processing?
obs.onNext({
data,
sender,
});
obs.onCompleted(); // close this observable so `.concat` switches to next request.
});
socket.send(new Buffer(commandBytes), 0, commandBytes.length, req.port, req.ip)
});
}
// ok. let's setup the downstream side of our queue.
// 1. take a request, send a packet, return an observable of one response.
// 2. wait for the observable of one response to complete.
let responses = commandQueue
.concatMap(sendCommand); // takes command requests and turns them into data/sender responses.
// we must subscribe to pull values through our observable chain.
responses.subscribe();
let cmdStartFirmwareUpgrade = (ip, port) => {
return {
ip,
port,
header = [0x00, 0x0C, 0x00, 0x19, 0x00, 0x00],
data = [0x31, 0x32, 0x33, 0x34, 0x35, 0x36],
};
};
let cmdDiscovery = (ip, port) => { /* ... */ };
let cmdEndFirmwareUpgrade = (ip, port) => { /* ... */ };
// now let's put some commands in the queue.
commandQueue.onNext(cmdStartFirmwareUpgrade(ip, port));
commandQueue.onNext(cmdDiscovery(ip, port));
commandQueue.onNext(cmdEndFirmwareUpgrade(ip, port));
这个例子当然可以是 DRY'er,但我更喜欢使用 Ramda curried 函数而不是 类,所以我忽略了这一点。
您可以在 https://runkit.com/boxofrox/rxjs-queue 找到此设计的顺序性质的演示。