ZeroMQ PUB-SUB 通信:SUB 什么也没收到
ZeroMQ PUB-SUB communication : SUB receives nothing
我正在尝试在 ZeroMQ 中进行 PUB-SUB 通信,其中 PUB 在 C++ 中,而 SUB 在 python 中。我正在使用 python 3.8、ZeroMQ 4.3.2、pyzmq 18.1.1 和 cppzmq 4.5.0
PUB:
int main()
{
Sleep(10000);
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5556");
int zipcode, temperature, relhumidity;
while (true) {
// Get values (first supposed to be random)
zipcode = 10001;
temperature = 27;
relhumidity = 61;
// Send message to the subscriber
zmq::message_t message(20);
snprintf((char *)message.data(), 20, "%05d %d %d", zipcode, temperature, relhumidity);
publisher.send(message, zmq::send_flags::none);
std::fprintf(stderr, "[INFO] Sent data: %i, %i, %i \n", zipcode, temperature, relhumidity);
if (fValue && j >= fValue) {
break;
}
j++;
}
}
潜艇:
import sys
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
# Subscribe to zipcode, default is NYC, 10001
zip_filter = "10001"
# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
# Process 5 updates
for update_nbr in range(3):
string = socket.recv_string()
zipcode, temperature, relhumidity = string.split()
print("Received data : %s , %d , %d" % (zip_filter, temperature, relhumidity))
但是我无法让它工作,因为 SUB 在 string = socket.recv_string()
上永远等待,而 PUB 发送消息没有错误。实际上,它 returns 已发送消息的长度。
备注:
- Sleep是试用版,可以在PUB发送前启动SUB。但是,如果我删除它并先启动 SUB,我会有相同的行为。
- 如果我执行以下操作,它会打印
none
:
Err = socket.connect("tcp://localhost:5556")
print(Err)
我是 ZeroMQ 的新手,我真的不知道从哪里开始解决这个问题。有什么想法吗?
Q : Any idea?
如果你从未使用过 ZeroMQ,
你可能会喜欢先看看 ,然后再深入了解更多细节
Step -1: repair your c++ side code ,以便正确定义 j
和 fValue
并验证其在 break
条件下的正确用法,以避免立即 break
-退出发送循环。
第0步:在python端,设置ZeroMQ SUB
先订阅任何主题,使用 ""
- 作为主题过滤器显式设置的字符串。 如果有效:您的问题与正确的主题设置无关,以便实际进行主题过滤。 如果不是:您似乎也有视线 "visibility" 问题(当不在同一 localhost
上时可能会发生这种情况(是的,这有时也会发生在这里))。
我没有快速检查你的工作的方法,但我可以给你一些基本的提示,可能会让你解决问题。
我建议你开始移除接收器端的过滤器。为此,您必须将 setsockopt
替换为 socket.setsockopt_string(zmq.SUBSCRIBE, "")
此外,总是在接收方,我建议你只打印你收到的内容,而不对数据做任何假设,所以我会用一个简单的 print(socket.recv_string())
(甚至只是 print(socket.recv())
.
您的简单(调试)接收器类似于:
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, "")
data = socket.recv_string()
print(data)
通过这种方式,您可以了解真正通过套接字发送的内容(如果有的话)。
如果收到了一些东西,但它不是您所期望的,那么您可以在发件人方面工作。例如,您可以查看 socket.send()
的 ZMQ_SNDMORE
标志,它允许您创建多部分消息。 (有关 "topic filtering" 的工作原理,请参阅文档 here)。
如果你还需要帮助,明天我可能会更彻底地看一下。
感谢大家的回答。我发现这是一个经典的 "slow joiner" 问题。我在循环之前移动了 Sleep(10) 以查看发生了什么并且它起作用了。
我的第一条消息中不清楚的地方(因为过度清理了我的代码以在此处说清楚)是 fValue 要么设置为一个数字以在循环中进行一定数量的交互,要么设置为 0 以循环无限地。所以,我将它设置为 3 只是为了发送少量消息,但是 PUB 没有足够的时间连接到 SUB 以便 SUB 看到消息。
我正在尝试在 ZeroMQ 中进行 PUB-SUB 通信,其中 PUB 在 C++ 中,而 SUB 在 python 中。我正在使用 python 3.8、ZeroMQ 4.3.2、pyzmq 18.1.1 和 cppzmq 4.5.0
PUB:
int main()
{
Sleep(10000);
zmq::context_t context(1);
zmq::socket_t publisher(context, ZMQ_PUB);
publisher.bind("tcp://*:5556");
int zipcode, temperature, relhumidity;
while (true) {
// Get values (first supposed to be random)
zipcode = 10001;
temperature = 27;
relhumidity = 61;
// Send message to the subscriber
zmq::message_t message(20);
snprintf((char *)message.data(), 20, "%05d %d %d", zipcode, temperature, relhumidity);
publisher.send(message, zmq::send_flags::none);
std::fprintf(stderr, "[INFO] Sent data: %i, %i, %i \n", zipcode, temperature, relhumidity);
if (fValue && j >= fValue) {
break;
}
j++;
}
}
潜艇:
import sys
import zmq
# Socket to talk to server
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
# Subscribe to zipcode, default is NYC, 10001
zip_filter = "10001"
# Python 2 - ascii bytes to unicode str
if isinstance(zip_filter, bytes):
zip_filter = zip_filter.decode('ascii')
socket.setsockopt_string(zmq.SUBSCRIBE, zip_filter)
# Process 5 updates
for update_nbr in range(3):
string = socket.recv_string()
zipcode, temperature, relhumidity = string.split()
print("Received data : %s , %d , %d" % (zip_filter, temperature, relhumidity))
但是我无法让它工作,因为 SUB 在 string = socket.recv_string()
上永远等待,而 PUB 发送消息没有错误。实际上,它 returns 已发送消息的长度。
备注:
- Sleep是试用版,可以在PUB发送前启动SUB。但是,如果我删除它并先启动 SUB,我会有相同的行为。
- 如果我执行以下操作,它会打印
none
:
Err = socket.connect("tcp://localhost:5556")
print(Err)
我是 ZeroMQ 的新手,我真的不知道从哪里开始解决这个问题。有什么想法吗?
Q : Any idea?
如果你从未使用过 ZeroMQ,
你可能会喜欢先看看
Step -1: repair your c++ side code ,以便正确定义 j
和 fValue
并验证其在 break
条件下的正确用法,以避免立即 break
-退出发送循环。
第0步:在python端,设置ZeroMQ SUB
先订阅任何主题,使用 ""
- 作为主题过滤器显式设置的字符串。 如果有效:您的问题与正确的主题设置无关,以便实际进行主题过滤。 如果不是:您似乎也有视线 "visibility" 问题(当不在同一 localhost
上时可能会发生这种情况(是的,这有时也会发生在这里))。
我没有快速检查你的工作的方法,但我可以给你一些基本的提示,可能会让你解决问题。
我建议你开始移除接收器端的过滤器。为此,您必须将
setsockopt
替换为socket.setsockopt_string(zmq.SUBSCRIBE, "")
此外,总是在接收方,我建议你只打印你收到的内容,而不对数据做任何假设,所以我会用一个简单的
print(socket.recv_string())
(甚至只是print(socket.recv())
.
您的简单(调试)接收器类似于:
import zmq
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt_string(zmq.SUBSCRIBE, "")
data = socket.recv_string()
print(data)
通过这种方式,您可以了解真正通过套接字发送的内容(如果有的话)。
如果收到了一些东西,但它不是您所期望的,那么您可以在发件人方面工作。例如,您可以查看 socket.send()
的 ZMQ_SNDMORE
标志,它允许您创建多部分消息。 (有关 "topic filtering" 的工作原理,请参阅文档 here)。
如果你还需要帮助,明天我可能会更彻底地看一下。
感谢大家的回答。我发现这是一个经典的 "slow joiner" 问题。我在循环之前移动了 Sleep(10) 以查看发生了什么并且它起作用了。
我的第一条消息中不清楚的地方(因为过度清理了我的代码以在此处说清楚)是 fValue 要么设置为一个数字以在循环中进行一定数量的交互,要么设置为 0 以循环无限地。所以,我将它设置为 3 只是为了发送少量消息,但是 PUB 没有足够的时间连接到 SUB 以便 SUB 看到消息。