在 Node.JS 和 TypeScript 中实现 UDP Command-Acknowledge 通信协议

Implementing UDP Command-Acknowledge communication protocol in Node.JS and TypeScript

我正在尝试通过 Node.JS 和 TypeScript 中的 UDP 数据报实现自定义通信协议。在这个协议中,我有一些命令必须以特定顺序发送到微控制器,每个命令都必须等待微控制器对前一个命令的确认,然后才能发送。然而,考虑到 Node.JS' dgram 模块的异步和 "socket-centered" 理念,我很难找到实现它的正确方法。

截至目前,我创建了 abstract class ProtocolCommand,以及各种具体的子项(StartFirmwareUpgradeCommandWriteCommandEndFirmwareUpgradeCommand)。所有 classes 都被 Protocol class 消耗,它应该协调所有要执行的命令。我在下面附上摘要和一个示例 classes。此外,命令的数量是可变的(更具体地说,在 StartFirmwareUpgrade 之后,我有可变数量的 Write 命令,我在其中将固件字节发送到微控制器)。

协议命令:

import q = require('q');

export abstract class ProtocolCommand {
    protected socket:dgram.Socket;
    protected ip:string;
    protected port:number;
    protected deferred;

    constructor(socket:dgram.Socket, ip:string, port:number, deferred) {
        this.socket = socket;
        this.ip = ip;
        this.port = port;
        this.deferred = deferred;
    }

    protected callback(data, sender) {
        this.socket.removeListener('message', this.callback);
        this.deferred.resolve(data);
    }

    abstract executeCommand():void;
}

启动固件升级命令:

import dgram = require('dgram');
import {ProtocolCommand} from "./ProtocolCommand";
import CRC = require('./CRC');
import q = require('q');

export class StartFirmwareUpgradeCommand extends ProtocolCommand {
    private header = [0x00, 0x0C, 0x00, 0x19, 0x00, 0x00];
    private data = [0x31, 0x32, 0x33, 0x34, 0x35, 0x36];

    constructor(socket:dgram.Socket, ip:string, port:number, deferred) {
        super(socket, ip, port, deferred);
    }

    executeCommand() {
        let commandBytes = this.header.concat(this.data);
        let crcBytes = CRC.CRC16(commandBytes);
        commandBytes = commandBytes.concat(Math.floor(crcBytes / 0x100), crcBytes % 0x100);
        this.socket.on('message', (data, sender) => {
            this.callback(data, sender);
        });
        this.socket.send(new Buffer(commandBytes), 0, commandBytes.length, this.port, this.ip);
        return this.deferred.promise;
    }
}

协议:

import dgram = require('dgram');
import {StartFirmwareUpgradeCommand} from "./StartFirmwareUpgradeCommand";
import {EndFirmwareUpgradeCommand} from "./EndFirmwareUpgradeCommand";
import {DiscoveryCommand} from "./DiscoveryCommand";
import q = require('q');

export class Protocol {
    private socket;
    private ip:string;
    private port:number;

    constructor(ip:string, port:number) {
        this.ip = ip;
        this.port = port;
        this.socket = dgram.createSocket('udp4');
        this.socket.bind();
    }

    upgradeFirmware(data:Uint8Array) {
        let globalDeferred = q.defer();

        //FIXME UGLY AS HELL!
        new StartFirmwareUpgradeCommand(this.socket, this.ip, this.port, globalDeferred).executeCommand()
            .then((data) => {

            })
            .then((data) => {

            });
        //TODO send n*write firmware command, wait for every ack
        for (let i = 0; i < data.length / 128; i++) {

        }
        //new EndFirmwareUpgradeCommand(this.socket, this.ip, this.port).executeCommand();
        //TODO send end firmware command, wait for ack
    }
}

如您所见,我目前正在使用 q 来使用 promises 并尽量避免回调,但我真的很难找到一种合适的方法来对所有内容进行编码。任何帮助将不胜感激。

在这里使用 q 不会很好地为您服务,这就是为什么:

q 旨在帮助您的图书馆用户以异步方式使用您的图书馆。它是否更多地是作为一种 "nice" 方式让您的用户知道某事何时完成。然后做他们想做的其他事情(在他们的代码中)。

但是在构建协议时,您不想在任何地方都执行 .then,尤其是当您不知道接下来会发生什么时。

用什么? 我真的推荐 rxjs 在这里,它使用可观察对象并且实现每个命令会更加直观。从所有命令中创建队列并以(异步)循环方式 运行 对所有命令创建队列也将非常容易。

你可以看几分钟here。 (不用担心那里所有 Angular 的事情)

希望对您有所帮助。

根据@gilamran 的推荐,这里是一个RxJS 实现。它只等到收到响应才发送下一个请求。它不处理错误要求您刷新队列的情况。

import dgram = require('dgram');
import Rx    = require('rx');   // or use rx.lite if you need something smaller.

let commandQueue = Rx.Subject();
let socket       = dgram.createSocket('udp4');
let ip           = '127.0.0.1';
let port         = '10000';

// let `req` be an object with { ip, port, header, data }.
// sendCommand :: Request -> Observable of Responses
function sendCommand (req) {
    // return this Observable so we can use Rx.Observable.concat later to
    // block while waiting for a response.
    return Rx.Observable.create(obs => {
        let   commandBytes = req.header.concat(req.data);
        const crcBytes     = CRC.CRC16(commandBytes);
        commandBytes       = commandBytes.concat(Math.floor(crcBytes / 0x100), crcBytes % 0x100);
        this.socket.on('message', (data, sender) => {
            // pass this information on for further processing?
            obs.onNext({
                data,
                sender,
            });
            obs.onCompleted();  // close this observable so `.concat` switches to next request.
        });
        socket.send(new Buffer(commandBytes), 0, commandBytes.length, req.port, req.ip)
    });
}

// ok. let's setup the downstream side of our queue.
// 1. take a request, send a packet, return an observable of one response.
// 2. wait for the observable of one response to complete.

let responses = commandQueue
    .concatMap(sendCommand);    // takes command requests and turns them into data/sender responses.

// we must subscribe to pull values through our observable chain.
responses.subscribe();

let cmdStartFirmwareUpgrade = (ip, port) => {
    return {
        ip,
        port,
        header = [0x00, 0x0C, 0x00, 0x19, 0x00, 0x00],
        data   = [0x31, 0x32, 0x33, 0x34, 0x35, 0x36],
    };
};

let cmdDiscovery = (ip, port) => { /* ... */ };
let cmdEndFirmwareUpgrade = (ip, port) => { /* ... */ };

// now let's put some commands in the queue.
commandQueue.onNext(cmdStartFirmwareUpgrade(ip, port));
commandQueue.onNext(cmdDiscovery(ip, port));
commandQueue.onNext(cmdEndFirmwareUpgrade(ip, port));

这个例子当然可以是 DRY'er,但我更喜欢使用 Ramda curried 函数而不是 类,所以我忽略了这一点。

您可以在 https://runkit.com/boxofrox/rxjs-queue 找到此设计的顺序性质的演示。