ZeroMQ blocks in context.close(). How do I safely close the sockets and the context in C++?
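Description
The broadcaster publishes messages on a PUB socket at "tcp://localhost:5556" and a STOP control signal on a second PUB socket at "tcp://localhost:5557".
The listener receives the messages. As soon as it hears the STOP control signal, it stops and exits.
As described in the 0MQ termination whitepaper, the standard way to unblock a waiting recv() is to terminate the underlying context; recv() then exits by throwing an ETERM exception.
Although the blocked recv() is released, context.close() itself blocks, so the program still cannot exit safely.
I have also already made sure that the sockets are closed before the context and that the linger value of the subscriber sockets is set to 0. It still blocks.
- [System]: Ubuntu 18.04.1 (Linux 4.18.0-17-generic)
- [Compiler]: gcc-g++ version 7.3.0
- [ZeroMQ]: libzmq 4.3.1 + cppzmq 4.3.0
Code to reproduce
Here is simple but complete code that reproduces the problem.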
// Class Broadcast: A Broadcast object sends a message every 10ms,
// and finally sends a stop control signal
//   - start() : start broadcasting.
class Broadcast {
public:
    Broadcast():
        context_(1),
        publisher_(context_, ZMQ_PUB),
        controller_(context_, ZMQ_PUB)
    {
        publisher_.bind("tcp://*:5556");
        controller_.bind("tcp://*:5557");
    }

    void start(){
        std::cout << "start Broadcast" << std::endl;
        // send data through publisher
        const int send_time = 5;
        const std::string foo_template("foo_");
        for(int i = 0; i < send_time; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::string foo = foo_template + std::to_string(i);
            zmq::message_t msg(foo.size());
            std::memcpy(msg.data(), foo.c_str(), foo.size());
            std::cout << "Broadcast: " << foo << std::endl;
            publisher_.send(msg);
        }
        // send stop control signal through controller
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::string stop("bar");
        zmq::message_t msg(stop.size());
        std::memcpy(msg.data(), stop.c_str(), stop.size());
        std::cout << "Broadcast Control Signal: " << stop << std::endl;
        controller_.send(msg);
        std::cout << "end Broadcast" << std::endl;
    }

private:
    zmq::context_t context_;
    zmq::socket_t publisher_;
    zmq::socket_t controller_;
}; // class Broadcast
// Class Listener : A Listener object receives messages from Broadcast
// until it receives a stop control signal.
//   - start()  : start receiving messages;
//   - control(): start receiving control signals;
//   - stop()   : set stop_flag and close zmq sockets and context
class Listener {
public:
    Listener():
        stop_(false),
        context_(1),
        subscriber_(context_, ZMQ_SUB),
        controller_(context_, ZMQ_SUB)
    {
        subscriber_.connect("tcp://localhost:5556");
        controller_.connect("tcp://localhost:5557");
        int linger = 0;
        subscriber_.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
        subscriber_.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
        controller_.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
        controller_.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    }

    void start() {
        std::cout << "start Listener" << std::endl;
        stop_ = false;
        auto control_future = std::async([this]{ control(); });
        while(!stop_) {
            try {
                zmq::message_t msg;
                subscriber_.recv(&msg);
                std::string msg_str{static_cast<char*>(msg.data()), msg.size()};
                std::cout << "Received : " << msg_str << std::endl;
            } catch(const zmq::error_t& ex) {
                // recv() throws ETERM when the zmq context is destroyed,
                // as when Listener::stop() is called
                if(ex.num() != ETERM)
                    throw;
                std::cerr << "subscriber stop with ETERM" << std::endl;
                break;
            }
        }
        std::cout << "wait control to join..." << std::endl;
        control_future.get();
        std::cout << "end Listener" << std::endl;
    }

    void control() {
        while(!stop_) {
            zmq::message_t ctrl;
            controller_.recv(&ctrl);
            std::string ctrl_str{static_cast<char*>(ctrl.data()), ctrl.size()};
            std::cout << "Received Control Signal: " << ctrl_str << std::endl;
            if(ctrl_str == "bar") {
                stop();
            }
        }
    }

    // Runs on the control thread while start() may still be blocked
    // in subscriber_.recv() on another thread.
    void stop() {
        stop_ = true;
        std::cerr << "closing context..." << std::endl;
        subscriber_.close();
        controller_.close();
        context_.close();
        std::cerr << "context is closed." << std::endl;
    }

private:
    volatile bool stop_;
    zmq::context_t context_;
    zmq::socket_t subscriber_;
    zmq::socket_t controller_;
}; // class Listener
// ## Problem
// The Listener cannot safely quit since context_.close() blocks the thread.

// Note: when everything lives in a single source file, these includes
// must come before the class definitions above.
#include "zmq.hpp"
#include <chrono>
#include <cstring>   // std::memcpy
#include <future>
#include <iostream>
#include <string>
#include <thread>    // std::this_thread::sleep_for

int main(int argc, char* argv[]) {
    Broadcast broadcast;
    Listener listener;
    auto broadcast_future = std::async([&]{ broadcast.start(); });
    auto listener_future = std::async([&]{ listener.start() ; });
    broadcast_future.get();
    listener_future.get();
    std::cout << "ALL COMPLETED" << std::endl;
    return 0;
}
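Results
The problem occurs only intermittently; you may need to run the program several times to reproduce it. There are three possible outcomes: a correct exit, an abort, and a hang.
Correct version
If the program exits correctly, it prints: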
➜ zmq_safe_quit ./a.out
start Listener
start Broadcast
Broadcast: foo_0
Received : foo_0
Broadcast: foo_1
Received : foo_1
Broadcast: foo_2
Received : foo_2
Broadcast: foo_3
Received : foo_3
Broadcast: foo_4
Received : foo_4
Broadcast Control Signal: bar
end Broadcast
Received Control Signal: bar
closing context...
subscriber stop with ETERM
wait control to join...
context is closed.
end Listener
ALL COMPLETED
Blocked version
➜ zmq_safe_quit ./a.out
start Listener
start Broadcast
Broadcast: foo_0
Received : foo_0
Broadcast: foo_1
Received : foo_1
Broadcast: foo_2
Received : foo_2
Broadcast: foo_3
Received : foo_3
Broadcast: foo_4
Received : foo_4
Broadcast Control Signal: bar
end Broadcast
Received Control Signal: bar
closing context...
context is closed.
"end Listener" is missing and the shell hangs.
Aborted version
start Listener
start Broadcast
Broadcast: foo_0
Received : foo_0
Broadcast: foo_1
Received : foo_1
Broadcast: foo_2
Received : foo_2
Broadcast: foo_3
Received : foo_3
Broadcast: foo_4
Received : foo_4
Broadcast Control Signal: bar
end Broadcast
Received Control Signal: bar
closing context...
Assertion failed: pfd.revents & POLLIN (/home/davidwu/src/libzmq/src/signaler.cpp:264)
[1] 16079 abort (core dumped) ./a.out
The backtrace is:
#0 __GI_raise (sig=sig@entry=6) at ../sysdeps/unix/sysv/linux/raise.c:51
#1 0x00007f90dd99a801 in __GI_abort () at abort.c:79
#2 0x00007f90de57a52e in zmq::zmq_abort(char const*) () from /usr/local/lib/libzmq.so.5
#3 0x00007f90de59ca67 in zmq::signaler_t::wait(int) () from /usr/local/lib/libzmq.so.5
#4 0x00007f90de57ea5c in zmq::mailbox_t::recv(zmq::command_t*, int) () from /usr/local/lib/libzmq.so.5
#5 0x00007f90de59e9c7 in zmq::socket_base_t::process_commands(int, bool) () from /usr/local/lib/libzmq.so.5
#6 0x00007f90de59f726 in zmq::socket_base_t::recv(zmq::msg_t*, int) () from /usr/local/lib/libzmq.so.5
#7 0x00007f90de5c4e8c in zmq_msg_recv () from /usr/local/lib/libzmq.so.5
#8 0x0000561da8eb18f3 in zmq::socket_t::recv(zmq::message_t*, int) ()
#9 0x0000561da8eb2b47 in Listener::start() ()
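Question
How can I make the subscriber exit safely in response to an external signal? What is wrong with the code above? Or is there a better structure and design for organizing and handling this situation?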
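I always keep the zmq context under the control of the main thread. In this case I would do something like the following
pseudocode:
main()
{
    context(1)      // only one context
    job1(context)   // pass ref to the one context
    job2(context)   // pass ref to the one context
    job1.join()
    job2.join()
    context.close()
}
If your structure means you cannot do this, then you need to think more carefully about how you handle shutdown.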
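The ownership order in that pseudocode can be sketched in standard C++ without ZeroMQ. This is only an illustration under stated assumptions: `FakeContext`, `run_jobs`, and the `alive` flag are hypothetical stand-ins for `zmq::context_t` and the jobs. The point is that the owner joins every worker before the shared object goes away.

```cpp
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical stand-in for zmq::context_t: it only records whether it is alive.
struct FakeContext {
    explicit FakeContext(std::atomic<bool>& alive) : alive_(alive) { alive_ = true; }
    ~FakeContext() { alive_ = false; }
    bool usable() const { return alive_.load(); }
    std::atomic<bool>& alive_;
};

// Starts `jobs` workers that all share one context owned by this function.
// Returns how many workers found the context usable.
int run_jobs(int jobs, std::atomic<bool>& alive) {
    assert(jobs >= 0);
    std::atomic<int> ok{0};
    {
        FakeContext context(alive);              // only one context, owned here
        std::vector<std::thread> workers;
        for (int i = 0; i < jobs; ++i) {
            // Each worker gets a reference to the one context.
            workers.emplace_back([&context, &ok] {
                if (context.usable()) ++ok;
            });
        }
        for (auto& w : workers) w.join();        // join every job first
    }                                            // context is destroyed only here
    return ok.load();
}
```

Calling `run_jobs(4, alive)` with an `std::atomic<bool> alive` should report that all four workers saw a usable context, and `alive` is false afterwards because the context is released only once the workers are gone.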
Your code calls the control code (on the sockets) in one thread
subscriber_.close();
controller_.close();
context_.close();
and the processing code (on a socket) in another
subscriber_.recv(&msg);
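Two reasons not to do this:
- zmq standard sockets are not thread-safe
- You have a race condition: the sockets and the context can be destroyed while the recv thread is blocked. When it unblocks, it will simply fail in an undefined way, because zmq is effectively dead (the context is closed) at that point.
You should open, use, and close a socket in the same thread. In this case, when you get ETERM or stop_ is true, call close() on the socket (the subscriber_ object) in the start() thread.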
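As a rough, ZeroMQ-free sketch of that rule: the worker thread below opens, uses, and closes its own socket, while the controlling thread does nothing except set a flag. `FakeSocket` and `run_worker_until_stopped` are hypothetical names invented for this illustration; the fake socket only records which thread touched it.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical stand-in for a socket: remembers which thread touched it.
struct FakeSocket {
    std::thread::id opened_by, used_by, closed_by;
    void open()  { opened_by = std::this_thread::get_id(); }
    void use()   { used_by   = std::this_thread::get_id(); }
    void close() { closed_by = std::this_thread::get_id(); }
};

// Runs one worker that owns its socket for the socket's whole lifetime.
// Returns a copy of the socket's bookkeeping after the worker has exited.
FakeSocket run_worker_until_stopped() {
    std::atomic<bool> stop{false};
    FakeSocket trace;                 // written by the worker, read after join()
    std::thread worker([&] {
        FakeSocket socket;
        socket.open();                // opened in the worker thread
        do {
            socket.use();             // used in the worker thread
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        } while (!stop);
        socket.close();               // closed in the same worker thread
        trace = socket;
    });
    std::this_thread::sleep_for(std::chrono::milliseconds(20));
    stop = true;                      // the controller only sets the flag
    worker.join();
    return trace;
}
```

The controlling thread never calls `close()` itself, so there is no moment at which one thread tears down a socket that another thread is still using.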
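Thanks to @James's answer, I solved the problem myself and updated the code with the following changes:
- manage the context in main() and pass it around via std::shared_ptr;
- close each socket in the thread that created it;
- broadcast one extra message to flush the zmq queue (it wakes the subscriber's blocking recv() so that the loop can see that stop_ is set);
- use std::atomic<bool> instead of volatile as the type of the stop_ flag.
This way we no longer have to brutally destroy the context and catch ETERM, which is an unnatural approach. All sockets on all threads can exit safely.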
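The last two changes can be illustrated without ZeroMQ. The sketch below is an assumption-laden stand-in, not the code from this post: `Mailbox`, `receive_until_stopped`, and `demo` are hypothetical names, and the mailbox plays the role of a socket whose receive call can time out. Because the wait is bounded, the loop re-checks the `std::atomic<bool>` flag on its own and needs no extra wake-up message. With libzmq, a similar effect might be achieved with the `ZMQ_RCVTIMEO` socket option or a `zmq_poll` timeout.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>

// Hypothetical in-process mailbox standing in for a socket with a receive timeout.
class Mailbox {
public:
    void push(std::string msg) {
        {
            std::lock_guard<std::mutex> lock(m_);
            q_.push_back(std::move(msg));
        }
        cv_.notify_one();
    }
    // Waits up to `timeout` for a message; returns false on timeout.
    bool pop_for(std::string& out, std::chrono::milliseconds timeout) {
        std::unique_lock<std::mutex> lock(m_);
        if (!cv_.wait_for(lock, timeout, [this] { return !q_.empty(); }))
            return false;
        out = std::move(q_.front());
        q_.pop_front();
        return true;
    }
private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<std::string> q_;
};

// Handles messages until `stop` is set; counts them in `handled`.
void receive_until_stopped(Mailbox& box, const std::atomic<bool>& stop,
                           std::atomic<int>& handled) {
    std::string msg;
    while (!stop.load()) {
        if (box.pop_for(msg, std::chrono::milliseconds(10)))
            ++handled;
        // On timeout the loop simply re-checks the flag.
    }
}

// Sends `n` messages, waits until they are handled, then sets the flag
// without sending any further message. Returns the number handled.
int demo(int n) {
    Mailbox box;
    std::atomic<bool> stop{false};
    std::atomic<int> handled{0};
    std::thread receiver([&] { receive_until_stopped(box, stop, handled); });
    for (int i = 0; i < n; ++i) box.push("foo_" + std::to_string(i));
    while (handled.load() < n)
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
    stop = true;        // atomic store, visible to the receiver thread
    receiver.join();    // returns within roughly one timeout period
    return handled.load();
}
```

The trade-off is a shutdown latency of up to one timeout period, in exchange for not depending on the publisher to send a final message.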
Finally, I am posting the source code here in the hope that it helps anyone who runs into the same problem.
#include "zmq.hpp"
#include <atomic>
#include <chrono>
#include <cstring>   // std::memcpy
#include <future>
#include <iostream>
#include <memory>    // std::shared_ptr
#include <string>
#include <thread>    // std::this_thread::sleep_for

class Broadcast {
public:
    Broadcast(std::shared_ptr<zmq::context_t> context):
        context_(context),
        publisher_(*context_, ZMQ_PUB),
        controller_(*context_, ZMQ_PUB)
    {
        publisher_.bind("tcp://*:5556");
        controller_.bind("tcp://*:5557");
    }

    ~Broadcast() {
        publisher_.close();
        controller_.close();
    }

    void start(){
        std::cout << "start Broadcast" << std::endl;
        // send data through publisher
        const int send_time = 5;
        const std::string foo_template("foo_");
        for(int i = 0; i < send_time; ++i) {
            std::this_thread::sleep_for(std::chrono::milliseconds(10));
            std::string foo = foo_template + std::to_string(i);
            zmq::message_t msg(foo.size());
            std::memcpy(msg.data(), foo.c_str(), foo.size());
            std::cout << "Broadcast: " << foo << std::endl;
            publisher_.send(msg);
        }
        // send stop control signal through controller
        std::this_thread::sleep_for(std::chrono::milliseconds(10));
        std::string stop("bar");
        zmq::message_t msg(stop.size());
        std::memcpy(msg.data(), stop.c_str(), stop.size());
        std::cout << "Broadcast Control Signal: " << stop << std::endl;
        controller_.send(msg);
        std::cout << "end Broadcast" << std::endl;
        // FIX: post extra message to flush zmq queue
        // (it unblocks the client's subscriber_.recv() so the loop sees stop_)
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
        std::string foo = foo_template + "end";
        zmq::message_t msg_end(foo.size());
        std::memcpy(msg_end.data(), foo.c_str(), foo.size());
        std::cout << "Broadcast: " << foo << std::endl;
        publisher_.send(msg_end);
    }

private:
    std::shared_ptr<zmq::context_t> context_;
    zmq::socket_t publisher_;
    zmq::socket_t controller_;
}; // class Broadcast
class Client {
public:
    Client(std::shared_ptr<zmq::context_t> context):
        stop_(false),
        context_(context),
        subscriber_(*context_, ZMQ_SUB),
        controller_(*context_, ZMQ_SUB)
    {
        int linger = 0;
        subscriber_.connect("tcp://localhost:5556");
        controller_.connect("tcp://localhost:5557");
        subscriber_.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
        controller_.setsockopt(ZMQ_SUBSCRIBE, nullptr, 0);
        subscriber_.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
        controller_.setsockopt(ZMQ_LINGER, &linger, sizeof(linger));
    }

    ~Client() {
        subscriber_.close();
        controller_.close();
    }

    void start() {
        stop_ = false;
        std::cout << "start Client" << std::endl;
        auto control_future = std::async(std::launch::async, [this]{ control(); });
        while(!stop_) {
            try {
                zmq::message_t msg;
                subscriber_.recv(&msg);
                std::string msg_str{static_cast<char*>(msg.data()), msg.size()};
                std::cout << "Received : " << msg_str << std::endl;
            } catch(const zmq::error_t& ex) {
                if(ex.num() != ETERM)
                    throw;
                break; // exit while loop
            }
        }
        std::cout << "wait control to join..." << std::endl;
        control_future.get();
        std::cout << "end Client" << std::endl;
    }

    void control() {
        while(!stop_) {
            zmq::message_t ctrl;
            controller_.recv(&ctrl);
            std::string ctrl_str{static_cast<char*>(ctrl.data()), ctrl.size()};
            std::cout << "Received Control Signal: " << ctrl_str << std::endl;
            if(ctrl_str == "bar") {
                stop_ = true;
            }
        }
    }

private:
    std::atomic<bool> stop_;
    std::shared_ptr<zmq::context_t> context_;
    zmq::socket_t subscriber_;
    zmq::socket_t controller_;
}; // class Client
int main(int argc, char* argv[]) {
    // The context is created in main and shared with both objects.
    auto gContext = std::make_shared<zmq::context_t>(1);
    Broadcast broadcast(gContext);
    Client client(gContext);
    // std::launch::async makes sure each task runs on its own thread.
    auto broadcast_future = std::async(std::launch::async, [&]{ broadcast.start(); });
    auto client_future = std::async(std::launch::async, [&]{ client.start(); });
    broadcast_future.get();
    client_future.get();
    std::cout << "ALL COMPLETED" << std::endl;
    return 0;
}
Compile and run it, and you get the correct result:
➜ zmq_safe_quit ./a.out
start Client
start Broadcast
Broadcast: foo_0
Received : foo_0
Broadcast: foo_1
Received : foo_1
Broadcast: foo_2
Received : foo_2
Broadcast: foo_3
Received : foo_3
Broadcast: foo_4
Received : foo_4
Broadcast Control Signal: bar
end Broadcast
Received Control Signal: bar
Broadcast: foo_end
Received : foo_end
wait control to join...
end Client
ALL COMPLETED