Protobuf zeromq/zwssock 到 JSMQ/ProtoBuf.js 消息未完全接收

Protobuf over zeromq/zwssock to JSMQ/ProtoBuf.js messages are not fully recieved

我正在通过 zwssock (czmq)(它是 zcmq 的 websocket 扩展)将 Protobuf 编码数据发送到 JSMQ.js,然后对 protobuf 数据进行解码。

  1. OS: Windows
  2. 浏览器:Chrome40.0.2
  3. ØMQ: 4.0.4
  4. czmq: 2.2.0.

存在以下问题:

收到的数据不包含已发送的所有字节。 事实上,经过检查后发现所有字节都收到了第一个 0 字节。

真实例子:

数据发送:

10,4,78,77,69,65,-110,3,19,8,101,-86,6,14,8,1,16,-40,-126,-27,-14,-12,-87,-61,2, -16,1,1,-110,3,93,8,100,80,0,-94,6,86,34,73,36,71,80,82,77,67,44,49,48,51,50,51, 49,46,54,48,49,44,86,44,53,50,48,53,46,54,52,50,54,55,48,44,78,44,48,48,54,49,57 ,46,54,49,51,57,52,55,44,69,44,52,51,46,50,44,51,54,46,52,44,50,51,48,49,49,53,4 4,44,44,83,42,54,68,32,74,9,69,80,83,71,58,52,51,50,54,-30,3,97,10,4,78,77,69,65 ,18,73,36,71,80,82,77,67,44,49,48,51,50,51,49,46,54,48,49,44,86,44,53,50,48,53,4 6,54,52,50,54,55,48,44,78,44,48,48,54,49,57,46,54,49,51,57,52,55,44,69,44,52,51, 46,50,44,51,54,46,52,44,50,51,48,49,49,53,44,44,44,83,42,54,68,32,82,14,10,5,8,2 ,-80,1,2,-94,6,4,8,8,16,6

收到的数据:

0, 10, 4, 78, 77, 69, 65, 146, 3, 19, 8, 101, 170, 6, 14, 8, 1, 16, 137, 255, 156, 213, 244, 169, 195, 2, 240, 1, 1, 146, 3, 93, 8, 100, 80]

如您所见,80 之后的字节丢失了,序列现在以 0 字节开始。我已经用手动创建的数据 -- char* -- 进行了测试,每次都会出现同样的问题。

以下两个函数直接取自JSQM.js,由websocket在接收到数据时调用。

 function onmessage(ev) {
        if (ev.data instanceof Blob) {
            var arrayBuffer;
            var fileReader = new FileReader();
            fileReader.onload = function () {
                processFrame(this.result);
            };
            fileReader.readAsArrayBuffer(ev.data);
        } else if (ev.data instanceof ArrayBuffer) {
            processFrame(ev.data);
        }
        // Other message type are not supported and will just be dropped
    };

    function processFrame(frame) {
        var view = new Uint8Array(frame);
        var more = view[0];

        if (incomingMessage == null) {
            incomingMessage = new JSMQ.Message();
        }

        incomingMessage.addBuffer(frame);

        // last message
        if (more == 0) {
            if (that.onMessage != null) {
                that.onMessage(that, incomingMessage);
            }

            incomingMessage = null;
        }
    }

在onmessage/processFrame中接收到的数据已经不包含完整的字节序列。如您所见,接收到的字节序列以 0 开头,与 [more == 0] 守卫匹配。

我无法让 wireshark 嗅探发送的包,检查 如果字节未正确发送。

一个解决方案是使用 bytestuffing 从而删除所有 0 字节。 但我一定是哪里弄错了吗?

根据要求:

在内部我们使用zeromq的c++库,但是因为websockets 目前在 c 版本上有扩展,我们需要将其转换为 c 样式消息。如前所述,数据已被填充。

void CZeroMQConnection::Send(zmq::message_t& message)
{

    zmsg_t* msg = zmsg_new();

    std::vector<unsigned char> rpl;
    std::copy_n(reinterpret_cast<char*>(message.data()), message.size(),std::back_inserter(rpl));

    // Temporary stuffing on Websockets
    char stuffChar = 1;
    char invalidChar = 0;

    std::vector<unsigned char> stuffed;
    for (auto it = rpl.begin(); it != rpl.end(); ++it)
    {
        if (*it == invalidChar || *it == stuffChar)
        {
            stuffed.push_back(stuffChar);
            stuffed.push_back((*it) + 1);
        }
            else
                stuffed.push_back(*it);
        }

    // As mentioned added extra 0 byte, preventing superfluos data
    stuffed.push_back(0);
    zmsg_push(msg, _id);
    zmsg_addstr(msg, reinterpret_cast<char*> (&stuffed[0]));
    zwssock_send(_socket, &msg);
}

目前还没有双向数据流,这将在不久的将来出现。

我尝试重现 chrome (40.0.2) 的问题但没有成功,我收到了包括零在内的完整消息。

c代码:

static char *listen_on = "tcp://127.0.0.1:86";

int main(int argc, char **argv)
{   
    zctx_t *ctx;
    zwssock_t *sock;

    char *l =  argc > 1 ? argv[1] : listen_on;

    ctx = zctx_new();
    sock = zwssock_new_router(ctx);

    zwssock_bind(sock, l);

    zmsg_t* msg;
    zframe_t *id;

    while (!zctx_interrupted)
    {       
        msg = zwssock_recv(sock);

        if (!msg)
            break;

        // first message is the routing id
        id = zmsg_pop(msg);

        while (zmsg_size(msg) != 0)
        {
            char * str = zmsg_popstr(msg);

            printf("%s\n", str);

            free(str);
        }

        zmsg_destroy(&msg);

        msg = zmsg_new();

        zmsg_push(msg, id);

        char buf[226] = { 10, 4, 78, 77, 69, 65, -110, 3, 19, 8, 101, -86, 6, 14, 8, 1, 16, -40, -126, -27, -14, -12,
            -87, -61, 2, -16, 1, 1, -110, 3, 93, 8, 100, 80, 0, -94, 6, 86, 34, 73, 36, 71, 80, 82,
            77, 67, 44, 49, 48, 51, 50, 51, 49, 46, 54, 48, 49, 44, 86, 44, 53, 50, 48, 53, 46, 54, 52, 50,
            54, 55, 48, 44, 78, 44, 48, 48, 54, 49, 57, 46, 54, 49, 51, 57, 52, 55, 44, 69, 44, 52, 51, 46,
            50, 44, 51, 54, 46, 52, 44, 50, 51, 48, 49, 49, 53, 4, 4, 44, 44, 83, 42, 54, 68, 32, 74, 9, 69, 80,
            83, 71, 58, 52, 51, 50, 54, -30, 3, 97, 10, 4, 78, 77, 69, 65, 18, 73, 36, 71, 80, 82, 77, 67, 44, 49,
            48, 51, 50, 51, 49, 46, 54, 48, 49, 44, 86, 44, 53, 50, 48, 53, 4, 6, 54, 52, 50, 54, 55, 48, 44, 78,
            44, 48, 48, 54, 49, 57, 46, 54, 49, 51, 57, 52, 55, 44, 69, 44, 52, 51, 46, 50, 44, 51, 54, 46, 52,
            44, 50, 51, 48, 49, 49, 53, 44, 44, 44, 83, 42, 54, 68, 32, 82, 14, 10, 5, 8, 2, -80, 1, 2, -94, 6, 4, 8, 8, 16, 6 };

        zmsg_addmem(msg, buf, 226);

        zwssock_send(sock, &msg);
    }

    zwssock_destroy(&sock);
    zctx_destroy(&ctx); 
}

和以下 javascript:

var dealer = new JSMQ.Dealer();
dealer.connect("ws://localhost:86");

// we must wait for the dealer to be connected before we can send messages, any messages we are trying to send
// while the dealer is not connected will be dropped
dealer.sendReady = function() {
    var message = new JSMQ.Message();
    message.addString("Hello");

    dealer.send(message);
};

dealer.onMessage = function (message) {
    // the response from the server
    var buffer = message.popBuffer();
    console.log(buffer.length);
    console.log(buffer);
};

function send() {
    var message = new JSMQ.Message();
    message.addString(document.getElementById("messageTextBox").value);

    dealer.send(message);
} 

谢谢somdoron,

您的 post 使用:

zmsg_addmem(msg, buf, 226);

我在使用时:

zmsg_addstr(msg, reinterpret_cast<char*> (&stuffed[0]));

这可能会将输入解释为 C 字符串。

这解决了问题,非常感谢!