在 zmq 中使用定时器

Using timer with zmq

我正在做一个必须使用 zmq_poll 的项目。但是我没有完全理解它的作用。

于是我也尝试实现了:

zmq_pollitem_t timer_open(void){

  zmq_pollitem_t items[1];


    if( items[0].socket  == nullptr ){
         printf("error socket %s: %s\n", zmq_strerror(zmq_errno()));
         return;        
    }
else{
    items[0].socket = gsock; 
} 


items[0].fd = -1;   
items[0].events = ZMQ_POLLIN;  


 // get a timer
items[0].fd  = timerfd_create( CLOCK_REALTIME, 0 );
    if( items[0].fd  == -1 )
    {
    printf("timerfd_create() failed: errno=%d\n", errno);
            items[0].socket  = nullptr;

            return;
    }

int rc = zmq_poll(items,1,-1);

if(rc == -1){
    printf("error poll %s: %s\n", zmq_strerror(zmq_errno()));
    return;
} 
else
     return items[0];
}

我对这个话题很陌生,我必须修改一个旧的现有项目并将功能替换为 zmq 中的一个。在其他网站上,我看到了他们在无限循环中使用两个项目和 zmq_poll 函数的例子。我已经阅读了文档,但仍然无法正确理解它是如何工作的。这些是我实现的另外两个功能。不知道这样实现是否正确:

   void timer_set(zmq_pollitem_t items[] , long msec, ipc_timer_mode_t mode ) {


    struct itimerspec t;

    ...

    timerfd_settime( items[0].fd , 0, &t, NULL );

}


void timer_close(zmq_pollitem_t items[]){

if( items[0].fd != -1 )
       close(items[0].fd);

items[0].socket = nullptr; 

}

我不确定是否需要 zmq_poll 功能,因为我正在使用计时器。

编辑:

void some_function_timer_example() {
   // We want to wait on two timers
   zmq_pollitem_t items[2] ;

   // Setup first timer
   ipc_timer_open_(&items[0]);
   ipc_timer_set_(&items[0], 1000, IPC_TIMER_ONE_SHOT);
   // Setup second timer
   ipc_timer_open_(&items[1]);
   ipc_timer_set_(&items[1], 1000, IPC_TIMER_ONE_SHOT);

   // Now wait for the timers in a loop
   while (1) {
        //ipc_timer_set_(&items[0], 1000, IPC_TIMER_REPEAT);
        //ipc_timer_set_(&items[1], 5000, IPC_TIMER_REPEAT);

      int rc = zmq_poll (items, 2, -1);
      assert (rc >= 0); /* Returned events will be stored in items[].revents */

        if (items [0].revents & ZMQ_POLLIN) {
            //  Process task
            std::cout << "revents: 1" << std::endl;
        }
        if (items [1].revents & ZMQ_POLLIN) {
            //  Process weather update

            std::cout << "revents: 2" << std::endl;

        }
   }
}

现在打印速度还是很快的,不等待。它仍然只是在开始等待。当 timer_set 在循环内时,它会正确等待,前提是等待时间与以下相同:ipc_timer_set(&items[1], 1000,...) and ipctimer_set(&items[0], 1000,...)

那么我该如何更改呢?或者这是正确的行为吗?

zmq_poll 的工作方式类似于 select,但它允许一些额外的东西。例如,您可以 select 在常规同步文件描述符和特殊异步套接字之间。

在你的情况下,你可以像你尝试的那样使用定时器 fd,但你需要做一些小的改变。

首先,您必须考虑如何调用这些计时器。我认为用例是你想创建多个计时器并等待它们。这通常是您当前代码中的函数,可能正在为计时器使用循环(使用 select() 或他们可能正在做的任何其他事情)。 应该是这样的:

void some_function() {
   // We want to wait on two timers
   zmq_pollitem items[2];

   // Setup first timer
   ipc_timer_open(&item[0]);
   ipc_timer_set(&item[0], 1000, IPC_TIMER_ONE_REPEAT);
   // Setup second timer
   ipc_timer_open(&item[1]);
   ipc_timer_set(&item[1], 5000, IPC_TIMER_ONE_SHOT);

   // Now wait for the timers in a loop
   while (1) {
      int rc = zmq_poll (items, 2, -1);
      assert (rc >= 0); /* Returned events will be stored in items[].revents */
   }
}

现在,您需要修复 ipc_timer_open。这将非常简单 - 只需创建定时器 fd。

// Takes a pointer to pre-allocated zmq_pollitem_t and returns 0 for success, -1 for error
int ipc_timer_open(zmq_pollitem_t *items){
    items[0].socket = NULL; 
    items[0].events = ZMQ_POLLIN;  
    // get a timer
    items[0].fd  = timerfd_create( CLOCK_REALTIME, 0 );
    if( items[0].fd  == -1 )
    {
        printf("timerfd_create() failed: errno=%d\n", errno);
        return -1; // error
    }
    return 0;
}

编辑:添加为对评论的回复,因为它很长: 从文档中: If both socket and fd are set in a single zmq_pollitem_t, the ØMQ socket referenced by socket shall take precedence and the value of fd shall be ignored.

所以如果你传递的是fd,你必须将socket设置为NULL。我什至不清楚 gsock 是从哪里来的。这在文档中吗?我没找到。

And when will it break out of the while(1) loop?

这是应用逻辑,你要根据自己的需要来编码。 zmq_poll 只是在每次计时器命中时保持 returning。在此示例中,每秒 zmq_poll return 秒,因为第一个计时器(重复)一直在触发。但是在 5 秒时,它也会 return 因为第二个计时器(这是一次射击)。何时退出循环由您决定。你想让它无限地发展吗?您是否需要检查不同的条件才能退出循环?你想做 100 次然后 return 吗?您可以在此代码之上编写任何您想要的逻辑。

And what kind of events are returned back

ZMQ_POLLIN 因为计时器 fds 的行为类似于可读文件描述符。