ZeroMQ如何实现异步任务的并发请求-回复?

How to Perform Concurrent Request-Reply for Asynchronous Tasks with ZeroMQ?

意向

我想允许客户端将任务发送到某个固定地址的服务器。 服务器可能会接受该任务并在未来的任意时间执行它,但在此之前仍可能接受来自其他客户端的请求。 执行任务后,服务器将回复客户端,这可能是 运行 阻塞等待回复。 工作和客户是动态出现的,所以不可能有固定的初始数量。 工作是在非线程安全的上下文中完成的,因此 worker 不能存在于不同的线程中,因此所有工作都应该在单个线程中进行。

实施

以下示例1 不是服务器的完整实现,只是应该能够发生的序列的可编译部分(但实际上是挂起的)。 两个客户端各发送一个整数,服务器接受一个请求,然后是下一个请求,回显回复第一个请求,然后回显回复第二个请求。 目的不是让响应有序,只是允许服务器同时持有多个请求。 这里实际发生的是第二个工作人员挂起等待请求 - 这让我感到困惑,因为 DEALER 套接字应该以循环策略路由传出消息。

#include <unistd.h>
#include <stdio.h>
#include <zmq.h>
#include <sys/wait.h>

int client(int num)
{
    void *context, *client;
    int buf[1];

    context = zmq_ctx_new();
    client = zmq_socket(context, ZMQ_REQ);
    zmq_connect(client, "tcp://localhost:5559");
    *buf = num;
    zmq_send(client, buf, 1, 0);
    *buf = 0;
    zmq_recv(client, buf, 1, 0);
    printf("client %d receiving: %d\n", num, *buf);
    zmq_close(client);
    zmq_ctx_destroy(context);
    return 0;
}

void multipart_proxy(void *from, void *to)
{
    zmq_msg_t message;

    while (1) {
        zmq_msg_init(&message);
        zmq_msg_recv(&message, from, 0);
        int more = zmq_msg_more(&message);
        zmq_msg_send(&message, to, more ? ZMQ_SNDMORE : 0);
        zmq_msg_close(&message);
        if (!more) break;
    }

}

int main(void)
{
    int status;
    if (fork() == 0) {
        client(1);
        return(0);
    }
    if (fork() == 0) {
        client(2);
        return 0;
    }
    /* SERVER */
    void *context, *frontend, *backend, *worker1, *worker2;
    int wbuf1[1], wbuf2[1];

    context = zmq_ctx_new();
    frontend = zmq_socket(context, ZMQ_ROUTER);
    backend = zmq_socket(context, ZMQ_DEALER);
    zmq_bind(frontend, "tcp://*:5559");
    zmq_bind(backend, "inproc://workers");

    worker1 = zmq_socket(context, ZMQ_REP);
    zmq_connect(worker1, "inproc://workers");
    multipart_proxy(frontend, backend);
    *wbuf1 = 0;
    zmq_recv(worker1, wbuf1, 1, 0);
    printf("worker1 receiving: %d\n", *wbuf1);

    worker2 = zmq_socket(context, ZMQ_REP);
    zmq_connect(worker2, "inproc://workers");
    multipart_proxy(frontend, backend);
    *wbuf2 = 0;
    zmq_recv(worker2, wbuf2, 1, 0);
    printf("worker2 receiving: %d\n", *wbuf2);

    zmq_send(worker1, wbuf1, 1, 0);
    multipart_proxy(backend, frontend);

    zmq_send(worker2, wbuf2, 1, 0);
    multipart_proxy(backend, frontend);

    wait(&status);
    zmq_close(frontend);
    zmq_close(backend);
    zmq_close(worker1);
    zmq_close(worker2);
    zmq_ctx_destroy(context);
    return 0;
}

其他选项

我看过 CLIENT and SERVER sockets 并且它们在纸面上似乎有能力,但实际上它们足够新,以至于我拥有的 ZeroMQ 系统版本尚不支持它们。 如果无法在 ZeroMQ 中执行此操作,欢迎提出任何替代建议。


1 基于 ZeroMQ 指南的 Shared Queue 部分。

让我分享一下 ZeroMQ 如何满足上述定义的 Intention

我们宁愿使用 ZeroMQ Scalable Formal Communication Pattern Archetypes(因为它们现在是 RTO,而不是我们希望它们处于某个但不确定的指向(可能正在发生的)未来演化状态)。

我们需要毫不犹豫地 use many more ZeroMQ-based connections 在一群 coming/leaving client-实例和 server

例如:


客户端 .connect()-s 到服务器的 REQ-socket-address:port-A 请求“工作”-通过此连接处理票证

客户端.connect()-s 到服务器的SUB-套接字-address:port-B 以收听(如果存在)有关已完成“工作”的已发布公告-服务器票证-准备好为

交付结果

客户端公开另一个 REQ-socket 以请求已广播的“工作”-ticket 完成公告消息,它刚刚在 SUB[=87 中听说过=]-socket,以获得最终交付的“工作”-ticket 结果,如果证明自己,通过提供适当的/匹配的 job-ticket-AUTH-key 来证明其有权接收 publicly 宣布的结果可用性,使用这个相同的套接字在客户端正确收到此“工作”-票据结果“在手”后向服务器传递 POSACK 消息


服务器公开 REP-socket 以根据“工作”-ticket 请求临时响应每个客户端,以这种方式通知“已接受”-job -ticket,还提供 job-ticket-AUTH-key 供稍后提取结果

服务器公开 PUB-socket 以宣布任何和所有尚未拾取的“完成”-job-tickets

服务器公开另一个 REP-socket 以接收任何可能的请求传递“工作”-ticket-results 的尝试。在验证交付 job-ticket-AUTH-key 后,服务器决定相应的 REQ 消息是否具有匹配 job-ticket-AUTH-key 以确实交付具有结果的正确消息,或者是否没有发生匹配,在这种情况下消息将携带一些其他有效负载数据(逻辑留待进一步思考,以防止潜在的暴力破解或窃听以及类似的、不太原始的窃取结果的攻击)


客户无需等待结果 live/online and/or 可以承受一定数量的 LoS、L2/L3-errors 或网络风暴压力

客户只需要保留某种 job-ticket-IDjob-ticket-AUTH-key 以便以后检索 Server-processes/maintained/auth-ed 结果

服务器将继续侦听新作业

服务器将接受新的工作单并提供私人添加的 job-ticket-AUTH-key

服务器将按原样处理工作单

服务器将维护一个循环缓冲区,其中包含要公布的已完成工作单

服务器将按 public 中的决定在适当的时候重复宣布工作单,这些工作单已准备好供客户端启动的检索

服务器将接受新的检索请求

服务器将验证客户端请求以匹配任何已宣布的 job-ticket-ID 并测试 job-ticket-AUTH-key 是否匹配

服务器将响应匹配/不匹配的 job-ticket-ID 个结果检索请求

服务器仅在检索前的 POSACK-ed AUTH-匹配和 POSACK-消息重新确认传递给客户端时才会从循环缓冲区中删除 job-ticket-ID