使用 ZeroMQ 在 C 中接收多部分消息

Receive multipart messages in C with ZeroMQ

我正在为将使用 ZeroMQ 套接字传输数据并发布多部分消息的设备编写数据记录系统。

我的程序需要用 C 语言编写。

我确实设法编写了一个 Python 脚本,它可以正常工作。 这看起来像这样:

import zmq
import time
import numpy as np

_HEARTBEAT_PREFIX =     b'ray.heartbeat[=10=]'  
_RAW_DATA_PREFIX =      b'ray.rawdata[=10=]'

context = zmq.Context.instance()

time.sleep(5)

socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.LINGER, 0)
socket.setsockopt(zmq.SUBSCRIBE, b'')
time.sleep(5)
socket.connect("tcp://localhost:50290")
time.sleep(5)
go = True
while go:

    if socket.poll(1000):

        zframes = socket.recv_multipart(flags=zmq.NOBLOCK, copy=False)

        if (zframes[0].buffer.tobytes() == _HEARTBEAT_PREFIX):
            print ('Heartbeart')
        elif (zframes[0].buffer.tobytes() == _RAW_DATA_PREFIX):
            dataToWrite = np.frombuffer(zframes[1].buffer, dtype='i2')
            print ('Raw data: ')
            # and so on ...

这是我第一次使用 ZMQ,尝试将此 Python 代码移植到 C 是一个真正的挑战。我现在写的C代码会运行,但是不能正常运行:

#include "zhelpers.h"

int main (int argc, char *argv [])
{

  void *context   = zmq_ctx_new();
  void *subscriber = zmq_socket(context,ZMQ_SUB);

  int rc = zmq_connect(subscriber,"tcp://localhost:50290");
  assert(rc == 0);
  rc = zmq_setsockopt(subscriber,ZMQ_SUBSCRIBE, "", 0);
  assert(rc == 0); 

  while (1) 
  {
    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, subscriber,0);//, ZMQ_NOBLOCK);
    //  Process the message frame


    int size = zmq_msg_size(&message);
    char *string = malloc(size + 1);
    memcpy(string, zmq_msg_data(&message), size);


    zmq_msg_close (&message);

    string[size] = 0;
    printf("Message[%d]: %s\n", size, string);

    if (!zmq_msg_more (&message))
        break;      //  Last message frame
  }
  return 0;
}

然而,此代码将提供传入数据的正确大小,但在显示数据时,它显示为不可读字符(符号等..)我将需要,就像我在 Python 中所做的那样,以字符串形式获取传入数据以便进一步处理它。

我发现 Python 中的 numpy 数组被转换为包含 int 的字符串数组,并且它们的值很难为 C 程序解释。

通过处理接收到的数据,我能够提取正确的值:

#include "zhelpers.h"

int main (int argc, char *argv [])
{

  int i = 0;

  void *context   = zmq_ctx_new();
  void *subscriber = zmq_socket(context,ZMQ_SUB);

  int rc = zmq_connect(subscriber,"tcp://localhost:50290");
  assert(rc == 0);

  char* filter = "ray.";

  rc = zmq_setsockopt(subscriber,ZMQ_SUBSCRIBE, filter, sizeof(filter));
  assert(rc == 0); 
  int nextFrame = 0;

  while (1) 
  {

    zmq_msg_t message;
    zmq_msg_init (&message);
    zmq_msg_recv (&message, subscriber,0);//, ZMQ_NOBLOCK);

    //  Process the message frame

    int size = zmq_msg_size(&message);
    char *string = malloc(size + 1);
    memcpy(string, zmq_msg_data(&message), size);

    if (nextFrame)
    {

      zmq_msg_close (&message);
      printf("Raw data: "); 
      string[size] = 0;
      //printf("Data: %s\n", string);
      for ( i = 0; i < size; i+=2)
      {

        printf("%d ", (int16_t) ((string[i] & 0xff) +((string[i+1] & 0xff)<<8)));

      }


    }
      if (strcmp(string, "ray.rawdata") != 0)
      {
        nextFrame = 0;
        break;
      }
      else 
      {
        nextFrame = 1;

        zmq_msg_close (&message);

        string[size] = 0;
        printf("Prefix: %s\n", string);

        if (!zmq_msg_more (&message))
            break;      //  Last message frame
      }

  }}
  return 0;
}

请注意,这仅在从 Python 接收 numpy 数组时出现问题 - 只要它是心跳字符串或其他任何内容,就没有问题。