未收到 ZMQ 消息
ZMQ messages not being received
如果我遗漏了一些简单的东西,请原谅我,这是我第一次用消息传递做任何事情,我从别人那里继承了这个代码库。
我正在尝试从 IP 为 10.10.10.200 的 windows 机器向 IP 为 10.10.10.15 的 Ubuntu 机器发送消息。
我从Windows机器上运行 TCPView时得到如下结果,这让我怀疑问题出在Ubuntu机器上。如果我没看错,那么我在 windows 机器上的应用程序已经在端口 5556 上创建了一个连接,这是它应该做的。如果我错了,我也会包含 windows 代码。
my_app.exe 5436 TCP MY_COMPUTER 5556 MY_COMPUTER 0 LISTENING
Windows 应用代码:
void
NetworkManager::initializePublisher()
{
globalContext = zmq_ctx_new();
globalPublisher = zmq_socket(globalContext, ZMQ_PUB);
string protocol = "tcp://*:";
string portNumber = PUBLISHING_PORT; //5556
string address = protocol + portNumber;
char *address_ptr = new char[address.size() + 1];
strncpy_s(address_ptr, address.size() + 1, address.c_str(), address.size());
int bind_res = zmq_bind(globalPublisher, address_ptr);
if (bind_res != 0)
{
cerr << "FATAL: couldn't bind to port[" << portNumber << "] and protocol [" << protocol << "]" << endl;
}
cout << " Connection: " << address << endl;
}
void
NetworkManager::publishMessage(MESSAGE msgToSend)
{
// Get the size of the message to be sent
int sizeOfMessageToSend = MSG_MAX_SIZE;//sizeof(msgToSend);
// Copy IDVS message to buffer
char buffToSend[MSG_MAX_SIZE] = "";
// Pack the message id
size_t indexOfId = MSG_ID_SIZE + 1;
size_t indexOfName = MSG_NAME_SIZE + 1;
size_t indexOfdata = MSG_DATABUFFER_SIZE + 1;
memcpy(buffToSend, msgToSend.get_msg_id(), indexOfId - 1);
// Pack the message name
memcpy(buffToSend + indexOfId, msgToSend.get_msg_name(), indexOfName - 1);
// Pack the data buffer
memcpy(buffToSend + indexOfId + indexOfName, msgToSend.get_msg_data(), indexOfdata - 1);
// Send message
int sizeOfSentMessage = zmq_send(globalPublisher, buffToSend, MSG_MAX_SIZE, ZMQ_DONTWAIT);
getSubscriptionConnectionError();
// If message size doesn't match, we have an issue, otherwise, we are good
if (sizeOfSentMessage != sizeOfMessageToSend)
{
int errorCode = zmq_errno();
cerr << "FATAL: couldn't not send message." << endl;
cerr << "ERROR: " << errorCode << endl;
}
}
如果您认为需要的话,我可以包含更多这方面的代码,但错误出现在 Ubuntu 方面,所以我将重点放在这方面。
问题是当我调用 zmq_recv 它 returns -1 并且当我检查 zmq_errno 我得到 EAGAIN(请求非阻塞模式并且没有消息可用片刻。)我还检查了 netstat,但在端口 5556
上没有看到任何内容
首先是连接发布者的函数,然后是获取数据的函数,最后是main。
Ubuntu 边码:
void
*connectoToPublisher()
{
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context, ZMQ_SUB);
string protocol = "tcp://";
string ipAddress = PUB_IP; //10.10.10.15
string portNumber = PUB_PORT; // 5556
string address = protocol + ipAddress + ":" + portNumber;
cout << "Address: " << address << endl;
char *address_ptr = new char[address.size() + 1];
strcpy(address_ptr, address.c_str());
// ------ Connect to Publisher ------
bool isConnectionEstablished = false;
int connectionStatus;
while (isConnectionEstablished == false)
{
connectionStatus = zmq_connect(subscriber, address_ptr);
switch (connectionStatus)
{
case 0: //we are good.
cout << "Connection Established!" << endl;
isConnectionEstablished = true;
break;
case -1:
isConnectionEstablished = false;
cout << "Connection Failed!" << endl;
getSubscriptionConnectionError();
cout << "Trying again in 5 seconds..." << endl;
break;
default:
cout << "Hit default connecting to publisher!" << endl;
break;
}
if (isConnectionEstablished == true)
{
break;
}
sleep(5); // Try again
}
// by the time we get here we should have connected to the pub
return subscriber;
}
static void *
getData(void *subscriber)
{
const char *filter = ""; // Get all messages
int subFilterResult = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
// ------ Get in main loop ------
while (1)
{
//get messages from publisher
char bufferReceived[MSG_MAX_SIZE] = "";
size_t expected_messageSize = sizeof(bufferReceived);
int actual_messageSize = zmq_recv(subscriber, bufferReceived, MSG_MAX_SIZE, ZMQ_DONTWAIT);
if (expected_messageSize == actual_messageSize)
{
MESSAGE msg = getMessage(bufferReceived); //Uses memcpy to copy id, name, and data strutct data from buffer into struct of MESSAGE
if (strcmp(msg.get_msg_id(), "IDXY_00000") == 0)
{
DATA = getData(msg); //Uses memcpy to copy data from buffer into struct of DATA
}
} else
{
// Something went wrong
getReceivedError(); //This just calls zmq_errno and cout the error
}
usleep(1);
}
}
int main (int argc, char*argv[])
{
//Doing some stuff...
void *subscriber_socket = connectoToHeadTrackerPublisher();
// Initialize Mux Lock
pthread_mutex_init(&receiverMutex, NULL);
// Initializing some variables...
// Launch Thread to get updates from windows machine
pthread_t publisherThread;
pthread_create(&publisherThread,
NULL, getData, subscriber_socket);
// UI stuff
zmq_close(subscriber_socket);
return 0;
}
如果您不能提供解决方案,那么我将接受将问题确定为解决方案。我的主要问题是我没有消息或网络方面的知识或经验来正确识别问题。通常,如果我知道哪里出了问题,我就能解决它。
好的,这与信令/消息传递框架无关
您的 Ubuntu 代码指示 ZeroMQ Context()
-实例引擎创建一个新的 SUB
-套接字实例,接下来代码坚持让这个套接字尝试 _connect()
(设置 tcp://
transport-class 连接到对等对方)到 "opposite" access-point,位于 Ubuntu localhost:port#
被设置为 10.10.10.15:5556
,而预期的 PUB
端原型 access-point 实际上不在这台 Ubuntu 机器上,而是在另一台 Windows 主机上,IP:port#
其中 10.10.10.200:5556
这似乎是问题所在 root-cause,因此相应地更改它以匹配物理布局,您可能会得到玩具。
如果我遗漏了一些简单的东西,请原谅我,这是我第一次用消息传递做任何事情,我从别人那里继承了这个代码库。
我正在尝试从 IP 为 10.10.10.200 的 windows 机器向 IP 为 10.10.10.15 的 Ubuntu 机器发送消息。
我从Windows机器上运行 TCPView时得到如下结果,这让我怀疑问题出在Ubuntu机器上。如果我没看错,那么我在 windows 机器上的应用程序已经在端口 5556 上创建了一个连接,这是它应该做的。如果我错了,我也会包含 windows 代码。
my_app.exe 5436 TCP MY_COMPUTER 5556 MY_COMPUTER 0 LISTENING
Windows 应用代码:
void
NetworkManager::initializePublisher()
{
globalContext = zmq_ctx_new();
globalPublisher = zmq_socket(globalContext, ZMQ_PUB);
string protocol = "tcp://*:";
string portNumber = PUBLISHING_PORT; //5556
string address = protocol + portNumber;
char *address_ptr = new char[address.size() + 1];
strncpy_s(address_ptr, address.size() + 1, address.c_str(), address.size());
int bind_res = zmq_bind(globalPublisher, address_ptr);
if (bind_res != 0)
{
cerr << "FATAL: couldn't bind to port[" << portNumber << "] and protocol [" << protocol << "]" << endl;
}
cout << " Connection: " << address << endl;
}
void
NetworkManager::publishMessage(MESSAGE msgToSend)
{
// Get the size of the message to be sent
int sizeOfMessageToSend = MSG_MAX_SIZE;//sizeof(msgToSend);
// Copy IDVS message to buffer
char buffToSend[MSG_MAX_SIZE] = "";
// Pack the message id
size_t indexOfId = MSG_ID_SIZE + 1;
size_t indexOfName = MSG_NAME_SIZE + 1;
size_t indexOfdata = MSG_DATABUFFER_SIZE + 1;
memcpy(buffToSend, msgToSend.get_msg_id(), indexOfId - 1);
// Pack the message name
memcpy(buffToSend + indexOfId, msgToSend.get_msg_name(), indexOfName - 1);
// Pack the data buffer
memcpy(buffToSend + indexOfId + indexOfName, msgToSend.get_msg_data(), indexOfdata - 1);
// Send message
int sizeOfSentMessage = zmq_send(globalPublisher, buffToSend, MSG_MAX_SIZE, ZMQ_DONTWAIT);
getSubscriptionConnectionError();
// If message size doesn't match, we have an issue, otherwise, we are good
if (sizeOfSentMessage != sizeOfMessageToSend)
{
int errorCode = zmq_errno();
cerr << "FATAL: couldn't not send message." << endl;
cerr << "ERROR: " << errorCode << endl;
}
}
如果您认为需要的话,我可以包含更多这方面的代码,但错误出现在 Ubuntu 方面,所以我将重点放在这方面。
问题是当我调用 zmq_recv 它 returns -1 并且当我检查 zmq_errno 我得到 EAGAIN(请求非阻塞模式并且没有消息可用片刻。)我还检查了 netstat,但在端口 5556
上没有看到任何内容首先是连接发布者的函数,然后是获取数据的函数,最后是main。 Ubuntu 边码:
void
*connectoToPublisher()
{
void *context = zmq_ctx_new();
void *subscriber = zmq_socket(context, ZMQ_SUB);
string protocol = "tcp://";
string ipAddress = PUB_IP; //10.10.10.15
string portNumber = PUB_PORT; // 5556
string address = protocol + ipAddress + ":" + portNumber;
cout << "Address: " << address << endl;
char *address_ptr = new char[address.size() + 1];
strcpy(address_ptr, address.c_str());
// ------ Connect to Publisher ------
bool isConnectionEstablished = false;
int connectionStatus;
while (isConnectionEstablished == false)
{
connectionStatus = zmq_connect(subscriber, address_ptr);
switch (connectionStatus)
{
case 0: //we are good.
cout << "Connection Established!" << endl;
isConnectionEstablished = true;
break;
case -1:
isConnectionEstablished = false;
cout << "Connection Failed!" << endl;
getSubscriptionConnectionError();
cout << "Trying again in 5 seconds..." << endl;
break;
default:
cout << "Hit default connecting to publisher!" << endl;
break;
}
if (isConnectionEstablished == true)
{
break;
}
sleep(5); // Try again
}
// by the time we get here we should have connected to the pub
return subscriber;
}
static void *
getData(void *subscriber)
{
const char *filter = ""; // Get all messages
int subFilterResult = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, filter, strlen(filter));
// ------ Get in main loop ------
while (1)
{
//get messages from publisher
char bufferReceived[MSG_MAX_SIZE] = "";
size_t expected_messageSize = sizeof(bufferReceived);
int actual_messageSize = zmq_recv(subscriber, bufferReceived, MSG_MAX_SIZE, ZMQ_DONTWAIT);
if (expected_messageSize == actual_messageSize)
{
MESSAGE msg = getMessage(bufferReceived); //Uses memcpy to copy id, name, and data strutct data from buffer into struct of MESSAGE
if (strcmp(msg.get_msg_id(), "IDXY_00000") == 0)
{
DATA = getData(msg); //Uses memcpy to copy data from buffer into struct of DATA
}
} else
{
// Something went wrong
getReceivedError(); //This just calls zmq_errno and cout the error
}
usleep(1);
}
}
int main (int argc, char*argv[])
{
//Doing some stuff...
void *subscriber_socket = connectoToHeadTrackerPublisher();
// Initialize Mux Lock
pthread_mutex_init(&receiverMutex, NULL);
// Initializing some variables...
// Launch Thread to get updates from windows machine
pthread_t publisherThread;
pthread_create(&publisherThread,
NULL, getData, subscriber_socket);
// UI stuff
zmq_close(subscriber_socket);
return 0;
}
如果您不能提供解决方案,那么我将接受将问题确定为解决方案。我的主要问题是我没有消息或网络方面的知识或经验来正确识别问题。通常,如果我知道哪里出了问题,我就能解决它。
好的,这与信令/消息传递框架无关
您的 Ubuntu 代码指示 ZeroMQ Context()
-实例引擎创建一个新的 SUB
-套接字实例,接下来代码坚持让这个套接字尝试 _connect()
(设置 tcp://
transport-class 连接到对等对方)到 "opposite" access-point,位于 Ubuntu localhost:port#
被设置为 10.10.10.15:5556
,而预期的 PUB
端原型 access-point 实际上不在这台 Ubuntu 机器上,而是在另一台 Windows 主机上,IP:port#
其中 10.10.10.200:5556
这似乎是问题所在 root-cause,因此相应地更改它以匹配物理布局,您可能会得到玩具。