使用 ZeroMQ 流连接到设备失败,但 Linux 套接字工作

Connecting to device with ZeroMQ stream fails, but Linux Sockets work

我有一个串行设备正在通过串行服务器转换为 TCP/IP。这似乎可以正常工作,因为其他应用程序可以通过这种方式连接。

在我的应用程序中,我尝试使用 ZeroMQ ZMQ_STREAM 向设备发送信息请求,并等待回复。

// from it's own thread: threadTx
zmq::socket_t soc_tx(zmq_ctx, ZMQ_STREAM);
soc_tx.connect("tcp://10.10.10.100:4003");

std::string msg("hello");
zmq::message_t zmsg(msg.c_str(), msg.size());
soc_tx.send(zmsg);

然后来自另一个线程:

// from another thread: threadRx
zmq::socket_t soc_rx(zmq_ctx, ZMQ_STREAM);
soc_rx.connect("tcp://10.10.10.100:4003");

zmq::message_t zrecv;
soc_rx.recv(&zrecv);

send 成功返回,我确实收到了一条 0 长度的消息,根据文档表明连接成功(或断开连接)。

但是,我从未收到对最初发送的请求的回复。 soc_rx.recv() 会无限等待。 Wireshark 表示正在发送回复,ZeroMQ 主机从未收到回复。

我也试过在我打开发送的同一个套接字上接收,但没有成功。

// from it's own thread: threadTx
zmq::socket_t soc(zmq_ctx, ZMQ_STREAM);
soc.connect("tcp://10.10.10.100:4003");

std::string msg("hello");
zmq::message_t zmsg(msg.c_str(), msg.size());
soc.send(zmsg);

while (true)
{
  zmq:message_t zrecv;
  soc.recv(&zrecv);
}

但我仍然只收到 0 长度的连接消息,之后没有其他消息。

我认为这可能是配置问题,但同样,其他应用程序也能正常工作。我还能够使用本机 Linux 套接字成功连接并从设备发送和接收消息。

uint16_t const PORT = 4003;
struct sockaddr_in address;
int sock = 0, valread;
struct sockaddr_in serv_addr;
char buffer[1024] = {0};
if ((sock = socket(AF_INET, SOCK_STREAM, 0)) < 0)
{
  return -1;
}

memset(&serv_addr, '0', sizeof(serv_addr));

serv_addr.sin_family = AF_INET;
serv_addr.sin_port = htons(PORT);

// Convert IPv4 and IPv6 address from text to binary form
if(inet_pton(AF_INET, "10.10.10.100", &serv_addr.sin_addr) <= 0)
{
  std::cout << "> invalid address. address not supported" << std::endl;
  return nullptr;
}

if (connect(sock, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
{
  std::cout << "> connection failed" << std::endl;
}

send(sock, out_buf, out_buf_size, 0);

std::this_thread::sleep_for(std::chrono::milliseconds(1000));

valread = recv(sock, buffer, 1024, 0);

这很好用。

为了与我系统中的其他应用程序保持一致,我喜欢将 ZeroMQ 用于所有套接字通信,但是是否存在阻止这种情况的限制?

使用 ZMQ_STREAM 有点棘手,但您应该能够使用单个套接字与外部 TCP 套接字通信。

免责声明,我将展示使用 zmqpp 库的示例,但关键点不是要调用的确切函数,而是如何正确应用流,因此我希望您能够将其转换为纯 ZeroMQ。

看看zmq api docs,原生模式段落

发起与远程的连接,连接后需要获取ZMQ_IDENTITY.

To open a connection to a server, use the zmq_connect call, and then fetch the socket identity using the ZMQ_IDENTITY zmq_getsockopt call.

zmqpp::socket tcp{context, zmqpp::socket_type::stream};
tcp.connect("tcp://127.0.0.1:12345");
std::string identity = tcp.get<std::string>(zmqpp::socket_option::identity);

什么是身份?二进制数据。来自 docs:

Identity should be at least one byte and at most 255 bytes long. Identities starting with binary zero are reserved for use by ØMQ infrastructure.

获取身份后,可以在socket上接收获取零长度报文——表示连接已经建立。

When a connection is made, a zero-length message will be received by the application.

我发现,当 ZMQ_STREAM api 文档提到零长度消息时,它实际上是指具有 2 个帧的消息,第一个身份,第二个零长度。

现在,正在发送。您的代码中缺少的是将身份作为消息的第一帧传递。 ZeroMQ 引擎将删除它并仅发送第二帧(您不能向消息添加更多帧)。

You must send one identity frame followed by one data frame. The ZMQ_SNDMORE flag is required for identity frames but is ignored on data frames.

zmqpp::message msg;
msg << identity;
msg << "Hello world";
tcp.send(msg);

现在您可以等待来自远程的响应,您将收到正确的回复(消息由 2 帧组成,第一个身份,第二个,远程回答了什么)或断开连接指示(零长度消息)。

When receiving TCP data, a ZMQ_STREAM socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application.

Similarly, when the peer disconnects (or the connection is lost), a zero-length message will be received by the application.

zmqpp::message msg;
tcp.receive(msg);

std::string r_ident;
msg >> r_ident;
std::string body;
msg >> body;

在@Jakub Piskorz 的指导下,我想出了一个可行的解决方案。您需要将 zmq 标识作为每条消息的第一帧发送到外部设备。但是,在使用 getsockopt() 时获取 zmq 标识不起作用。我不确定这是否是 ZeroMQ 或我正在使用的 CPP 绑定程序的问题。但是,我能够通过分析 ZeroMQ 收到的 0 长度消息来获取 ID。

我注意到的一个重要细节是我在每次连接后收到一个单一的 0 长度消息,它是一个帧(没有标识帧),后面是 0 长度的连接消息,它是两个帧(标识帧然后是0 长度帧)。

// init context and socket
zmq::context_t zctx;
zmq::socket_t zsock(zctx, ZMQ_STREAM);

// connect
zsock.connect(address);

// get socket identity from 0-length connect message
// this logic ignores any 0 length frames that may be received
bool idRecv = false;
zmq::message_t zid;
while (true)
{
  zmq::message_t zrecv;
  zsock.recv(&zrecv);
  if (0 < zrecv.size())
  {
    zid.copy(&zrecv);
    idRecv = true;
  }

  if (false == zrecv.more()
      && true == idRecv)
  {
    break;
  }
}

然后,使用using Multpart Messages你需要先发送identity frame。

// send first frame
zsock.send(zid, ZMQ_SNDMORE);

// send next frame
std::string testMsg = "test";
zmq::message_t zmsgFrameTwo(testMsg.c_str(), testMsg.size());
zsock.send(zmsgFrameTwo);

这允许邮件成功通过。

最后,请确保同时收到身份框架和对您的请求的回复。它们作为单独的框架出现。

// receive identity frame
zmq::message_t zrecvid;
zsock.recv(&zrecvid);

// receive reply frame
zmq::message_t zreply;
zsock.recv(&zreply);