ZeroMQ - CZMQ - Majordomo 模式 - 快速发送时丢失请求
ZeroMQ - CZMQ - Majordomo pattern - losing requests when sent FAST
我已经在这个问题上苦苦思索了一段时间,希望你们中的一位能给我指出正确的方向。
问题是,无论何时将请求快速传递给代理,并非所有请求都会到达(单个)工作人员。
如果我在请求之间引入一些延迟(请参阅客户端代码中的 sleep(1)),一切正常,但显然,这是不可接受的
为了重现我遇到的问题,我创建了我的代码的这个简化版本:
客户端:
#include <stdio.h>
#include <stdlib.h>
#include "czmq.h"
#include "majordomo_library.h"
/* Free a heap pointer and reset it to NULL.
 * Wrapped in do { } while (0) so the macro expands to exactly one statement
 * and stays correct inside an unbraced if/else. free(NULL) is a no-op, so no
 * NULL guard is needed. The argument is evaluated twice: pass a plain lvalue. */
#define SAFEFREE(x) \
    do { \
        free(x); \
        (x) = NULL; \
    } while (0)
/*
 * Client side of the reproduction: create `loops` MDP client sessions, send
 * one request on each without waiting in between, then collect one reply per
 * session and tear the session down.
 */
int main() {
    char service[] = "bb-test";
    char endpoint[] = "ipc:///tmp/bbtest.ipc";
    mdp_client_t **clients = NULL;
    zmsg_t *request = NULL;
    char request_str[128];
    char *cmd = NULL, *reply = NULL;
    int i = 0, loops = 10;

    /* Create array of pointers for <loops> clients */
    clients = calloc(loops, sizeof(mdp_client_t *));
    assert(clients != NULL);

    /* Create <loops> client sessions and send a request on each */
    for (i = 0; i < loops; i++) {
        /* Create a new MDP client session */
        clients[i] = mdp_client_new(endpoint);
        if (!clients[i]) {
            /* There is no session to query for a reason, so report the
             * failure without dereferencing the NULL handle. */
            fprintf(stderr, "Error: could not create client session %d\r\n", i);
            exit(-1);
        }

        /* Create new request message */
        request = zmsg_new();
        assert(request != NULL);
        /* snprintf bounds the write and always NUL-terminates */
        snprintf(request_str, sizeof request_str, "Request %d", i);
        zmsg_addstr(request, request_str);

        /* Send the message as an MDP client request */
        if (mdp_client_request(clients[i], service, &request) == 0) {
            fprintf(stdout, "%s sent\r\n", request_str);
        } else {
            fprintf(stderr, "%s NOT SENT (%s)\r\n", request_str, mdp_client_reason(clients[i]));
        }
        /* No-op if the call above already consumed and nullified the message */
        zmsg_destroy(&request);

        /* If I add sleep time here, so the worker can process the
         * request and send the reply back, it works just fine.
         * As soon as I drop all requests on the broker at once, the worker
         * gets stuck at zsock_recv() after processing only one, or a
         * subset, of the requests.
         */
        //sleep(1);
    }

    /* Collect the replies */
    for (i = 0; i < loops; i++) {
        /* Get the session's message pipe to read the reply from. The pipe is
         * not closed here: it is obtained from the session and is left for
         * mdp_client_destroy() to release (NOTE(review): ownership assumed
         * from the accessor-style API -- confirm). zmq_close() must not be
         * used on it in any case, since it expects a raw libzmq socket, not
         * a zsock_t. */
        zsock_t *client_sock = mdp_client_msgpipe(clients[i]);
        assert(client_sock);

        /* Set receive timeout (10000 ms = 10 s) */
        zsock_set_rcvtimeo(client_sock, 10000);

        /* Get the message as "ss" (string and string) into cmd and reply */
        if (zsock_recv(client_sock, "ss", &cmd, &reply) == 0) {
            fprintf(stdout, "Received: %s: %s\r\n", cmd, reply);
        } else {
            fprintf(stderr, "Failed to receive reply %s\r\n",
                    mdp_client_reason(clients[i]));
        }

        /* Destroy the client session */
        mdp_client_destroy(&clients[i]);

        SAFEFREE(cmd);
        SAFEFREE(reply);
    }

    free(clients);
    return 0;
}
下面是我启动默认 mdp_broker 的方式:
#include <stdio.h>
#include <stdlib.h>
#include "czmq.h"
#include "mdp_broker.h"
/*
 * Start a default MDP broker actor, switch it to verbose mode, bind it to the
 * test endpoint, and keep it running until a character is read from stdin.
 * Exits with EXIT_FAILURE if either command could not be sent to the actor.
 */
int main() {
    zactor_t *broker = zactor_new(mdp_broker, "test_MDP-broker");
    assert(broker != NULL);

    /* Both calls return 0 on success; stop at the first failure */
    int rc = zstr_send(broker, "VERBOSE");
    if (rc == 0) {
        rc = zstr_sendx(broker, "BIND", "ipc:///tmp/bbtest.ipc", NULL);
    }

    if (rc != 0) {
        fprintf(stderr, "Failed to configure broker\r\n");
    } else {
        /* Block until the user presses a key, then shut down */
        getchar();
    }

    zactor_destroy(&broker);
    return rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
}
最后,这是工作进程(worker):
#include <stdio.h>
#include <stdlib.h>
#include "czmq.h"
#include "mdp_worker.h"
/* Free a heap pointer and reset it to NULL.
 * Wrapped in do { } while (0) so it expands to exactly one statement; call
 * sites therefore end with a semicolon. The argument is evaluated twice. */
#define SAFEFREE(x) \
    do { \
        free(x); \
        (x) = NULL; \
    } while (0)

/*
 * Worker side of the reproduction: register the "bb-test" service with the
 * broker, then answer every request with "<request> - reply" until receiving
 * from the worker's message pipe fails.
 */
int main() {
    char service[] = "bb-test";
    char endpoint[] = "ipc:///tmp/bbtest.ipc";
    mdp_worker_t *worker_session = NULL;
    zsock_t *worker_sock = NULL;
    zframe_t *address = NULL;
    char *cmd = NULL;
    char *request = NULL;
    char *reply = NULL;
    int rc = 0;

    /* Create new worker and register the service with the broker */
    worker_session = mdp_worker_new(endpoint, service);
    assert(worker_session != NULL);
    mdp_worker_set_verbose(worker_session);
    worker_sock = mdp_worker_msgpipe(worker_session);
    assert(worker_sock != NULL);

    while (1) {
        rc = zsock_recv(worker_sock, "sfs", &cmd, &address, &request);
        if (rc != 0) {
            fprintf(stderr, "Failed to receive message: %s\r\n",
                    mdp_worker_reason(worker_session));
            /* No receive timeout is set on this pipe, so a failure is not
             * expected to be transient (presumably an interrupt or shutdown
             * -- confirm). Retrying would spin forever and never reach the
             * cleanup below, so leave the loop instead. */
            break;
        }
        fprintf(stdout, "Got message \"%s\"\r\n", request);

        /* Room for the request text, the " - reply" suffix and the NUL */
        size_t reply_size = strlen(request) + 10;
        reply = calloc(reply_size, sizeof(char));
        assert(reply != NULL);
        snprintf(reply, reply_size, "%s - reply", request);

        /* Create reply message */
        zmsg_t *msg_response = zmsg_new();
        assert(msg_response != NULL);

        /* Send */
        rc = zmsg_addstr(msg_response, reply);
        assert(rc == 0);
        rc = mdp_worker_send_final(worker_session, &address, &msg_response);
        fprintf(rc == 0 ? stdout : stderr, "Sending reply (\"%s\") was %s\r\n\r\n",
                reply, rc == 0 ? "successful" : "UNSUCCESSFUL");

        /* Both destroy calls are no-ops on a NULL pointer, so they are safe
         * whether or not the send consumed its arguments.
         * NOTE(review): confirm whether mdp_worker_send_final() takes
         * ownership of the address frame. */
        zmsg_destroy(&msg_response);
        zframe_destroy(&address);

        SAFEFREE(cmd);
        SAFEFREE(request);
        SAFEFREE(reply);
    }

    mdp_worker_destroy(&worker_session);
    return 0;
}
加上 sleep(1) 时的结果
客户:
D: 20-04-10 20:59:35 connected to ipc:///tmp/bbtest.ipc
Request 0 sent
D: 20-04-10 20:59:36 connected to ipc:///tmp/bbtest.ipc
Request 1 sent
D: 20-04-10 20:59:37 connected to ipc:///tmp/bbtest.ipc
Request 2 sent
D: 20-04-10 20:59:38 connected to ipc:///tmp/bbtest.ipc
Request 3 sent
D: 20-04-10 20:59:39 connected to ipc:///tmp/bbtest.ipc
Request 4 sent
D: 20-04-10 20:59:40 connected to ipc:///tmp/bbtest.ipc
Request 5 sent
D: 20-04-10 20:59:41 connected to ipc:///tmp/bbtest.ipc
Request 6 sent
D: 20-04-10 20:59:42 connected to ipc:///tmp/bbtest.ipc
Request 7 sent
D: 20-04-10 20:59:43 connected to ipc:///tmp/bbtest.ipc
Request 8 sent
D: 20-04-10 20:59:44 connected to ipc:///tmp/bbtest.ipc
Request 9 sent
Received: FINAL: Request 0 - reply
Received: FINAL: Request 1 - reply
Received: FINAL: Request 2 - reply
Received: FINAL: Request 3 - reply
Received: FINAL: Request 4 - reply
Received: FINAL: Request 5 - reply
Received: FINAL: Request 6 - reply
Received: FINAL: Request 7 - reply
Received: FINAL: Request 8 - reply
Received: FINAL: Request 9 - reply
Process finished with exit code 0
工人:
D: 20-04-10 20:59:32 connected to ipc:///tmp/bbtest.ipc
Got message "Request 0"
Sending reply ("Request 0 - reply") was successful
Got message "Request 1"
Sending reply ("Request 1 - reply") was successful
Got message "Request 2"
Sending reply ("Request 2 - reply") was successful
Got message "Request 3"
Sending reply ("Request 3 - reply") was successful
Got message "Request 4"
Sending reply ("Request 4 - reply") was successful
Got message "Request 5"
Sending reply ("Request 5 - reply") was successful
Got message "Request 6"
Sending reply ("Request 6 - reply") was successful
Got message "Request 7"
Sending reply ("Request 7 - reply") was successful
Got message "Request 8"
Sending reply ("Request 8 - reply") was successful
Got message "Request 9"
Sending reply ("Request 9 - reply") was successful
不加任何延迟时的结果
客户:
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 0 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 1 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 2 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 3 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 4 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 5 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 6 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 7 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 8 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 9 sent
Received: FINAL: Request 0 - reply
Received: FINAL: Request 1 - reply
Received: FINAL: Request 2 - reply
Received: FINAL: Request 3 - reply
工人:
D: 20-04-10 21:03:40 connected to ipc:///tmp/bbtest.ipc
Got message "Request 0"
Sending reply ("Request 0 - reply") was successful
Got message "Request 1"
Sending reply ("Request 1 - reply") was successful
Got message "Request 2"
Sending reply ("Request 2 - reply") was successful
Got message "Request 3"
Sending reply ("Request 3 - reply") was successful
工作进程阻塞在下面这一行:
rc = zsock_recv(worker_sock, "sfs", &cmd, &address, &request);
代理的详细输出显示所有请求都已发送到代理,但(在这种情况下)只有 3 条 WORKER_FINAL 消息。成功处理的请求数量每次都不同,并不总是 3 个,但随着请求数量的增加,总会在某个时刻中断。
有什么想法吗?有人知道吗??拜托了???
我发现了这个问题。它与mdp_broker有关。
自 2020 年 2 月 29 日星期六 10:20:52 提交 603a304fb674733bd00c0314761242da013a327f 起,代理不会调度排队的请求,除非有 "worker_ready" 或 "client_request" 事件。
因此,如果有请求添加到队列而没有工作人员可用,则接收到的请求总数和派发的请求总数将不同,并且一些请求将保留在队列中未处理直到超时。
只要队列中有请求并且有工作进程处于等待状态,MDP-Broker 就需要尽快检查并分发这些请求——无论是否有传入的 handle_request 事件。
所以我在 mdp_broker.c 的 handle_worker_final() 函数末尾添加了对 s_service_dispatch() 的调用。这样,每当工作进程处理完上一个请求并被重新加入等待列表时,代理都会检查挂起的请求并将其分发出去。
因此,mdp_broker.c 中的 handle_worker_final() 应如下所示:
// Handle a FINAL message from a worker: forward its body as a CLIENT_FINAL
// message to the address carried in the worker's message, put the worker back
// on the waiting lists, then run the dispatcher for the worker's service.
static void
handle_worker_final (client_t *self)
{
    mdp_msg_t *worker_msg = self->message;

    // Build the client-bound message: routing id, message id, service, body
    mdp_msg_t *reply = mdp_msg_new();
    mdp_msg_set_routing_id(reply, mdp_msg_address(worker_msg));
    mdp_msg_set_id(reply, MDP_MSG_CLIENT_FINAL);
    mdp_msg_set_service(reply, self->service_name);
    zmsg_t *payload = mdp_msg_get_body(worker_msg);
    mdp_msg_set_body(reply, &payload);
    mdp_msg_send(reply, self->server->router);
    mdp_msg_destroy(&reply);

    // Look the worker up by the hex form of the message's routing id
    char *worker_id = zframe_strhex(mdp_msg_routing_id(worker_msg));
    worker_t *worker =
        (worker_t *) zhash_lookup(self->server->workers, worker_id);
    zstr_free(&worker_id);
    assert(worker);

    // Add the worker back to the list of waiting workers, both the
    // server-wide list and the list of its own service.
    zlist_append(self->server->waiting, worker);
    service_t *service =
        (service_t *) zhash_lookup(self->server->services,
                                   worker->service->name);
    assert(service);
    zlist_append(service->waiting, worker);

    // The worker is available again, so dispatch for this service right away
    // instead of waiting for another event to trigger it.
    s_service_dispatch(service);
}
修复已提交给 zmq/majordomo 团队。一旦修复被合并,我会再次更新此帖。
约尔格
我已经在这个问题上苦苦思索了一段时间,希望你们中的一位能给我指出正确的方向。
问题是,无论何时将请求快速传递给代理,并非所有请求都会到达(单个)工作人员。
如果我在请求之间引入一些延迟(请参阅客户端代码中的 sleep(1)),一切正常,但显然,这是不可接受的
为了重现我遇到的问题,我创建了我的代码的这个简化版本:
客户端:
#include <stdio.h>
#include <stdlib.h>
#include "czmq.h"
#include "majordomo_library.h"
/* Free a heap pointer and reset it to NULL.
 * Wrapped in do { } while (0) so the macro expands to exactly one statement
 * and stays correct inside an unbraced if/else. free(NULL) is a no-op, so no
 * NULL guard is needed. The argument is evaluated twice: pass a plain lvalue. */
#define SAFEFREE(x) \
    do { \
        free(x); \
        (x) = NULL; \
    } while (0)
/*
 * Client side of the reproduction: create `loops` MDP client sessions, send
 * one request on each without waiting in between, then collect one reply per
 * session and tear the session down.
 */
int main() {
    char service[] = "bb-test";
    char endpoint[] = "ipc:///tmp/bbtest.ipc";
    mdp_client_t **clients = NULL;
    zmsg_t *request = NULL;
    char request_str[128];
    char *cmd = NULL, *reply = NULL;
    int i = 0, loops = 10;

    /* Create array of pointers for <loops> clients */
    clients = calloc(loops, sizeof(mdp_client_t *));
    assert(clients != NULL);

    /* Create <loops> client sessions and send a request on each */
    for (i = 0; i < loops; i++) {
        /* Create a new MDP client session */
        clients[i] = mdp_client_new(endpoint);
        if (!clients[i]) {
            /* There is no session to query for a reason, so report the
             * failure without dereferencing the NULL handle. */
            fprintf(stderr, "Error: could not create client session %d\r\n", i);
            exit(-1);
        }

        /* Create new request message */
        request = zmsg_new();
        assert(request != NULL);
        /* snprintf bounds the write and always NUL-terminates */
        snprintf(request_str, sizeof request_str, "Request %d", i);
        zmsg_addstr(request, request_str);

        /* Send the message as an MDP client request */
        if (mdp_client_request(clients[i], service, &request) == 0) {
            fprintf(stdout, "%s sent\r\n", request_str);
        } else {
            fprintf(stderr, "%s NOT SENT (%s)\r\n", request_str, mdp_client_reason(clients[i]));
        }
        /* No-op if the call above already consumed and nullified the message */
        zmsg_destroy(&request);

        /* If I add sleep time here, so the worker can process the
         * request and send the reply back, it works just fine.
         * As soon as I drop all requests on the broker at once, the worker
         * gets stuck at zsock_recv() after processing only one, or a
         * subset, of the requests.
         */
        //sleep(1);
    }

    /* Collect the replies */
    for (i = 0; i < loops; i++) {
        /* Get the session's message pipe to read the reply from. The pipe is
         * not closed here: it is obtained from the session and is left for
         * mdp_client_destroy() to release (NOTE(review): ownership assumed
         * from the accessor-style API -- confirm). zmq_close() must not be
         * used on it in any case, since it expects a raw libzmq socket, not
         * a zsock_t. */
        zsock_t *client_sock = mdp_client_msgpipe(clients[i]);
        assert(client_sock);

        /* Set receive timeout (10000 ms = 10 s) */
        zsock_set_rcvtimeo(client_sock, 10000);

        /* Get the message as "ss" (string and string) into cmd and reply */
        if (zsock_recv(client_sock, "ss", &cmd, &reply) == 0) {
            fprintf(stdout, "Received: %s: %s\r\n", cmd, reply);
        } else {
            fprintf(stderr, "Failed to receive reply %s\r\n",
                    mdp_client_reason(clients[i]));
        }

        /* Destroy the client session */
        mdp_client_destroy(&clients[i]);

        SAFEFREE(cmd);
        SAFEFREE(reply);
    }

    free(clients);
    return 0;
}
下面是我启动默认 mdp_broker 的方式:
#include <stdio.h>
#include <stdlib.h>
#include "czmq.h"
#include "mdp_broker.h"
/*
 * Start a default MDP broker actor, switch it to verbose mode, bind it to the
 * test endpoint, and keep it running until a character is read from stdin.
 * Exits with EXIT_FAILURE if either command could not be sent to the actor.
 */
int main() {
    zactor_t *broker = zactor_new(mdp_broker, "test_MDP-broker");
    assert(broker != NULL);

    /* Both calls return 0 on success; stop at the first failure */
    int rc = zstr_send(broker, "VERBOSE");
    if (rc == 0) {
        rc = zstr_sendx(broker, "BIND", "ipc:///tmp/bbtest.ipc", NULL);
    }

    if (rc != 0) {
        fprintf(stderr, "Failed to configure broker\r\n");
    } else {
        /* Block until the user presses a key, then shut down */
        getchar();
    }

    zactor_destroy(&broker);
    return rc == 0 ? EXIT_SUCCESS : EXIT_FAILURE;
}
最后,这是工作进程(worker):
#include <stdio.h>
#include <stdlib.h>
#include "czmq.h"
#include "mdp_worker.h"
/* Free a heap pointer and reset it to NULL.
 * Wrapped in do { } while (0) so it expands to exactly one statement; call
 * sites therefore end with a semicolon. The argument is evaluated twice. */
#define SAFEFREE(x) \
    do { \
        free(x); \
        (x) = NULL; \
    } while (0)

/*
 * Worker side of the reproduction: register the "bb-test" service with the
 * broker, then answer every request with "<request> - reply" until receiving
 * from the worker's message pipe fails.
 */
int main() {
    char service[] = "bb-test";
    char endpoint[] = "ipc:///tmp/bbtest.ipc";
    mdp_worker_t *worker_session = NULL;
    zsock_t *worker_sock = NULL;
    zframe_t *address = NULL;
    char *cmd = NULL;
    char *request = NULL;
    char *reply = NULL;
    int rc = 0;

    /* Create new worker and register the service with the broker */
    worker_session = mdp_worker_new(endpoint, service);
    assert(worker_session != NULL);
    mdp_worker_set_verbose(worker_session);
    worker_sock = mdp_worker_msgpipe(worker_session);
    assert(worker_sock != NULL);

    while (1) {
        rc = zsock_recv(worker_sock, "sfs", &cmd, &address, &request);
        if (rc != 0) {
            fprintf(stderr, "Failed to receive message: %s\r\n",
                    mdp_worker_reason(worker_session));
            /* No receive timeout is set on this pipe, so a failure is not
             * expected to be transient (presumably an interrupt or shutdown
             * -- confirm). Retrying would spin forever and never reach the
             * cleanup below, so leave the loop instead. */
            break;
        }
        fprintf(stdout, "Got message \"%s\"\r\n", request);

        /* Room for the request text, the " - reply" suffix and the NUL */
        size_t reply_size = strlen(request) + 10;
        reply = calloc(reply_size, sizeof(char));
        assert(reply != NULL);
        snprintf(reply, reply_size, "%s - reply", request);

        /* Create reply message */
        zmsg_t *msg_response = zmsg_new();
        assert(msg_response != NULL);

        /* Send */
        rc = zmsg_addstr(msg_response, reply);
        assert(rc == 0);
        rc = mdp_worker_send_final(worker_session, &address, &msg_response);
        fprintf(rc == 0 ? stdout : stderr, "Sending reply (\"%s\") was %s\r\n\r\n",
                reply, rc == 0 ? "successful" : "UNSUCCESSFUL");

        /* Both destroy calls are no-ops on a NULL pointer, so they are safe
         * whether or not the send consumed its arguments.
         * NOTE(review): confirm whether mdp_worker_send_final() takes
         * ownership of the address frame. */
        zmsg_destroy(&msg_response);
        zframe_destroy(&address);

        SAFEFREE(cmd);
        SAFEFREE(request);
        SAFEFREE(reply);
    }

    mdp_worker_destroy(&worker_session);
    return 0;
}
加上 sleep(1) 时的结果
客户:
D: 20-04-10 20:59:35 connected to ipc:///tmp/bbtest.ipc
Request 0 sent
D: 20-04-10 20:59:36 connected to ipc:///tmp/bbtest.ipc
Request 1 sent
D: 20-04-10 20:59:37 connected to ipc:///tmp/bbtest.ipc
Request 2 sent
D: 20-04-10 20:59:38 connected to ipc:///tmp/bbtest.ipc
Request 3 sent
D: 20-04-10 20:59:39 connected to ipc:///tmp/bbtest.ipc
Request 4 sent
D: 20-04-10 20:59:40 connected to ipc:///tmp/bbtest.ipc
Request 5 sent
D: 20-04-10 20:59:41 connected to ipc:///tmp/bbtest.ipc
Request 6 sent
D: 20-04-10 20:59:42 connected to ipc:///tmp/bbtest.ipc
Request 7 sent
D: 20-04-10 20:59:43 connected to ipc:///tmp/bbtest.ipc
Request 8 sent
D: 20-04-10 20:59:44 connected to ipc:///tmp/bbtest.ipc
Request 9 sent
Received: FINAL: Request 0 - reply
Received: FINAL: Request 1 - reply
Received: FINAL: Request 2 - reply
Received: FINAL: Request 3 - reply
Received: FINAL: Request 4 - reply
Received: FINAL: Request 5 - reply
Received: FINAL: Request 6 - reply
Received: FINAL: Request 7 - reply
Received: FINAL: Request 8 - reply
Received: FINAL: Request 9 - reply
Process finished with exit code 0
工人:
D: 20-04-10 20:59:32 connected to ipc:///tmp/bbtest.ipc
Got message "Request 0"
Sending reply ("Request 0 - reply") was successful
Got message "Request 1"
Sending reply ("Request 1 - reply") was successful
Got message "Request 2"
Sending reply ("Request 2 - reply") was successful
Got message "Request 3"
Sending reply ("Request 3 - reply") was successful
Got message "Request 4"
Sending reply ("Request 4 - reply") was successful
Got message "Request 5"
Sending reply ("Request 5 - reply") was successful
Got message "Request 6"
Sending reply ("Request 6 - reply") was successful
Got message "Request 7"
Sending reply ("Request 7 - reply") was successful
Got message "Request 8"
Sending reply ("Request 8 - reply") was successful
Got message "Request 9"
Sending reply ("Request 9 - reply") was successful
不加任何延迟时的结果
客户:
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 0 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 1 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 2 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 3 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 4 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 5 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 6 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 7 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 8 sent
D: 20-04-10 21:03:45 connected to ipc:///tmp/bbtest.ipc
Request 9 sent
Received: FINAL: Request 0 - reply
Received: FINAL: Request 1 - reply
Received: FINAL: Request 2 - reply
Received: FINAL: Request 3 - reply
工人:
D: 20-04-10 21:03:40 connected to ipc:///tmp/bbtest.ipc
Got message "Request 0"
Sending reply ("Request 0 - reply") was successful
Got message "Request 1"
Sending reply ("Request 1 - reply") was successful
Got message "Request 2"
Sending reply ("Request 2 - reply") was successful
Got message "Request 3"
Sending reply ("Request 3 - reply") was successful
工作进程阻塞在下面这一行:
rc = zsock_recv(worker_sock, "sfs", &cmd, &address, &request);
代理的详细输出显示所有请求都已发送到代理,但(在这种情况下)只有 3 条 WORKER_FINAL 消息。成功处理的请求数量每次都不同,并不总是 3 个,但随着请求数量的增加,总会在某个时刻中断。
有什么想法吗?有人知道吗??拜托了???
我发现了这个问题。它与mdp_broker有关。 自 2020 年 2 月 29 日星期六 10:20:52 提交 603a304fb674733bd00c0314761242da013a327f 起,代理不会调度排队的请求,除非有 "worker_ready" 或 "client_request" 事件。 因此,如果有请求添加到队列而没有工作人员可用,则接收到的请求总数和派发的请求总数将不同,并且一些请求将保留在队列中未处理直到超时。
只要队列中有请求并且有工作进程处于等待状态,MDP-Broker 就需要尽快检查并分发这些请求——无论是否有传入的 handle_request 事件。
所以我在 mdp_broker.c 的 handle_worker_final() 函数末尾添加了对 s_service_dispatch() 的调用。这样,每当工作进程处理完上一个请求并被重新加入等待列表时,代理都会检查挂起的请求并将其分发出去。
因此,mdp_broker.c 中的 handle_worker_final() 应如下所示:
// Handle a FINAL message from a worker: forward its body as a CLIENT_FINAL
// message to the address carried in the worker's message, put the worker back
// on the waiting lists, then run the dispatcher for the worker's service.
static void
handle_worker_final (client_t *self)
{
    mdp_msg_t *worker_msg = self->message;

    // Build the client-bound message: routing id, message id, service, body
    mdp_msg_t *reply = mdp_msg_new();
    mdp_msg_set_routing_id(reply, mdp_msg_address(worker_msg));
    mdp_msg_set_id(reply, MDP_MSG_CLIENT_FINAL);
    mdp_msg_set_service(reply, self->service_name);
    zmsg_t *payload = mdp_msg_get_body(worker_msg);
    mdp_msg_set_body(reply, &payload);
    mdp_msg_send(reply, self->server->router);
    mdp_msg_destroy(&reply);

    // Look the worker up by the hex form of the message's routing id
    char *worker_id = zframe_strhex(mdp_msg_routing_id(worker_msg));
    worker_t *worker =
        (worker_t *) zhash_lookup(self->server->workers, worker_id);
    zstr_free(&worker_id);
    assert(worker);

    // Add the worker back to the list of waiting workers, both the
    // server-wide list and the list of its own service.
    zlist_append(self->server->waiting, worker);
    service_t *service =
        (service_t *) zhash_lookup(self->server->services,
                                   worker->service->name);
    assert(service);
    zlist_append(service->waiting, worker);

    // The worker is available again, so dispatch for this service right away
    // instead of waiting for another event to trigger it.
    s_service_dispatch(service);
}
修复已提交给 zmq/majordomo 团队。一旦修复被合并,我会再次更新此帖。
约尔格