czmq 倍数 zactor 实例崩溃

czmq multiples zactor instances crash

我正在尝试使用 czmq 库中的 zhash 和 zactor 编写一个基本示例。我想要实现的主要想法是:

  1. 创建 1024 个 zactor 实例并在每个 actor 创建后发送 "START" 命令。
  2. 等待 actor 的响应以继续创建 zactors。
  3. 删除之前创建的所有演员。

我有点不明白为什么这段代码会出错。每次我到达演员编号 60 时,应用程序都会退出:

...
...
58 actor started!
START command received!
59 actor started!
START command received!
60 actor started!
> Assertion failed: (self), function zsock_set_sndtimeo, file
> src/zsock_option.c, line 1344.
> Abort trap: 6

编译:gcc -o demo demo.c -g -lczmq

代码如下:

#include <stdio.h>
#include <czmq.h>

typedef struct {
    zsock_t *pipe;              //  Actor command pipe
    zpoller_t *poller;          //  Socket poller
    int terminated;
} accountactor_t;

typedef struct{
    zactor_t *actor;
    int foo;
} account_t;

accountactor_t *
accountactor_new (zsock_t *pipe, void *args)
{
    accountactor_t *self = (accountactor_t *) zmalloc (sizeof (accountactor_t));
    assert (self);

    self->pipe = pipe;
    self->poller = zpoller_new (self->pipe, NULL);
        self->terminated = 0;
    return self;

}

static void
accountactor_recv_api (accountactor_t *self)
{
//  Get the whole message of the pipe in one go
    zmsg_t *request = zmsg_recv (self->pipe);
    if (!request){
        return;        //  Interrupted
    }

    char *command = zmsg_popstr (request);

    if (streq (command, "START")){
            zsys_debug("START command received!");
        zsock_signal (self->pipe, 0);
    }else
    if (streq (command, "STOP")){
            zsys_debug("STOP command received!");
            zsock_signal (self->pipe, 0);
    }else
    if (streq (command, "$TERM")){
            zsys_debug("$TERM command received!");
        //  The $TERM command is send by zactor_destroy() method
        self->terminated = 1;

    }else {
        zsys_error ("Invalid command '%s'", command);
        zsock_signal (self->pipe, -1);
    }

    zmsg_destroy(&request);
    if(command){
        free(command);
    }
}

void
actor_fcn (zsock_t *pipe, void *args)
{
    accountactor_t * self = accountactor_new (pipe, args);
    if (!self)
        return;          //  Interrupted

        int rc  = 0;
    //  Signal actor successfully initiated
    zsock_signal (self->pipe, 0);

    while (!self->terminated) {
       zsock_t *which = (zsock_t *) zpoller_wait (self->poller, -1);
       if (which == self->pipe){
                        accountactor_recv_api (self);
       }
    }

        if(zpoller_terminated(self->poller)){
            zsys_debug("Poller Interrupted!");
        }else
        if(zpoller_expired(self->poller)){
            zsys_debug("Poller Expired!");
        }

        //  Free object itself
        zpoller_destroy (&self->poller);
        zsock_destroy(&self->pipe);
        free(self);
        self = NULL;

}

void
s_account_free (void *argument)
{
    account_t *account = (account_t *) argument;
        zstr_send (account->actor, "$TERM");
    zactor_destroy(&account->actor);
    free(account);
    zsys_debug("Item removed!");
}

int main(){

    zhash_t *table = zhash_new();
    int i = 0;

    account_t *ptrs[1024];
    char key[10];

    for(i=0; i<1024; i++){

        ptrs[i] = (account_t *) zmalloc (sizeof (account_t));

        ptrs[i]->actor = zactor_new (actor_fcn, NULL);
        sprintf(&key[0],"%d",i);
        zhash_insert(table, key, (void *)ptrs[i]);
        zhash_freefn(table, key, s_account_free);

        zstr_send (ptrs[i]->actor, "START");
        zsock_wait (ptrs[i]->actor);
        zsys_debug("%d actor started!",i);

    }

    i = zhash_size(table);
    // Delete all
    while(i--){
        sprintf(&key[0],"%d",i);
        zhash_delete(table, key);
        free(ptrs[i]);
    }

    return 0;

}

有什么想法吗?我不明白为什么我会达到这个 60 的数量限制。

您使用的是什么操作系统?是OS/X吗?

当前的 actor 实现仍然使用 PAIR 套接字,并且这些套接字在内部使用实际的文件句柄来发送信号。每个演员都有两个 PAIR 套接字,每个都使用两个文件句柄,所以 60 个演员 = 240 个文件句柄。

在 OS/X 上,每个进程的默认限制为 256。您可以提出这个问题,请参阅:http://zeromq.org/docs:tuning-zeromq

在Linux上默认是1024,你可以提高这个。在 Windows 上,您需要重新编译 libzmq,并将 FD_SETSIZE 设置为 16K 或类似的值(这是 libzmq master 现在所做的,但旧版本的值较低)。

完全分开:

  • 构造时不需要任何握手,因为zactor_new()已经完成了,你会看到所有actor在初始化时都发送了一个信号。

  • 创建 1024 个 actor 可能过多,除非您实际上是在测试系统限制。 Actor 使用系统线程;为了获得最佳性能,您需要每个代码一个线程。为获得最佳设计,每个并发工作线程一个线程。