ZeroMQ:使用 ZMQ_STREAM 获取客户端消息

ZeroMQ: Getting client message with ZMQ_STREAM

我想使用 ZeroMQ 的 ZMQ_STREAM 套接字访问从 TCP 对等点接收到的消息。在以下 C 语言示例中,msg 字符串似乎为空:

/* http.c */
#include <stdio.h>
#include <string.h>
#include <zmq.h>

int main(int argc, char* argv[])
    void *ctx    = zmq_ctx_new();
    void *socket = zmq_socket(ctx, ZMQ_STREAM);
    int rc       = zmq_bind(socket, "tcp://");

    uint8_t id[256];
    size_t  id_size = 256;

    char    msg[256];
    size_t  msg_size = 256;

    char http_response[] =
        "HTTP/1.0 200 OK\r\n"
        "Content-Type: text/html\r\n"
        "<h1>Hello, World!</h1>";

    while (1)
        id_size = zmq_recv(socket, id, 256, 0);

        msg_size = zmq_recv(socket, msg, sizeof(msg), 0);
        msg[msg_size] = '[=11=]';
        printf("REQUEST: %s\n", msg);

        zmq_send(socket, id, id_size, ZMQ_SNDMORE);
        zmq_send(socket, http_response, strlen(http_response), ZMQ_SNDMORE);

        zmq_send(socket, id, id_size, ZMQ_SNDMORE);
        zmq_send(socket, 0, 0, ZMQ_SNDMORE);


    return 0;


$ cc -Wall -I/usr/local/include/ -L/usr/local/lib/ -o http http.c -lzmq
$ ./http

我不能使用 CZMQ API,因为我必须依靠 FFI 才能 libzmq

模型代码有一个警告 - 它违背了已发布的 API 因为它从不向任何对等方发送单个消息(因为曾经缺少非 SNDMORE 标记的帧)+更好地 .bind() 到现有的、可访问的 TCP 地址(不是 上的模拟抽象环回接口),否则将没有外部对等方能够初始化通信并将任何请求传递到仅下一个进程raw[] 或无限 while 循环中的任何其他内容:

#define FOREVER 1

int main( int argc, char* argv[] )
    void *ctx    = zmq_ctx_new ();                  assert ( ctx         && "FAILED to instantiate a ZeroMQ Context" );
    void *socket = zmq_socket ( ctx, ZMQ_STREAM );  assert ( socket      && "FAILED to instantiate a STREAM Socket" ) ;
    int   rc = zmq_bind (socket, "tcp://*:8080");   assert ( rc == 0     && "FAILED to .bind on <tcp>://address:port>" );

    size_t  id_size = 256;
    uint8_t id [id_size];

    size_t  raw_size = 256;
    uint8_t raw [raw_size];

    while ( FOREVER ) {                          /* Get HTTP request; ID frame and then PAYLOAD frame ........................*/
        id_size = zmq_recv ( socket, id, 256, 0 );  assert ( id_size > 0 && "FAILED to .recv() an ID frame" );
        do {  
                  raw_size = zmq_recv (socket, raw, 256, 0 ); assert ( raw_size >= 0 && "FAILED to .recv() a PAYLOAD frame" );
        } while ( raw_size == 256 );             /* consumes the whole PAYLOAD frame till the last Byte ......................*/

        char http_response [] =  "HTTP/1.0 200 OK\r\n"
                                 "Content-Type: text/plain\r\n"
                                 "Hello, World!";
        zmq_send ( socket, id, //......................... ID to .send() a PAYLOAD towards
                           ZMQ_SNDMORE ); //.............. flag == _SNDMORE
        zmq_send ( socket, http_response, //..............         PAYLOAD content loaded
                           strlen ( http_response),
                           0 ); //........................ flag == 0 i.e. LAST FRAME ... == CAN .send() THE MESSAGE AS A WHOLE
        zmq_send ( socket, id, //......................... ID to .send() a PAYLOAD towards
                           ZMQ_SNDMORE ); //.............. flag == _SNDMORE
        zmq_send ( socket, 0, //.......................... 0
                           0, //.......................... 0    == a ZeroSized _STREAM PAYLOAD
                           0 ); //........................ flag == 0 i.e. LAST FRAME ... ZeroSized _STREAM PAYLOAD == .close()
    zmq_close ( socket );
    zmq_ctx_destroy ( ctx );

    return( 0 );

问题只是对 ZeroMQ 消息格式的错误处理。每次客户端连接时,首先发送一条空消息 [id, ](无负载)。以下消息 [id, payload] 在其负载中包含 HTTP GET 请求字符串。

    // Get [id, ] message on client connection.
    id_size = zmq_recv(socket, id, 256, 0);
    printf("ID SIZE.: %zu\n", id_size);

    if (id_size == 0)

    // Empty payload.
    buffer_size = zmq_recv(socket, buffer, 256, 0);

    // Get [id, playload] message.
    id_size = zmq_recv(socket, id, 256, 0);

    if (id_size == 0)

    // Get payload.
    // [...]