czmq 异步 send/receive (PUB/SUB)
czmq asynchronous send/receive (PUB/SUB)
我正在使用 czmq 进行进程间通信。
有2个进程:
- 服务器,接收请求和发送回复,还发送事件。
- 客户端,发送请求和接收回复,还监听事件。
我已经使用 REQ/REP 成功实施了 "request/reply" 模式(详情如下)
现在我想实现通知机制。
我希望我的服务器发送它的事件而不关心是否有人收到它们并且无论如何都不会被阻止。
客户端会监听这些事件,但如果它崩溃了,它一定不会对服务器产生任何影响。
我相信 PUB/SUB 是最合适的模式,但如果不是,请不吝赐教。
这是我的实现(从检查和日志中清除):
服务器发布事件
Server::eventIpcPublisher = zsock_new_pub("@ipc:///tmp/events.ipc");
void Server::OnEvent(uint8_t8* eventData, size_t dataSize) {
if (Server::eventIpcPublisher != nullptr) {
int retCode = zsock_send(Server::eventIpcPublisher, "b", eventData, dataSize);
}
客户端在专用线程中收听它们
void Client::RegisterToEvents(const std::function<void(uint8_t*, size_t)>& callback) {
zsock_t* eventIpcSubscriber = zsock_new_sub(">ipc:///tmp/events.ipc", "");
listening = true;
while (listening) {
byte* receptionBuffer;
size_t receptionBufferSize;
int retCode = zsock_recv(eventIpcSubscriber, "b", &receptionBuffer, &receptionBufferSize);
--> NEVER REACHED <--
if (retCode == 0) {
callback(static_cast<uint8_t*>(receptionBuffer), receptionBufferSize);
}
}
zsock_destroy(&eventIpcSubscriber);
}
没用:
- 服务器发送return代码0,好像一切正常,
- 客户端没有收到任何东西(接收时被阻止)。
非常感谢您的帮助,提前致谢!
克里斯
PS:这是我已经成功实现的REQ/REP(这里不需要帮助,只是为了理解)
客户端发送请求,然后等待响应。
uint8_t* MulticamApi::GetDatabase(size_t& sizeOfData) {
zsock_t* requestSocket = zsock_new_req(">ipc:///tmp/requests.ipc");
if (requestSocket == nullptr)
return nullptr;
byte* receptionBuffer;
size_t receptionBufferSize;
int retCode = zsock_send(requestSocket, "i", static_cast<int>(IpcComm_GetClipDbRequest));
if (retCode != 0) {
sizeOfData = 0;
return nullptr;
}
retCode = zsock_recv(requestSocket, "b", &receptionBuffer, &receptionBufferSize);
databaseData.reset(new MallocGuard(static_cast<void*>(receptionBuffer)));
sizeOfData = receptionBufferSize;
return static_cast<uint8_t*>(databaseData->Data());
}
服务器中的专用线程侦听请求、处理请求并回复。 (别担心,删除在别处处理)
U32 Server::V_OnProcessing(U32 waitCode) {
protocolIpcWriter = zsock_new_rep("@ipc:///tmp/requests.ipc");
while (running) {
int receptionInt = 0;
int retCode = zsock_recv(protocolIpcWriter, "i", &receptionInt);
if ((retCode == 0) && (receptionInt == static_cast<int>(IpcComm_GetClipDbRequest))) {
GetDatabase();
}
sleep(1);
}
zsock_destroy(&protocolIpcWriter);
return 0;
}
void Server::GetDatabase() {
uint32_t dataSize = 10820 * 340;
uint8_t* data = new uint8_t[dataSize];
uint32_t nbBytesWritten = DbHelper::SaveDbToBuffer(data, dataSize);
int retCode = zsock_send(protocolIpcWriter, "b", data, nbBytesWritten);
}
我知道我的问题很老,但郑重声明,我从 czmq
切换到基础 zmq
api,一切都很顺利。我的一位同事也遇到了 czmq
层的问题并切换到 zmq
来修复它们,所以这绝对是我推荐的。
我正在使用 czmq 进行进程间通信。
有2个进程:
- 服务器,接收请求和发送回复,还发送事件。
- 客户端,发送请求和接收回复,还监听事件。
我已经使用 REQ/REP 成功实施了 "request/reply" 模式(详情如下)
现在我想实现通知机制。 我希望我的服务器发送它的事件而不关心是否有人收到它们并且无论如何都不会被阻止。 客户端会监听这些事件,但如果它崩溃了,它一定不会对服务器产生任何影响。 我相信 PUB/SUB 是最合适的模式,但如果不是,请不吝赐教。
这是我的实现(从检查和日志中清除):
服务器发布事件
Server::eventIpcPublisher = zsock_new_pub("@ipc:///tmp/events.ipc");
void Server::OnEvent(uint8_t8* eventData, size_t dataSize) {
if (Server::eventIpcPublisher != nullptr) {
int retCode = zsock_send(Server::eventIpcPublisher, "b", eventData, dataSize);
}
客户端在专用线程中收听它们
void Client::RegisterToEvents(const std::function<void(uint8_t*, size_t)>& callback) {
zsock_t* eventIpcSubscriber = zsock_new_sub(">ipc:///tmp/events.ipc", "");
listening = true;
while (listening) {
byte* receptionBuffer;
size_t receptionBufferSize;
int retCode = zsock_recv(eventIpcSubscriber, "b", &receptionBuffer, &receptionBufferSize);
--> NEVER REACHED <--
if (retCode == 0) {
callback(static_cast<uint8_t*>(receptionBuffer), receptionBufferSize);
}
}
zsock_destroy(&eventIpcSubscriber);
}
没用:
- 服务器发送return代码0,好像一切正常,
- 客户端没有收到任何东西(接收时被阻止)。
非常感谢您的帮助,提前致谢!
克里斯
PS:这是我已经成功实现的REQ/REP(这里不需要帮助,只是为了理解)
客户端发送请求,然后等待响应。
uint8_t* MulticamApi::GetDatabase(size_t& sizeOfData) {
zsock_t* requestSocket = zsock_new_req(">ipc:///tmp/requests.ipc");
if (requestSocket == nullptr)
return nullptr;
byte* receptionBuffer;
size_t receptionBufferSize;
int retCode = zsock_send(requestSocket, "i", static_cast<int>(IpcComm_GetClipDbRequest));
if (retCode != 0) {
sizeOfData = 0;
return nullptr;
}
retCode = zsock_recv(requestSocket, "b", &receptionBuffer, &receptionBufferSize);
databaseData.reset(new MallocGuard(static_cast<void*>(receptionBuffer)));
sizeOfData = receptionBufferSize;
return static_cast<uint8_t*>(databaseData->Data());
}
服务器中的专用线程侦听请求、处理请求并回复。 (别担心,删除在别处处理)
U32 Server::V_OnProcessing(U32 waitCode) {
protocolIpcWriter = zsock_new_rep("@ipc:///tmp/requests.ipc");
while (running) {
int receptionInt = 0;
int retCode = zsock_recv(protocolIpcWriter, "i", &receptionInt);
if ((retCode == 0) && (receptionInt == static_cast<int>(IpcComm_GetClipDbRequest))) {
GetDatabase();
}
sleep(1);
}
zsock_destroy(&protocolIpcWriter);
return 0;
}
void Server::GetDatabase() {
uint32_t dataSize = 10820 * 340;
uint8_t* data = new uint8_t[dataSize];
uint32_t nbBytesWritten = DbHelper::SaveDbToBuffer(data, dataSize);
int retCode = zsock_send(protocolIpcWriter, "b", data, nbBytesWritten);
}
我知道我的问题很老,但郑重声明,我从 czmq
切换到基础 zmq
api,一切都很顺利。我的一位同事也遇到了 czmq
层的问题并切换到 zmq
来修复它们,所以这绝对是我推荐的。