在官方 ZeroMQ 多线程示例的修改版本中崩溃

Crash in a modified version of an official ZeroMQ mutithreaded example

我是 zmq 和 cppzmq 的新手。在尝试 运行 官方指南中的多线程示例时:http://zguide.zeromq.org/cpp:mtserver

我的设置

我遇到了一些问题。

问题 1

当运行在指南中使用源代码时,它永远挂起,没有显示任何标准输出输出。

这里是直接从指南中复制的代码。

/*
    Multithreaded Hello World server in C
*/

#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <zmq.hpp>

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    while (true) {
        //  Wait for next request from client
        zmq::message_t request;
        socket.recv (&request);
        std::cout << "Received request: [" << (char*) request.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        socket.send (reply);
    }
    return (NULL);
}

int main ()
{
    //  Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);
    clients.bind ("tcp://*:5555");
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");

    //  Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr != 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, (void *) &context);
    }
    //  Connect work threads to client threads via a queue
    zmq::proxy (static_cast<void*>(clients),
                static_cast<void*>(workers),
                nullptr);
    return 0;
}

我在worker的while循环中下断点后很快就崩溃了

问题2

注意到编译器提示我替换已弃用的 API 调用,我修改了上面的示例代码以使警告消失。

/*
 Multithreaded Hello World server in C
 */

#include <pthread.h>
#include <unistd.h>
#include <cassert>
#include <string>
#include <iostream>
#include <cstdio>
#include <zmq.hpp>

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    while (true) {
        //  Wait for next request from client
        std::array<char, 1024> buf{'[=11=]'};
        zmq::mutable_buffer request(buf.data(), buf.size());
        socket.recv(request, zmq::recv_flags::dontwait);
        std::cout << "Received request: [" << (char*) request.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        try {
            socket.send (reply, zmq::send_flags::dontwait);
        }
        catch (zmq::error_t& e) {
            printf("ERROR: %X\n", e.num());
        }
    }
    return (NULL);
}

int main ()
{
    //  Prepare our context and sockets
    zmq::context_t context (1);
    zmq::socket_t clients (context, ZMQ_ROUTER);
    clients.bind ("tcp://*:5555");  // who i talk to.
    zmq::socket_t workers (context, ZMQ_DEALER);
    workers.bind ("inproc://workers");

    //  Launch pool of worker threads
    for (int thread_nbr = 0; thread_nbr != 5; thread_nbr++) {
        pthread_t worker;
        pthread_create (&worker, NULL, worker_routine, (void *) &context);
    }
    //  Connect work threads to client threads via a queue
    zmq::proxy (clients, workers);
    return 0;
}

我不是假装对原始错误示例进行字面翻译,但这是我努力使事情编译并且 运行 没有明显的内存错误。

此代码不断给我 try-catch 块中的错误编号 9523DFB156384763,十六进制)。我在官方文档中找不到错误编号的定义,但从 中得知这是本机 ZeroMQ 错误 EFSM:

The zmq_send() operation cannot be performed on this socket at the moment due to the socket not being in the appropriate state. This error may occur with socket types that switch between several states, such as ZMQ_REP.

如果有人能指出我哪里做错了,我将不胜感激。

更新

我尝试按照@user3666197 的建议进行投票。但程序仍然挂起。插入任何断点都会导致程序崩溃,从而难以调试。

这是新员工代码

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    zmq::pollitem_t items[1] = { { socket, 0, ZMQ_POLLIN, 0 } };

    while (true) {
        if(zmq::poll(items, 1, -1) < 1) {
            printf("Terminating worker\n");
            break;
        }

        //  Wait for next request from client
        std::array<char, 1024> buf{'[=13=]'};
        socket.recv(zmq::buffer(buf), zmq::recv_flags::none);
        std::cout << "Received request: [" << (char*) buf.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        try {
            socket.send (reply, zmq::send_flags::dontwait);
        }
        catch (zmq::error_t& e) {
            printf("ERROR: %s\n", e.what());
        }
    }
    return (NULL);
}

欢迎来到零之禅的领域

怀疑 #1: 由于进入分布式有限状态自动机的错误定向状态,代码直接跳入无法解析的活锁:

虽然我一直提倡使用非阻塞 .recv()-s,但上面的代码只是通过使用此步骤直接自杀:

socket.recv( request, zmq::recv_flags::dontwait ); // socket being == ZMQ_REP

扼杀了任何其他未来生命的所有机会,但正是错误 The zmq_send() operation cannot be performed on this socket at the moment due to the socket not being in the appropriate state.
作为
进入 .send() 状态是可能的 当且仅当 之前的 .recv()-ed 交付了一个真实的留言。


最好的下一步:

查看代码,可以在转到 .send() 之前使用 .recv() 的阻塞形式,或者更好的是,使用 { blocking |非阻塞 }-形式的 .poll( { 0 | timeout }, ZMQ_POLLIN ) 在尝试 .recv() 之前继续做其他事情,如果还没有什么可接收的(以避免自我自杀将 dFSA 扔进一个无法解决的问题碰撞,用 printf(" ERROR: %X\n", e.num() ); )

的秒间隔流淹没你的 stdout/stderr

错误处理:

更好地利用 const char *zmq_strerror ( int errnum );int zmq_errno (void);

喂养

问题 1 :

与问题 2 根本原因中的自杀 ::dontwait 标志相反,问题 2 根本原因是,这里第一个 .recv() 的阻塞形式移动了所有工人-随着 .recv()-块继续进行任何进一步的步骤,直到真正的消息到达(从 MCVE 看来它不会,它永远都会),线程进入一个不确定的长的,可能是无限的等待状态,所以你的线程池保持在池范围内的阻塞等待状态,并且在任何消息到达之前不会发生任何事情。


关于 REQ/REP 工作原理的更新:

REQ/REP 可扩展通信模式原型的工作方式就像一对分散的人 - 一个,我们称她为 Mary,询问( Mary .send()-s REQ ),而另一个,比如说鲍勃 REP 监听一个可能无限长的阻塞 .recv()(或采取应有的谨慎,使用 .poll() 有序地定期检查 Mary 是否询问过某事,并继续做他自己的爱好或园艺),一旦 Bob 端收到消息, Bob 可以去 .send() Mary 回复(不是之前,因为他不知道 Mary 在不久的将来什么时候会(或不会)问什么))并且 Mary 不问她下一个 REQ.send()-在 Bob ( REP.send() ) 回复并且 Mary 收到 Bob 的消息 ( REQ.recv() ) 之后随时向 Bob 提问 - 这比现实生活中可能表现出的更公平和对称人们在一个屋檐下:o)

密码?

该代码不是可重现的 MCVE。 main() 创建了五个 Bob(在 inproc:// transport-class 上的某处挂起等待 Mary 的电话),但是 Mary 从来没有打过电话,是吗?没有任何玛丽试图这样做的迹象,她越少(他们,可能是 N:M herd-of-Mary(s):herd-of-5-Bobs 关系的(甚至是动态的)社区)尝试处理来自 5-Bobs 之一的 REP-ly(s)。

坚持不懈,ZeroMQ 花了我一些时间挠头,但在我认真学习零禅之后的几年仍然是天堂花园中一次有益的永恒漫步。本地主机序列代码 IDE 永远无法 "debug" 分布式系统(除非分布式检查器基础设施就位,分布式系统的适当架构 monitor/tracer/debugger 是另一层调试分布式 messaging/signaling 系统之上的分布式 messaging/signaling 层 - 所以不要指望它来自一个微不足道的本地主机串行代码 IDE.

如果仍有疑问,请隔离潜在的麻烦制造者 - 将 inproc:// 替换为 tcp://,如果玩具无法使用 tcp://(可以通过有线线路跟踪消息)它不会使用 inproc:// 内存区技巧。

关于我在 UPDATED 问题中看到的挂起,我终于弄明白是怎么回事了。这是我的错误期望。

我问题中的这个示例代码绝不是一个独立的 service/client 代码:它是一个带有 ZMQ_REP 套接字的纯服务器应用程序。它只是等待任何客户端代码通过 ZMQ_REQ 套接字发送请求。所以我看到的"hang"是完全正常的!

只要我将客户端应用程序连接到它,事情就会立即开始。本章位于指南的中间位置,我只关心多线程,因此我跳过了许多代码示例和消息传递模式,这导致我感到困惑。

代码注释甚至说它是服务器,但我希望看到程序的明确确认。所以公平地说,缺乏视觉提示和编译器弃用警告让我作为新用户质疑示例代码,但代码讲述的故事是有效的。

浪费时间真可惜!但突然间,@user3666197 在他的回答中所说的一切都开始变得有意义了。

为了这个问题的完整性,更新后的服务器线程工作者代码有效:


// server.cpp

void *worker_routine (void *arg)
{
    zmq::context_t *context = (zmq::context_t *) arg;

    zmq::socket_t socket (*context, ZMQ_REP);
    socket.connect ("inproc://workers");

    while (true) {
        //  Wait for next request from client
        std::array<char, 1024> buf{'[=10=]'};
        socket.recv(zmq::buffer(buf), zmq::recv_flags::none);
        std::cout << "Received request: [" << (char*) buf.data() << "]" << std::endl;

        //  Do some 'work'
        sleep (1);

        //  Send reply back to client
        zmq::message_t reply (6);
        memcpy ((void *) reply.data (), "World", 6);
        try {
            socket.send (reply, zmq::send_flags::dontwait);
        }
        catch (zmq::error_t& e) {
            printf("ERROR: %s\n", e.what());
        }
    }
    return (NULL);
}

急需的客户端代码:


// client.cpp

int main (void)
{
    void *context = zmq_ctx_new ();

    //  Socket to talk to server
    void *requester = zmq_socket (context, ZMQ_REQ);
    zmq_connect (requester, "tcp://localhost:5555");

    int request_nbr;
    for (request_nbr = 0; request_nbr != 10; request_nbr++) {
        zmq_send (requester, "Hello", 6, 0);
        char buf[6];
        zmq_recv (requester, buf, 6, 0);
        printf ("Received reply %d [%s]\n", request_nbr, buf);
    }
    zmq_close (requester);
    zmq_ctx_destroy (context);
    return 0;
}

服务器工作者不必手动轮询,因为它已包装到 zmq::proxy