ZMQ Python PUB/SUB 有效,但我的 C++ 订阅者与 Python 发布者无效
ZMQ Python PUB/SUB works but not my C++ Subscriber with Python Publisher
我正在使用 Ubuntu 18.04、Qt 5.12 和 libzmq.so.5.1.5。我有 2 个非常简单的 Python 3.6 脚本,使用 pyzmq 18.0.1,一个实现 ZMQ PUB,一个实现 ZMQ SUB。这些 Python 脚本可以在本地主机环回网络上找到并从一个脚本传输到另一个脚本。我正在尝试在 Qt 5.12 中实现订阅者,当我使用相同的 Python 发布者脚本发送相同的数据时,订阅者总是在接收时阻塞。我在下面粘贴了完整的代码——我怎样才能让 ZMQ 和 Qt 5.12 实际接收到数据?谢谢。
我的 C++ 应用程序的输出看起来很成功:
"Server IP Address determined to be: 127.0.0.1"
"Connecting to: tcp://127.0.0.1:5555"
"Attempted Connect: 0"
"SetSockOpt: 0"
"GetSockOpt: 0"
Done setting up socket
Waiting
Python 3 个公关:
#!/usr/bin/python3
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5555")
# Allow clients to connect before sending data
time.sleep(3)
while True:
socket.send_pyobj({1:[1,2,3]})
time.sleep(1)
Python 3 分:
#!/usr/bin/python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
# We can connect to several endpoints if we desire, and receive from all.
socket.connect("tcp://127.0.0.1:5555")
# We must declare the socket as of type SUBSCRIBER, and pass a prefix filter.
# Here, the filter is the empty string, wich means we receive all messages.
# We may subscribe to several filters, thus receiving from all.
socket.setsockopt(zmq.SUBSCRIBE, b'')
while True:
message = socket.recv_pyobj()
print(message.get(1)[2])
Qt 5.12 订阅者代码:
#include <QCoreApplication>
// Std includes
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
// Qt
#include <QDebug>
#include <QFile>
// ZeroMQ Includes
#include <zmq.h>
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
//QString m_ServerIP = QString("*");
QString m_ServerIP = QString("127.0.0.1");
QString m_ServerPort = QString("5555");
qDebug() << QString("Server IP Address determined to be: %1").arg(m_ServerIP);
void* m_Context = zmq_ctx_new();
assert (m_Context);
void* m_Subscriber = zmq_socket (m_Context, ZMQ_SUB);
assert (m_Subscriber);
int rc = -1;
unsigned int fd = 0;
do {
const char *filter = std::string("").c_str();
QString ipAndPort = QString("tcp://%1:%2").arg(m_ServerIP).arg(m_ServerPort);
qDebug() << QString("Connecting to: %1").arg(ipAndPort);
rc = zmq_connect(m_Subscriber, ipAndPort.toStdString().c_str());
qDebug() << QString("Attempted Connect: %1").arg(rc);
rc = zmq_setsockopt(m_Subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));
qDebug() << QString("SetSockOpt: %1").arg(rc);
size_t fd_size = sizeof(fd);
rc = zmq_getsockopt(m_Subscriber,ZMQ_FD,&fd,&fd_size);
qDebug() << QString("GetSockOpt: %1").arg(rc);
}
while ( rc < 0 );
qDebug() << "Done setting up socket";
while ( true) {
zmq_msg_t message;
zmq_msg_init(&message);
qDebug() << "Waiting";
zmq_recvmsg(m_Subscriber, &message, 0);
size_t size = zmq_msg_size (&message);
qDebug() << QString("Message Size: %1").arg(size);
char *string = static_cast<char*>(malloc(size + 1));
memcpy (string, zmq_msg_data(&message), size);
zmq_msg_close (&message);
string [size] = 0;
if (string) {
QByteArray frame = QByteArray::fromBase64(QByteArray(string));
free(string);
qDebug() << QString("Debug RX Frame Size: %1").arg(frame.size());
QFile output("/tmp/abcd.jpeg");
if ( output.open(QIODevice::WriteOnly) ) {
output.write(frame);
output.close();
}
}
}
return a.exec();
}
您有以下错误:
您不需要 QCoreApplication,因为您没有使用需要事件循环的信号、槽、事件等。
您正在使用 zmq 的 C api,C++ 有兼容的 api,要使用它请使用头文件 <zmq.hpp>
。
当你使用send_pyobj()
python时,它使用pickle传输数据,但在C++中没有恢复数据的方法。相反,您应该使用 send_json()
从 python.
发送数据
综合以上,解决方案是:
*.py
#!/usr/bin/python3
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5555")
# Allow clients to connect before sending data
time.sleep(3)
while True:
socket.send_json({1:[1,2,3]})
time.sleep(1)
main.cpp
#include <QString>
#include <QDebug>
#include <QUrl>
#include <QThread>
#include <zmq.hpp>
int main(void) {
QString m_ServerIP = QString("127.0.0.1");
int m_ServerPort = 5555;
qDebug() << QString("Server IP Address determined to be: %1").arg(m_ServerIP);
QUrl url;
url.setScheme("tcp");
url.setHost(m_ServerIP);
url.setPort(m_ServerPort);
zmq::context_t context(1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect(url.toString().toStdString());
subscriber.setsockopt( ZMQ_SUBSCRIBE, "", 0);
while(true) {
zmq_msg_t in_msg;
zmq_msg_init(&in_msg);
zmq::message_t message;
subscriber.recv(&message);
qDebug() << QString("Message Size: %1").arg(message.size());
QByteArray ba(static_cast<char*>(message.data()), message.size());
qDebug()<< "message" << ba;
QThread::sleep(1);
}
}
输出:
"Server IP Address determined to be: 127.0.0.1"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
您可以使用 QJsonDocument. I also recommend you to review the following examples in C++ of zmq 解码数据。
如果你使用 socket.send_pyobj({1:[1,2,3]})
你会得到以下 pickle 的结果:
"Server IP Address determined to be: 127.0.0.1"
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
我正在使用 Ubuntu 18.04、Qt 5.12 和 libzmq.so.5.1.5。我有 2 个非常简单的 Python 3.6 脚本,使用 pyzmq 18.0.1,一个实现 ZMQ PUB,一个实现 ZMQ SUB。这些 Python 脚本可以在本地主机环回网络上找到并从一个脚本传输到另一个脚本。我正在尝试在 Qt 5.12 中实现订阅者,当我使用相同的 Python 发布者脚本发送相同的数据时,订阅者总是在接收时阻塞。我在下面粘贴了完整的代码——我怎样才能让 ZMQ 和 Qt 5.12 实际接收到数据?谢谢。
我的 C++ 应用程序的输出看起来很成功:
"Server IP Address determined to be: 127.0.0.1"
"Connecting to: tcp://127.0.0.1:5555"
"Attempted Connect: 0"
"SetSockOpt: 0"
"GetSockOpt: 0"
Done setting up socket
Waiting
Python 3 个公关:
#!/usr/bin/python3
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5555")
# Allow clients to connect before sending data
time.sleep(3)
while True:
socket.send_pyobj({1:[1,2,3]})
time.sleep(1)
Python 3 分:
#!/usr/bin/python3
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
# We can connect to several endpoints if we desire, and receive from all.
socket.connect("tcp://127.0.0.1:5555")
# We must declare the socket as of type SUBSCRIBER, and pass a prefix filter.
# Here, the filter is the empty string, wich means we receive all messages.
# We may subscribe to several filters, thus receiving from all.
socket.setsockopt(zmq.SUBSCRIBE, b'')
while True:
message = socket.recv_pyobj()
print(message.get(1)[2])
Qt 5.12 订阅者代码:
#include <QCoreApplication>
// Std includes
#include <stdio.h>
#include <unistd.h>
#include <assert.h>
// Qt
#include <QDebug>
#include <QFile>
// ZeroMQ Includes
#include <zmq.h>
int main(int argc, char *argv[])
{
QCoreApplication a(argc, argv);
//QString m_ServerIP = QString("*");
QString m_ServerIP = QString("127.0.0.1");
QString m_ServerPort = QString("5555");
qDebug() << QString("Server IP Address determined to be: %1").arg(m_ServerIP);
void* m_Context = zmq_ctx_new();
assert (m_Context);
void* m_Subscriber = zmq_socket (m_Context, ZMQ_SUB);
assert (m_Subscriber);
int rc = -1;
unsigned int fd = 0;
do {
const char *filter = std::string("").c_str();
QString ipAndPort = QString("tcp://%1:%2").arg(m_ServerIP).arg(m_ServerPort);
qDebug() << QString("Connecting to: %1").arg(ipAndPort);
rc = zmq_connect(m_Subscriber, ipAndPort.toStdString().c_str());
qDebug() << QString("Attempted Connect: %1").arg(rc);
rc = zmq_setsockopt(m_Subscriber, ZMQ_SUBSCRIBE,filter, strlen (filter));
qDebug() << QString("SetSockOpt: %1").arg(rc);
size_t fd_size = sizeof(fd);
rc = zmq_getsockopt(m_Subscriber,ZMQ_FD,&fd,&fd_size);
qDebug() << QString("GetSockOpt: %1").arg(rc);
}
while ( rc < 0 );
qDebug() << "Done setting up socket";
while ( true) {
zmq_msg_t message;
zmq_msg_init(&message);
qDebug() << "Waiting";
zmq_recvmsg(m_Subscriber, &message, 0);
size_t size = zmq_msg_size (&message);
qDebug() << QString("Message Size: %1").arg(size);
char *string = static_cast<char*>(malloc(size + 1));
memcpy (string, zmq_msg_data(&message), size);
zmq_msg_close (&message);
string [size] = 0;
if (string) {
QByteArray frame = QByteArray::fromBase64(QByteArray(string));
free(string);
qDebug() << QString("Debug RX Frame Size: %1").arg(frame.size());
QFile output("/tmp/abcd.jpeg");
if ( output.open(QIODevice::WriteOnly) ) {
output.write(frame);
output.close();
}
}
}
return a.exec();
}
您有以下错误:
您不需要 QCoreApplication,因为您没有使用需要事件循环的信号、槽、事件等。
您正在使用 zmq 的 C api,C++ 有兼容的 api,要使用它请使用头文件
<zmq.hpp>
。当你使用
send_pyobj()
python时,它使用pickle传输数据,但在C++中没有恢复数据的方法。相反,您应该使用send_json()
从 python. 发送数据
综合以上,解决方案是:
*.py
#!/usr/bin/python3
import time
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://127.0.0.1:5555")
# Allow clients to connect before sending data
time.sleep(3)
while True:
socket.send_json({1:[1,2,3]})
time.sleep(1)
main.cpp
#include <QString>
#include <QDebug>
#include <QUrl>
#include <QThread>
#include <zmq.hpp>
int main(void) {
QString m_ServerIP = QString("127.0.0.1");
int m_ServerPort = 5555;
qDebug() << QString("Server IP Address determined to be: %1").arg(m_ServerIP);
QUrl url;
url.setScheme("tcp");
url.setHost(m_ServerIP);
url.setPort(m_ServerPort);
zmq::context_t context(1);
zmq::socket_t subscriber (context, ZMQ_SUB);
subscriber.connect(url.toString().toStdString());
subscriber.setsockopt( ZMQ_SUBSCRIBE, "", 0);
while(true) {
zmq_msg_t in_msg;
zmq_msg_init(&in_msg);
zmq::message_t message;
subscriber.recv(&message);
qDebug() << QString("Message Size: %1").arg(message.size());
QByteArray ba(static_cast<char*>(message.data()), message.size());
qDebug()<< "message" << ba;
QThread::sleep(1);
}
}
输出:
"Server IP Address determined to be: 127.0.0.1"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
"Message Size: 13"
message "{\"1\":[1,2,3]}"
您可以使用 QJsonDocument. I also recommend you to review the following examples in C++ of zmq 解码数据。
如果你使用 socket.send_pyobj({1:[1,2,3]})
你会得到以下 pickle 的结果:
"Server IP Address determined to be: 127.0.0.1"
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."
"Message Size: 20"
message "\x80\x03}q\x00K\x01]q\x01(K\x01K\x02K\x03""es."