ZeroMQ如何实现异步任务的并发请求-回复?
How to Perform Concurrent Request-Reply for Asynchronous Tasks with ZeroMQ?
意向
我想允许客户端将任务发送到某个固定地址的服务器。
服务器可能会接受该任务并在未来的任意时间执行它,但在此之前仍可能接受来自其他客户端的请求。
执行任务后,服务器将回复客户端,这可能是 运行 阻塞等待回复。
工作和客户是动态出现的,所以不可能有固定的初始数量。
工作是在非线程安全的上下文中完成的,因此 worker 不能存在于不同的线程中,因此所有工作都应该在单个线程中进行。
实施
以下示例1 不是服务器的完整实现,只是应该能够发生的序列的可编译部分(但实际上是挂起的)。
两个客户端各发送一个整数,服务器接受一个请求,然后是下一个请求,回显回复第一个请求,然后回显回复第二个请求。
目的不是让响应有序,只是允许服务器同时持有多个请求。
这里实际发生的是第二个工作人员挂起等待请求 - 这让我感到困惑,因为 DEALER 套接字应该以循环策略路由传出消息。
#include <unistd.h>
#include <stdio.h>
#include <zmq.h>
#include <sys/wait.h>
int client(int num)
{
void *context, *client;
int buf[1];
context = zmq_ctx_new();
client = zmq_socket(context, ZMQ_REQ);
zmq_connect(client, "tcp://localhost:5559");
*buf = num;
zmq_send(client, buf, 1, 0);
*buf = 0;
zmq_recv(client, buf, 1, 0);
printf("client %d receiving: %d\n", num, *buf);
zmq_close(client);
zmq_ctx_destroy(context);
return 0;
}
void multipart_proxy(void *from, void *to)
{
zmq_msg_t message;
while (1) {
zmq_msg_init(&message);
zmq_msg_recv(&message, from, 0);
int more = zmq_msg_more(&message);
zmq_msg_send(&message, to, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&message);
if (!more) break;
}
}
int main(void)
{
int status;
if (fork() == 0) {
client(1);
return(0);
}
if (fork() == 0) {
client(2);
return 0;
}
/* SERVER */
void *context, *frontend, *backend, *worker1, *worker2;
int wbuf1[1], wbuf2[1];
context = zmq_ctx_new();
frontend = zmq_socket(context, ZMQ_ROUTER);
backend = zmq_socket(context, ZMQ_DEALER);
zmq_bind(frontend, "tcp://*:5559");
zmq_bind(backend, "inproc://workers");
worker1 = zmq_socket(context, ZMQ_REP);
zmq_connect(worker1, "inproc://workers");
multipart_proxy(frontend, backend);
*wbuf1 = 0;
zmq_recv(worker1, wbuf1, 1, 0);
printf("worker1 receiving: %d\n", *wbuf1);
worker2 = zmq_socket(context, ZMQ_REP);
zmq_connect(worker2, "inproc://workers");
multipart_proxy(frontend, backend);
*wbuf2 = 0;
zmq_recv(worker2, wbuf2, 1, 0);
printf("worker2 receiving: %d\n", *wbuf2);
zmq_send(worker1, wbuf1, 1, 0);
multipart_proxy(backend, frontend);
zmq_send(worker2, wbuf2, 1, 0);
multipart_proxy(backend, frontend);
wait(&status);
zmq_close(frontend);
zmq_close(backend);
zmq_close(worker1);
zmq_close(worker2);
zmq_ctx_destroy(context);
return 0;
}
其他选项
我看过 CLIENT and SERVER sockets 并且它们在纸面上似乎有能力,但实际上它们足够新,以至于我拥有的 ZeroMQ 系统版本尚不支持它们。
如果无法在 ZeroMQ 中执行此操作,欢迎提出任何替代建议。
1 基于 ZeroMQ 指南的 Shared Queue 部分。
让我分享一下 ZeroMQ 如何满足上述定义的 Intention
。
我们宁愿使用 ZeroMQ Scalable Formal Communication Pattern Archetypes(因为它们现在是 RTO,而不是我们希望它们处于某个但不确定的指向(可能正在发生的)未来演化状态)。
我们需要毫不犹豫地 use many more ZeroMQ-based connections 在一群 coming/leaving client
-实例和 server
中
例如:
客户端 .connect()
-s 到服务器的 REQ
-socket-address:port
-A 请求“工作”-通过此连接处理票证
客户端.connect()
-s 到服务器的SUB
-套接字-address:port
-B 以收听(如果存在)有关已完成“工作”的已发布公告-服务器票证-准备好为
交付结果
客户端公开另一个 REQ
-socket 以请求已广播的“工作”-ticket 完成公告消息,它刚刚在 SUB
[=87 中听说过=]-socket,以获得最终交付的“工作”-ticket 结果,如果证明自己,通过提供适当的/匹配的 job-ticket-AUTH-key
来证明其有权接收 publicly 宣布的结果可用性,使用这个相同的套接字在客户端正确收到此“工作”-票据结果“在手”后向服务器传递 POSACK 消息
服务器公开 REP
-socket 以根据“工作”-ticket 请求临时响应每个客户端,以这种方式通知“已接受”-job -ticket,还提供 job-ticket-AUTH-key
供稍后提取结果
服务器公开 PUB
-socket 以宣布任何和所有尚未拾取的“完成”-job-tickets
服务器公开另一个 REP
-socket 以接收任何可能的请求传递“工作”-ticket-results 的尝试。在验证交付 job-ticket-AUTH-key
后,服务器决定相应的 REQ
消息是否具有匹配 job-ticket-AUTH-key
以确实交付具有结果的正确消息,或者是否没有发生匹配,在这种情况下消息将携带一些其他有效负载数据(逻辑留待进一步思考,以防止潜在的暴力破解或窃听以及类似的、不太原始的窃取结果的攻击)
客户无需等待结果 live/online and/or 可以承受一定数量的 LoS、L2/L3-errors 或网络风暴压力
客户只需要保留某种 job-ticket-ID
和 job-ticket-AUTH-key
以便以后检索 Server-processes/maintained/auth-ed 结果
服务器将继续侦听新作业
服务器将接受新的工作单并提供私人添加的 job-ticket-AUTH-key
服务器将按原样处理工作单
服务器将维护一个循环缓冲区,其中包含要公布的已完成工作单
服务器将按 public 中的决定在适当的时候重复宣布工作单,这些工作单已准备好供客户端启动的检索
服务器将接受新的检索请求
服务器将验证客户端请求以匹配任何已宣布的 job-ticket-ID
并测试 job-ticket-AUTH-key
是否匹配
服务器将响应匹配/不匹配的 job-ticket-ID
个结果检索请求
服务器仅在检索前的 POSACK-ed AUTH-匹配和 POSACK-消息重新确认传递给客户端时才会从循环缓冲区中删除 job-ticket-ID
意向
我想允许客户端将任务发送到某个固定地址的服务器。 服务器可能会接受该任务并在未来的任意时间执行它,但在此之前仍可能接受来自其他客户端的请求。 执行任务后,服务器将回复客户端,这可能是 运行 阻塞等待回复。 工作和客户是动态出现的,所以不可能有固定的初始数量。 工作是在非线程安全的上下文中完成的,因此 worker 不能存在于不同的线程中,因此所有工作都应该在单个线程中进行。
实施
以下示例1 不是服务器的完整实现,只是应该能够发生的序列的可编译部分(但实际上是挂起的)。 两个客户端各发送一个整数,服务器接受一个请求,然后是下一个请求,回显回复第一个请求,然后回显回复第二个请求。 目的不是让响应有序,只是允许服务器同时持有多个请求。 这里实际发生的是第二个工作人员挂起等待请求 - 这让我感到困惑,因为 DEALER 套接字应该以循环策略路由传出消息。
#include <unistd.h>
#include <stdio.h>
#include <zmq.h>
#include <sys/wait.h>
int client(int num)
{
void *context, *client;
int buf[1];
context = zmq_ctx_new();
client = zmq_socket(context, ZMQ_REQ);
zmq_connect(client, "tcp://localhost:5559");
*buf = num;
zmq_send(client, buf, 1, 0);
*buf = 0;
zmq_recv(client, buf, 1, 0);
printf("client %d receiving: %d\n", num, *buf);
zmq_close(client);
zmq_ctx_destroy(context);
return 0;
}
void multipart_proxy(void *from, void *to)
{
zmq_msg_t message;
while (1) {
zmq_msg_init(&message);
zmq_msg_recv(&message, from, 0);
int more = zmq_msg_more(&message);
zmq_msg_send(&message, to, more ? ZMQ_SNDMORE : 0);
zmq_msg_close(&message);
if (!more) break;
}
}
int main(void)
{
int status;
if (fork() == 0) {
client(1);
return(0);
}
if (fork() == 0) {
client(2);
return 0;
}
/* SERVER */
void *context, *frontend, *backend, *worker1, *worker2;
int wbuf1[1], wbuf2[1];
context = zmq_ctx_new();
frontend = zmq_socket(context, ZMQ_ROUTER);
backend = zmq_socket(context, ZMQ_DEALER);
zmq_bind(frontend, "tcp://*:5559");
zmq_bind(backend, "inproc://workers");
worker1 = zmq_socket(context, ZMQ_REP);
zmq_connect(worker1, "inproc://workers");
multipart_proxy(frontend, backend);
*wbuf1 = 0;
zmq_recv(worker1, wbuf1, 1, 0);
printf("worker1 receiving: %d\n", *wbuf1);
worker2 = zmq_socket(context, ZMQ_REP);
zmq_connect(worker2, "inproc://workers");
multipart_proxy(frontend, backend);
*wbuf2 = 0;
zmq_recv(worker2, wbuf2, 1, 0);
printf("worker2 receiving: %d\n", *wbuf2);
zmq_send(worker1, wbuf1, 1, 0);
multipart_proxy(backend, frontend);
zmq_send(worker2, wbuf2, 1, 0);
multipart_proxy(backend, frontend);
wait(&status);
zmq_close(frontend);
zmq_close(backend);
zmq_close(worker1);
zmq_close(worker2);
zmq_ctx_destroy(context);
return 0;
}
其他选项
我看过 CLIENT and SERVER sockets 并且它们在纸面上似乎有能力,但实际上它们足够新,以至于我拥有的 ZeroMQ 系统版本尚不支持它们。 如果无法在 ZeroMQ 中执行此操作,欢迎提出任何替代建议。
1 基于 ZeroMQ 指南的 Shared Queue 部分。
让我分享一下 ZeroMQ 如何满足上述定义的 Intention
。
我们宁愿使用 ZeroMQ Scalable Formal Communication Pattern Archetypes(因为它们现在是 RTO,而不是我们希望它们处于某个但不确定的指向(可能正在发生的)未来演化状态)。
我们需要毫不犹豫地 use many more ZeroMQ-based connections 在一群 coming/leaving client
-实例和 server
例如:
客户端 .connect()
-s 到服务器的 REQ
-socket-address:port
-A 请求“工作”-通过此连接处理票证
客户端.connect()
-s 到服务器的SUB
-套接字-address:port
-B 以收听(如果存在)有关已完成“工作”的已发布公告-服务器票证-准备好为
客户端公开另一个 REQ
-socket 以请求已广播的“工作”-ticket 完成公告消息,它刚刚在 SUB
[=87 中听说过=]-socket,以获得最终交付的“工作”-ticket 结果,如果证明自己,通过提供适当的/匹配的 job-ticket-AUTH-key
来证明其有权接收 publicly 宣布的结果可用性,使用这个相同的套接字在客户端正确收到此“工作”-票据结果“在手”后向服务器传递 POSACK 消息
服务器公开 REP
-socket 以根据“工作”-ticket 请求临时响应每个客户端,以这种方式通知“已接受”-job -ticket,还提供 job-ticket-AUTH-key
供稍后提取结果
服务器公开 PUB
-socket 以宣布任何和所有尚未拾取的“完成”-job-tickets
服务器公开另一个 REP
-socket 以接收任何可能的请求传递“工作”-ticket-results 的尝试。在验证交付 job-ticket-AUTH-key
后,服务器决定相应的 REQ
消息是否具有匹配 job-ticket-AUTH-key
以确实交付具有结果的正确消息,或者是否没有发生匹配,在这种情况下消息将携带一些其他有效负载数据(逻辑留待进一步思考,以防止潜在的暴力破解或窃听以及类似的、不太原始的窃取结果的攻击)
客户无需等待结果 live/online and/or 可以承受一定数量的 LoS、L2/L3-errors 或网络风暴压力
客户只需要保留某种 job-ticket-ID
和 job-ticket-AUTH-key
以便以后检索 Server-processes/maintained/auth-ed 结果
服务器将继续侦听新作业
服务器将接受新的工作单并提供私人添加的 job-ticket-AUTH-key
服务器将按原样处理工作单
服务器将维护一个循环缓冲区,其中包含要公布的已完成工作单
服务器将按 public 中的决定在适当的时候重复宣布工作单,这些工作单已准备好供客户端启动的检索
服务器将接受新的检索请求
服务器将验证客户端请求以匹配任何已宣布的 job-ticket-ID
并测试 job-ticket-AUTH-key
是否匹配
服务器将响应匹配/不匹配的 job-ticket-ID
个结果检索请求
服务器仅在检索前的 POSACK-ed AUTH-匹配和 POSACK-消息重新确认传递给客户端时才会从循环缓冲区中删除 job-ticket-ID