C++ 和 Python ZeroMQ 4.x PUB/SUB 示例不起作用
C++ and Python ZeroMQ 4.x PUB/SUB example does not work
我只能找到旧的 C++ 源代码示例。不管怎样,我做了我的,基于他们。这是我在 python 的发布者:
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5563")
while True:
msg = "hello"
socket.send_string(msg)
print("sent "+ msg)
sleep(5)
这是 C++ 中的订阅者:
void * ctx = zmq_ctx_new();
void * subscriber = zmq_socket(ctx, ZMQ_SUB);
// zmq_connect(subscriber, "tcp://*:5563");
zmq_connect(subscriber, "tcp://localhost:5563");
// zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));
while (true) {
zmq_msg_t msg;
int rc;
rc = zmq_msg_init( & msg);
assert(rc == 0);
std::cout << "waiting for message..." << std::endl;
rc = zmq_msg_recv( & msg, subscriber, 0);
assert(rc == 1);
std::cout << "received: " << (char * ) zmq_msg_data( & msg) << std::endl;
zmq_msg_close( & msg);
}
最初,我尝试了 zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") );
但我想如果我不设置这个我应该会收到所有东西,对吧?所以我评论了它。
当我 运行 代码时,我永远看到 "waiting for message..."。
我尝试使用 tcpdump
来监听 TCP 流量。结果是,当发布者打开时,我在 5563
端口上看到很多垃圾,当我关闭发布者时,它们停止了。当我尝试 PUSH/PULL
方案时,我可以在 tcpdump
中看到明文消息。 (我尝试使用 nodejs 进行推送并使用 c++ 进行提取并且它有效)。
我可能做错了什么?
我尝试了 .bind()
、.connect()
、localhost
、127.0.0.1
的不同组合,但它们也不行。
更新:我刚刚读到我必须订阅一些东西,所以我zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 );
订阅了所有内容,但我仍然没有收到任何东西
PyZMQ 的版本是 17.0.0.b3 并且有 ZeroMQ 4.2.3
C++ 有 ZeroMQ 4.2.2
更新 2:
同时更新到 4.2.3,也不起作用。
zmq_setsockopt()
的 return 值是多少?
那么你应该使用""
而不是NULL
,它们是不同的。
zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", 0 );
如 API 定义:
Return value
The zmq_setsockopt()
function shall return zero if successful. Otherwise it shall return -1 and set errno
to one of the values defined below.
...
"I guess I should receive everything if I don't set this, right?"
( 任何其他可扩展的正式通信原型模式,如观察到的 PUSH/PULL
,对订阅策略没有任何作用,因此将独立于针对一组主题的订阅匹配处理工作 -过滤器列表。)
第 0 步:首先测试发送方 RTO,如果它 .send()
-s 任何东西:
让我们模拟一个快速的 pythonic 接收器,看看发送器是否确实在通道上发送了任何东西:
import zmq
aContext = zmq.Context() # .new Context
aSUB = aContext.socket( zmq.SUB ) # .new Socket
aSUB.connect( "tcp://127.0.0.1:5563" ) # .connect
aSUB.setsockopt( zmq.LINGER, 0 ) # .set ALWAYS!
aSUB.setsockopt( zmq.SUBSCRIBE, "" ) # .set T-filter
MASK = "INF: .recv()-ed this:[{0:}]\n: waited {1: > 7d} [us]"
aClk = zmq.Stopwatch();
while True:
try:
aClk.start(); print MASK.format( aSUB.recv(),
aClk.stop()
)
except ( KeyboardInterrupt, SystemExit ):
pass
break
pass
aSUB.close() # .close ALWAYS!
aContext.term() # .term ALWAYS!
这应该报告 PUB
-发件人实际上通过网络 .send()
-ing 以及实际消息到达间隔时间(在[us]
,很高兴 ZeroMQ 包含了这个用于调试和性能/延迟调整的工具。
如果您看到实时 INF:
-消息实际上在屏幕上滴答作响,请保留它 运行,现在可以继续进行了到下一步。
第一步:接下来测试接收端代码:
#include <zmq.h>
void *aContext = zmq_ctx_new();
void *aSUB = zmq_socket( aContext, ZMQ_SUB ); std::cout << "INF: .. zmq_ctx_new() done" << std::endl;
zmq_connect( aSUB, "tcp://127.0.0.1:5563" ); std::cout << "INF: .. zmq_connect() done" << std::endl;
zmq_setsockopt( aSUB, ZMQ_SUBSCRIBE, "", 0 ); std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
zmq_setsockopt( aSUB, ZMQ_LINGER, 0 ); std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ... ) done" << std::endl;
int rc;
while (true) {
zmq_msg_t msg; /* Create an empty ØMQ message */
rc = zmq_msg_init (&msg); assert (rc == 0 && "EXC: in zmq_msg_init() call" );
std::cout << "INF: .. zmq_msg_init() done" << std::endl;
rc = zmq_msg_recv (&msg, aSUB, 0); assert (rc != -1 && "EXC: in zmq_msg_recv() call" );
std::cout << "INF: .. zmq_msg_recv() done: received [" << (char * ) zmq_msg_data( &msg ) << "]" << std::endl;
zmq_msg_close (&msg); /* Release message */
std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
}
zmq_close( aSUB ); std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
zmq_ctx_term( aContext ); std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
运行 PUB/SUB 模式(不考虑语言)的正确方法是:
酒吧
socket(zmq.PUB)
bind("tcp://127.0.0.1:5555")
- 编码(通常只对字符串进行编码(),但您也可以对对象进行压缩()或转储(),甚至两者都可以)
encoded_topic = topic.encode()
encoded_msg = msg.encode()
send_multipart([encoded_topic, encoded_msg])
子
socket(zmq.SUB)
setsockopt(zmq.SUBSCRIBE, topic.encode())
connect("tcp://127.0.0.1:5555")
answer = recv_multipart()
- 解码答案
enc_topic, enc_msg = answer
topic = enc_topic.decode()
msg = enc_msg.decode()
通常,步骤 Pub - 2 / Sub - 3(即 bind/connect)和 Pub - 3 / Sub -
5(即 encode/decode 或 dumps/loads)需要相互补充才能使事情正常进行。
是我,问这个问题的人。
我设法通过交换 socket.bind("tcp://*:5563")
到 socket.connect("tcp://dns_address_of_my_dcker_container:5564")
在 python 中开始工作,
并在 C++ 中将 zmq_connect(subscriber, "tcp://localhost:5563")
交换为 zmq_bind(subscriber, "tcp://*:5563")
我在网上找到的示例说我应该对发布者使用 bind
,对订阅者使用 connect
,但这对我来说无论如何都行不通。有人知道为什么吗?
ZeroMQ 文档说明如下:
The zmq_bind() function binds the socket to a local endpoint and then
accepts incoming connections on that endpoint.
The zmq_connect() function connects the socket to an endpoint and then
accepts incoming connections on that endpoint.
我不清楚发生了什么变化,但它奏效了。
我只能找到旧的 C++ 源代码示例。不管怎样,我做了我的,基于他们。这是我在 python 的发布者:
import zmq
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5563")
while True:
msg = "hello"
socket.send_string(msg)
print("sent "+ msg)
sleep(5)
这是 C++ 中的订阅者:
void * ctx = zmq_ctx_new();
void * subscriber = zmq_socket(ctx, ZMQ_SUB);
// zmq_connect(subscriber, "tcp://*:5563");
zmq_connect(subscriber, "tcp://localhost:5563");
// zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "", sizeof(""));
while (true) {
zmq_msg_t msg;
int rc;
rc = zmq_msg_init( & msg);
assert(rc == 0);
std::cout << "waiting for message..." << std::endl;
rc = zmq_msg_recv( & msg, subscriber, 0);
assert(rc == 1);
std::cout << "received: " << (char * ) zmq_msg_data( & msg) << std::endl;
zmq_msg_close( & msg);
}
最初,我尝试了 zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", sizeof("") );
但我想如果我不设置这个我应该会收到所有东西,对吧?所以我评论了它。
当我 运行 代码时,我永远看到 "waiting for message..."。
我尝试使用 tcpdump
来监听 TCP 流量。结果是,当发布者打开时,我在 5563
端口上看到很多垃圾,当我关闭发布者时,它们停止了。当我尝试 PUSH/PULL
方案时,我可以在 tcpdump
中看到明文消息。 (我尝试使用 nodejs 进行推送并使用 c++ 进行提取并且它有效)。
我可能做错了什么?
我尝试了 .bind()
、.connect()
、localhost
、127.0.0.1
的不同组合,但它们也不行。
更新:我刚刚读到我必须订阅一些东西,所以我zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, NULL, 0 );
订阅了所有内容,但我仍然没有收到任何东西
PyZMQ 的版本是 17.0.0.b3 并且有 ZeroMQ 4.2.3
C++ 有 ZeroMQ 4.2.2
更新 2:
同时更新到 4.2.3,也不起作用。
zmq_setsockopt()
的 return 值是多少?
那么你应该使用""
而不是NULL
,它们是不同的。
zmq_setsockopt( subscriber, ZMQ_SUBSCRIBE, "", 0 );
如 API 定义:
Return value
The
zmq_setsockopt()
function shall return zero if successful. Otherwise it shall return -1 and seterrno
to one of the values defined below.
...
"I guess I should receive everything if I don't set this, right?"
( 任何其他可扩展的正式通信原型模式,如观察到的 PUSH/PULL
,对订阅策略没有任何作用,因此将独立于针对一组主题的订阅匹配处理工作 -过滤器列表。)
第 0 步:首先测试发送方 RTO,如果它 .send()
-s 任何东西:
让我们模拟一个快速的 pythonic 接收器,看看发送器是否确实在通道上发送了任何东西:
import zmq
aContext = zmq.Context() # .new Context
aSUB = aContext.socket( zmq.SUB ) # .new Socket
aSUB.connect( "tcp://127.0.0.1:5563" ) # .connect
aSUB.setsockopt( zmq.LINGER, 0 ) # .set ALWAYS!
aSUB.setsockopt( zmq.SUBSCRIBE, "" ) # .set T-filter
MASK = "INF: .recv()-ed this:[{0:}]\n: waited {1: > 7d} [us]"
aClk = zmq.Stopwatch();
while True:
try:
aClk.start(); print MASK.format( aSUB.recv(),
aClk.stop()
)
except ( KeyboardInterrupt, SystemExit ):
pass
break
pass
aSUB.close() # .close ALWAYS!
aContext.term() # .term ALWAYS!
这应该报告 PUB
-发件人实际上通过网络 .send()
-ing 以及实际消息到达间隔时间(在[us]
,很高兴 ZeroMQ 包含了这个用于调试和性能/延迟调整的工具。
如果您看到实时 INF:
-消息实际上在屏幕上滴答作响,请保留它 运行,现在可以继续进行了到下一步。
第一步:接下来测试接收端代码:
#include <zmq.h>
void *aContext = zmq_ctx_new();
void *aSUB = zmq_socket( aContext, ZMQ_SUB ); std::cout << "INF: .. zmq_ctx_new() done" << std::endl;
zmq_connect( aSUB, "tcp://127.0.0.1:5563" ); std::cout << "INF: .. zmq_connect() done" << std::endl;
zmq_setsockopt( aSUB, ZMQ_SUBSCRIBE, "", 0 ); std::cout << "INF: .. zmq_setsockopt( ZMQ_SUBSCRIBE, ... ) done" << std::endl;
zmq_setsockopt( aSUB, ZMQ_LINGER, 0 ); std::cout << "INF: .. zmq_setsockopt( ZMQ_LINGER, ... ) done" << std::endl;
int rc;
while (true) {
zmq_msg_t msg; /* Create an empty ØMQ message */
rc = zmq_msg_init (&msg); assert (rc == 0 && "EXC: in zmq_msg_init() call" );
std::cout << "INF: .. zmq_msg_init() done" << std::endl;
rc = zmq_msg_recv (&msg, aSUB, 0); assert (rc != -1 && "EXC: in zmq_msg_recv() call" );
std::cout << "INF: .. zmq_msg_recv() done: received [" << (char * ) zmq_msg_data( &msg ) << "]" << std::endl;
zmq_msg_close (&msg); /* Release message */
std::cout << "INF: .. zmq_msg_close()'d" << std::endl;
}
zmq_close( aSUB ); std::cout << "INF: .. aSUB was zmq_close()'d" << std::endl;
zmq_ctx_term( aContext ); std::cout << "INF: .. aCTX was zmq_ctx_term()'d" << std::endl;
运行 PUB/SUB 模式(不考虑语言)的正确方法是:
酒吧
socket(zmq.PUB)
bind("tcp://127.0.0.1:5555")
- 编码(通常只对字符串进行编码(),但您也可以对对象进行压缩()或转储(),甚至两者都可以)
encoded_topic = topic.encode()
encoded_msg = msg.encode()
send_multipart([encoded_topic, encoded_msg])
子
socket(zmq.SUB)
setsockopt(zmq.SUBSCRIBE, topic.encode())
connect("tcp://127.0.0.1:5555")
answer = recv_multipart()
- 解码答案
enc_topic, enc_msg = answer
topic = enc_topic.decode()
msg = enc_msg.decode()
通常,步骤 Pub - 2 / Sub - 3(即 bind/connect)和 Pub - 3 / Sub - 5(即 encode/decode 或 dumps/loads)需要相互补充才能使事情正常进行。
是我,问这个问题的人。
我设法通过交换 socket.bind("tcp://*:5563")
到 socket.connect("tcp://dns_address_of_my_dcker_container:5564")
在 python 中开始工作,
并在 C++ 中将 zmq_connect(subscriber, "tcp://localhost:5563")
交换为 zmq_bind(subscriber, "tcp://*:5563")
我在网上找到的示例说我应该对发布者使用 bind
,对订阅者使用 connect
,但这对我来说无论如何都行不通。有人知道为什么吗?
ZeroMQ 文档说明如下:
The zmq_bind() function binds the socket to a local endpoint and then accepts incoming connections on that endpoint.
The zmq_connect() function connects the socket to an endpoint and then accepts incoming connections on that endpoint.
我不清楚发生了什么变化,但它奏效了。