使用进程内套接字的 ZeroMQ PubSub 永远挂起

ZeroMQ PubSub using inproc sockets hangs forever

我正在调整一个 tcp PubSub 示例以将 inproc 与多线程一起使用。它最终会永远挂起。

我的设置

重现问题的源代码:

#include <string.h>
#include <stdio.h>
#include <unistd.h>
#include <thread>
#include "zmq.h"

void hello_pubsub_inproc() {
    void* context = zmq_ctx_new();
    void* publisher = zmq_socket(context, ZMQ_PUB);
    printf("Starting server...\n");
    int pub_conn = zmq_bind(publisher, "inproc://*:4040");

    void* subscriber = zmq_socket(context, ZMQ_SUB);
    printf("Collecting stock information from the server.\n");
    int sub_conn = zmq_connect(subscriber, "inproc://localhost:4040");
    sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);

    std::thread t_pub = std::thread([&]{
        const char* companies[2] = {"Company1", "Company2"};
        int count = 0;
        for(;;) {
            int which_company = count % 2;
            int index = (int)strlen(companies[0]);
            char update[12];
            snprintf(update, sizeof update, "%s",
                     companies[which_company]);
            zmq_msg_t message;
            zmq_msg_init_size(&message, index);
            memcpy(zmq_msg_data(&message), update, index);
            zmq_msg_send(&message, publisher, 0);
            zmq_msg_close(&message);
            count++;
        }
    });

    std::thread t_sub = std::thread([&]{
        int i;
        for(i = 0; i < 10; i++) {
            zmq_msg_t reply;
            zmq_msg_init(&reply);
            zmq_msg_recv(&reply, subscriber, 0);
            int length = (int)zmq_msg_size(&reply);
            char* value = (char*)malloc(length);
            memcpy(value, zmq_msg_data(&reply), length);
            zmq_msg_close(&reply);
            printf("%s\n", value);
            free(value);
        }
    });

    t_pub.join();

    // Give publisher time to set up.
    sleep(1);

    t_sub.join();

    zmq_close(subscriber);
    zmq_close(publisher);
    zmq_ctx_destroy(context);
}

int main (int argc, char const *argv[]) {
    hello_pubsub_inproc();
    return 0;
}

结果

Starting server...
Collecting stock information from the server.

我也试过在加入线程之前添加这个无济于事:

zmq_proxy(publisher, subscriber, NULL);

解决方法:inproc 替换为 tcp 可立即修复。但是 inproc 不应该针对进程中的用例吗?

快速研究告诉我,它不可能是 bindconnect 的顺序,因为该问题已在我的 zmq 版本中修复。

下面的示例以某种方式告诉我我没有缺少共享上下文问题,因为它使用 none:

我从 Signaling Between Threads (PAIR Sockets) 部分的指南中了解到

You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not distribute as PUSH or DEALER do. However, you need to configure the subscriber with an empty subscription, which is annoying.

空订阅是什么意思?

我哪里做错了?

You can use PUB for the sender and SUB for the receiver. This will correctly deliver your messages exactly as you sent them and PUB does not distribute as PUSH or DEALER do. However, you need to configure the subscriber with an empty subscription, which is annoying.


Q : What does it mean by an empty subscription?

这意味着设置(配置)订阅,驱动主题列表消息传递过滤,使用空订阅字符串。

Q : Where am I doing wrong?

这里:

// sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, 0, 0);   // Wrong
   sub_conn = zmq_setsockopt(subscriber, ZMQ_SUBSCRIBE, "",0);  //  Empty string

这里也有疑问,关于使用正确的语法和命名规则:

// int pub_conn = zmq_bind(publisher, "inproc://*:4040");
   int pub_conn = zmq_bind(publisher, "inproc://<aStringWithNameMax256Chars>");

as inproc:// transport-class 不使用任何类型的外部堆栈,而是将 AccessPoint 的 I/O(s) 映射到1+ 个内存位置(无堆栈,I/O-线程不需要传输-class)。

鉴于此,没有像“<address>:<port#>”这样的(此处缺失的)协议被解释,因此类似字符串的文本按原样用于识别消息的内存位置-数据将进入。

因此,“inproc://*:4040”没有展开,而是使用 "literally" 作为命名的 inproc:// 传输-class I/O-内存位置标识为[*:4040](接下来,询问.connect()-方法.connect( "inproc://localhost:4040" ) 将而且必须这样做,在词法上错过准备好的内存位置:["*:4040"] 因为字符串不匹配

所以这应该失败 .connect() - 错误处理可能是沉默的,因为版本 +4.x 没有必要首先遵守历史要求 .bind() (为 inproc:// 创建一个 "known" named-Memory-Location ),然后可以调用 .connect() 使其与 "already existing" named-Memory-location 交叉连接,因此v4.0+ 很可能不会在调用和创建不同的 .bind( "inproc://*:4040" ) 着陆区以及接下来询问不匹配的 .connect( "inproc://localhost:4040" )(没有 "previously prepared" 着陆区时引发任何错误已存在的命名内存位置中的区域。