使用 ZeroMQ 在 C 中接收多部分消息
Receive multipart messages in C with ZeroMQ
我正在为将使用 ZeroMQ 套接字传输数据并发布多部分消息的设备编写数据记录系统。
我的程序需要用 C 语言编写。
我确实设法编写了一个 Python 脚本,它可以正常工作。
这看起来像这样:
import zmq
import time
import numpy as np
_HEARTBEAT_PREFIX = b'ray.heartbeat[=10=]'
_RAW_DATA_PREFIX = b'ray.rawdata[=10=]'
context = zmq.Context.instance()
time.sleep(5)
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.LINGER, 0)
socket.setsockopt(zmq.SUBSCRIBE, b'')
time.sleep(5)
socket.connect("tcp://localhost:50290")
time.sleep(5)
go = True
while go:
if socket.poll(1000):
zframes = socket.recv_multipart(flags=zmq.NOBLOCK, copy=False)
if (zframes[0].buffer.tobytes() == _HEARTBEAT_PREFIX):
print ('Heartbeart')
elif (zframes[0].buffer.tobytes() == _RAW_DATA_PREFIX):
dataToWrite = np.frombuffer(zframes[1].buffer, dtype='i2')
print ('Raw data: ')
# and so on ...
这是我第一次使用 ZMQ,尝试将此 Python 代码移植到 C 是一个真正的挑战。我现在写的C代码会运行,但是不能正常运行:
#include "zhelpers.h"
int main (int argc, char *argv [])
{
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context,ZMQ_SUB);
int rc = zmq_connect(subscriber,"tcp://localhost:50290");
assert(rc == 0);
rc = zmq_setsockopt(subscriber,ZMQ_SUBSCRIBE, "", 0);
assert(rc == 0);
while (1)
{
zmq_msg_t message;
zmq_msg_init (&message);
zmq_msg_recv (&message, subscriber,0);//, ZMQ_NOBLOCK);
// Process the message frame
int size = zmq_msg_size(&message);
char *string = malloc(size + 1);
memcpy(string, zmq_msg_data(&message), size);
zmq_msg_close (&message);
string[size] = 0;
printf("Message[%d]: %s\n", size, string);
if (!zmq_msg_more (&message))
break; // Last message frame
}
return 0;
}
然而,此代码将提供传入数据的正确大小,但在显示数据时,它显示为不可读字符(符号等..)我将需要,就像我在 Python 中所做的那样,以字符串形式获取传入数据以便进一步处理它。
我发现 Python 中的 numpy 数组被转换为包含 int 的字符串数组,并且它们的值很难为 C 程序解释。
通过处理接收到的数据,我能够提取正确的值:
#include "zhelpers.h"
int main (int argc, char *argv [])
{
int i = 0;
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context,ZMQ_SUB);
int rc = zmq_connect(subscriber,"tcp://localhost:50290");
assert(rc == 0);
char* filter = "ray.";
rc = zmq_setsockopt(subscriber,ZMQ_SUBSCRIBE, filter, sizeof(filter));
assert(rc == 0);
int nextFrame = 0;
while (1)
{
zmq_msg_t message;
zmq_msg_init (&message);
zmq_msg_recv (&message, subscriber,0);//, ZMQ_NOBLOCK);
// Process the message frame
int size = zmq_msg_size(&message);
char *string = malloc(size + 1);
memcpy(string, zmq_msg_data(&message), size);
if (nextFrame)
{
zmq_msg_close (&message);
printf("Raw data: ");
string[size] = 0;
//printf("Data: %s\n", string);
for ( i = 0; i < size; i+=2)
{
printf("%d ", (int16_t) ((string[i] & 0xff) +((string[i+1] & 0xff)<<8)));
}
}
if (strcmp(string, "ray.rawdata") != 0)
{
nextFrame = 0;
break;
}
else
{
nextFrame = 1;
zmq_msg_close (&message);
string[size] = 0;
printf("Prefix: %s\n", string);
if (!zmq_msg_more (&message))
break; // Last message frame
}
}}
return 0;
}
请注意,这仅在从 Python 接收 numpy 数组时出现问题 - 只要它是心跳字符串或其他任何内容,就没有问题。
我正在为将使用 ZeroMQ 套接字传输数据并发布多部分消息的设备编写数据记录系统。
我的程序需要用 C 语言编写。
我确实设法编写了一个 Python 脚本,它可以正常工作。 这看起来像这样:
import zmq
import time
import numpy as np
_HEARTBEAT_PREFIX = b'ray.heartbeat[=10=]'
_RAW_DATA_PREFIX = b'ray.rawdata[=10=]'
context = zmq.Context.instance()
time.sleep(5)
socket = context.socket(zmq.SUB)
socket.setsockopt(zmq.LINGER, 0)
socket.setsockopt(zmq.SUBSCRIBE, b'')
time.sleep(5)
socket.connect("tcp://localhost:50290")
time.sleep(5)
go = True
while go:
if socket.poll(1000):
zframes = socket.recv_multipart(flags=zmq.NOBLOCK, copy=False)
if (zframes[0].buffer.tobytes() == _HEARTBEAT_PREFIX):
print ('Heartbeart')
elif (zframes[0].buffer.tobytes() == _RAW_DATA_PREFIX):
dataToWrite = np.frombuffer(zframes[1].buffer, dtype='i2')
print ('Raw data: ')
# and so on ...
这是我第一次使用 ZMQ,尝试将此 Python 代码移植到 C 是一个真正的挑战。我现在写的C代码会运行,但是不能正常运行:
#include "zhelpers.h"
int main (int argc, char *argv [])
{
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context,ZMQ_SUB);
int rc = zmq_connect(subscriber,"tcp://localhost:50290");
assert(rc == 0);
rc = zmq_setsockopt(subscriber,ZMQ_SUBSCRIBE, "", 0);
assert(rc == 0);
while (1)
{
zmq_msg_t message;
zmq_msg_init (&message);
zmq_msg_recv (&message, subscriber,0);//, ZMQ_NOBLOCK);
// Process the message frame
int size = zmq_msg_size(&message);
char *string = malloc(size + 1);
memcpy(string, zmq_msg_data(&message), size);
zmq_msg_close (&message);
string[size] = 0;
printf("Message[%d]: %s\n", size, string);
if (!zmq_msg_more (&message))
break; // Last message frame
}
return 0;
}
然而,此代码将提供传入数据的正确大小,但在显示数据时,它显示为不可读字符(符号等..)我将需要,就像我在 Python 中所做的那样,以字符串形式获取传入数据以便进一步处理它。
我发现 Python 中的 numpy 数组被转换为包含 int 的字符串数组,并且它们的值很难为 C 程序解释。
通过处理接收到的数据,我能够提取正确的值:
#include "zhelpers.h"
int main (int argc, char *argv [])
{
int i = 0;
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context,ZMQ_SUB);
int rc = zmq_connect(subscriber,"tcp://localhost:50290");
assert(rc == 0);
char* filter = "ray.";
rc = zmq_setsockopt(subscriber,ZMQ_SUBSCRIBE, filter, sizeof(filter));
assert(rc == 0);
int nextFrame = 0;
while (1)
{
zmq_msg_t message;
zmq_msg_init (&message);
zmq_msg_recv (&message, subscriber,0);//, ZMQ_NOBLOCK);
// Process the message frame
int size = zmq_msg_size(&message);
char *string = malloc(size + 1);
memcpy(string, zmq_msg_data(&message), size);
if (nextFrame)
{
zmq_msg_close (&message);
printf("Raw data: ");
string[size] = 0;
//printf("Data: %s\n", string);
for ( i = 0; i < size; i+=2)
{
printf("%d ", (int16_t) ((string[i] & 0xff) +((string[i+1] & 0xff)<<8)));
}
}
if (strcmp(string, "ray.rawdata") != 0)
{
nextFrame = 0;
break;
}
else
{
nextFrame = 1;
zmq_msg_close (&message);
string[size] = 0;
printf("Prefix: %s\n", string);
if (!zmq_msg_more (&message))
break; // Last message frame
}
}}
return 0;
}
请注意,这仅在从 Python 接收 numpy 数组时出现问题 - 只要它是心跳字符串或其他任何内容,就没有问题。