What causes cases of high ZeroMQ latency and how can they be avoided?

I am trying to use ZeroMQ for fast message passing. Messages need to be delivered in less than 1 [ms]. I did some testing (inproc, a single process on Linux, no TCP) and found that usually there is no problem. The latency is around 10 - 100 [us], depending on how often the messages are sent (why?). However, from time to time a message is received only after 6 [ms], which is unacceptable.

What could be the reason for some messages being delayed?

Perhaps the process is getting preempted?

Or is it because polling (zmq_poll()) is used?

Example results from my test:

avg lag =    28    [us]
max lag =  5221    [us]
std dev =    25.85 [us]
big lag =   180    x above 200 [us]

"big lag" 表示延迟超过 200 [us] 的情况数。在我的测试中,发送了 500 000 条消息,因此值 180 表示超过 200 [us] 的延迟记录在 180 / 500000 = 0,036% 中。这是一个非常低的数字,但我希望它为零。即使以平均延迟为代价。

The test source code:

#include <stdio.h>   /* printf */
#include <stdlib.h>
#include <math.h>
#include <time.h>    /* clock_gettime, nanosleep, time */
#include <zmq.h>
#include <pthread.h>

#define SOCKETS_NUM 5
#define RUNS 100000

void *context;
int numbers[SOCKETS_NUM];
struct {
    struct timespec send_time;
    struct timespec receive_time;
} times[SOCKETS_NUM * RUNS], *ptimes;

static void * worker_thread(void * dummy) {
    int * number = dummy;
    char endpoint[] = "inproc://endpointX";
    endpoint[17] = (char)('0' + *number);
    void * socket = zmq_socket(context, ZMQ_PUSH);
    zmq_connect(socket, endpoint);
    struct timespec sleeptime, remtime;
    int rnd = rand() / 3000;
    sleeptime.tv_sec = 0;
    sleeptime.tv_nsec = rnd;
    nanosleep(&sleeptime, &remtime);
    clock_gettime(CLOCK_REALTIME, &(ptimes[*number].send_time));
    zmq_send(socket, "Hello", 5, 0);
    zmq_close(socket);
    return NULL;
}

static void run_test(zmq_pollitem_t items[]) {
    pthread_t threads[SOCKETS_NUM];
    for (int i = 0; i < SOCKETS_NUM; i++) {
        pthread_create(&threads[i], NULL, worker_thread, &numbers[i]);
    }

    char buffer[10];
    int to_receive = SOCKETS_NUM;
    for (int i = 0; i < SOCKETS_NUM; i++) {
        int rc = zmq_poll(items, SOCKETS_NUM, -1);
        for (int j = 0; j < SOCKETS_NUM; j++) {
            if (items[j].revents & ZMQ_POLLIN) {
                clock_gettime(CLOCK_REALTIME, &(ptimes[j].receive_time));
                zmq_recv(items[j].socket, buffer, 10, 0);
            }
        }
        to_receive -= rc;
        if (to_receive == 0) break;
    }

    for (int i = 0; i < SOCKETS_NUM; i++) {
        pthread_join(threads[i], NULL);
    }
}

int main(void)
{
    context = zmq_ctx_new();
    zmq_ctx_set(context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO);
    zmq_ctx_set(context, ZMQ_THREAD_PRIORITY, 99);
    void * responders[SOCKETS_NUM];
    char endpoint[] = "inproc://endpointX";
    for (int i = 0; i < SOCKETS_NUM; i++) {
        responders[i] = zmq_socket(context, ZMQ_PULL);
        endpoint[17] = (char)('0' + i);
        zmq_bind(responders[i], endpoint);
        numbers[i] = i;
    }

    time_t tt;
    time_t t = time(&tt);
    srand((unsigned int)t);

    zmq_pollitem_t poll_items[SOCKETS_NUM];
    for (int i = 0; i < SOCKETS_NUM; i++) {
        poll_items[i].socket = responders[i];
        poll_items[i].events = ZMQ_POLLIN;
    }

    ptimes = times;
    for (int i = 0; i < RUNS; i++) {
        run_test(poll_items);
        ptimes += SOCKETS_NUM;
    }

    long int lags[SOCKETS_NUM * RUNS];
    long int total_lag = 0;
    long int max_lag = 0;
    long int big_lag = 0;
    for (int i = 0; i < SOCKETS_NUM * RUNS; i++) {
        lags[i] = (times[i].receive_time.tv_nsec - times[i].send_time.tv_nsec + (times[i].receive_time.tv_sec - times[i].send_time.tv_sec) * 1000000000) / 1000;
        if (lags[i] > max_lag) max_lag = lags[i];
        total_lag += lags[i];
        if (lags[i] > 200) big_lag++;
    }
    long int avg_lag = total_lag / SOCKETS_NUM / RUNS;
    double SD = 0.0;
    for (int i = 0; i < SOCKETS_NUM * RUNS; ++i) {
        SD += pow((double)(lags[i] - avg_lag), 2);
    }
    double std_lag = sqrt(SD / SOCKETS_NUM / RUNS);
    printf("avg lag = %l5d    [us]\n", avg_lag);
    printf("max lag = %l5d    [us]\n", max_lag);
    printf("std dev = %8.2f [us]\n", std_lag);
    printf("big lag = %l5d    x above 200 [us]\n", big_lag);

    for (int i = 0; i < SOCKETS_NUM; i++) {
        zmq_close(responders[i]);
    }
    zmq_ctx_destroy(context);
    return 0;
}

Q : "...I'd like it to be zero."

Easy to say, hard to achieve.

Given that you run the ultra-fast, memory-mapped inproc:// transport-class, the main focus will be on performance-tuning the Context()-processing. Here you spend so much in setup overhead and direct termination overhead, just to send, 1E5 times, a mere 5 [B], so I guess there will never be any queue-management-related issue, since there will never grow any "stack" at all.

1 ) (assuming we keep the code as-is) the natural step of performance tuning is at least to set the ZeroMQ mapping of ZMQ_AFFINITY (so as not to jump from one core to another). Interestingly, with that many ~ 5E5 socket setups/terminations on the PUSH-er side, each sending no more than a single 5 [B] message over the memory-mapped line, some help (for those big setup and maintenance overheads) may be gained from configuring the context-instance with SOCKETS_NUM I/O-threads, using the ZMQ_IO_THREADS setting (striving for "real-time"-ness, using SCHED_FIFO, a single I/O-thread does not help much, does it?).
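
A minimal sketch of that context tuning, kept separate from the test itself; the only idea new against the original code is the ZMQ_IO_THREADS call (sizing the pool to SOCKETS_NUM is an assumption to experiment with, not a guaranteed win):

#include <sched.h>                                      /* SCHED_FIFO                            */
#include <zmq.h>

#define SOCKETS_NUM 5

int main( void ) {
    void *context = zmq_ctx_new();
    /* the I/O-thread pool must be sized BEFORE the first socket is created */
    zmq_ctx_set( context, ZMQ_IO_THREADS,          SOCKETS_NUM );
    zmq_ctx_set( context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO  );
    zmq_ctx_set( context, ZMQ_THREAD_PRIORITY,     99          );
    /* ... sockets get created here ... */
    zmq_ctx_destroy( context );
    return 0;
}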

2 ) The next level of experimentation is re-balancing the ZMQ_THREAD_AFFINITY_CPU_ADD maps (the global context's I/O-threads onto CPU-cores) and the per-socket ZMQ_AFFINITY mappings of sockets onto the context's I/O-thread(s). Given a sufficient number of CPU-cores, there may be some performance / ultra-low-latency benefit in letting a cluster of several I/O-threads serve a single socket-instance "together", residing on the same CPU-core, yet here we get into a domain where the actual hardware, the real system's background workloads, and the resources still "spare" for this "real-time"-ambitious experimentation start to become impossible to predict without in-vivo testing and validation.
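
A sketch of the two affinity maps; ZMQ_THREAD_AFFINITY_CPU_ADD needs libzmq >= 4.3, and the core/thread numbers below are purely illustrative assumptions:

#include <stdint.h>                                     /* uint64_t                              */
#include <zmq.h>

int main( void ) {
    void *context = zmq_ctx_new();
    zmq_ctx_set( context, ZMQ_IO_THREADS, 2 );          /* two I/O-threads ( an assumption )     */
    /* pin the context's I/O-threads onto CPU-cores 2 and 3 ( pick idle cores on your box )      */
    zmq_ctx_set( context, ZMQ_THREAD_AFFINITY_CPU_ADD, 2 );
    zmq_ctx_set( context, ZMQ_THREAD_AFFINITY_CPU_ADD, 3 );

    void    *socket  = zmq_socket( context, ZMQ_PULL );
    uint64_t threads = 0x3;                             /* bitmask: I/O-threads #0 and #1        */
    /* map THIS socket onto the selected I/O-thread(s) */
    zmq_setsockopt( socket, ZMQ_AFFINITY, &threads, sizeof threads );
    zmq_bind( socket, "inproc://endpoint0" );

    zmq_close( socket );
    zmq_ctx_destroy( context );
    return 0;
}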

3 ) Tuning the per-socket zmq_setsockopt() parameters may help, yet unless you move away from these nano-scaled socket lifespans (a rather expensive, single-use "consumable-disposable"), do not expect any breakthrough here.
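
For completeness, the kind of per-socket tuning meant here looks like this (ZMQ_LINGER and ZMQ_SNDHWM are just two latency-relevant examples, and the values are assumptions):

#include <zmq.h>

int main( void ) {
    void *context = zmq_ctx_new();
    void *socket  = zmq_socket( context, ZMQ_PUSH );
    int   linger  = 0;                                  /* never block zmq_close() on unsent data */
    int   sndhwm  = 1000;                               /* smaller send high-water mark            */
    zmq_setsockopt( socket, ZMQ_LINGER, &linger, sizeof linger );
    zmq_setsockopt( socket, ZMQ_SNDHWM, &sndhwm, sizeof sndhwm );
    zmq_close( socket );
    zmq_ctx_destroy( context );
    return 0;
}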

4 ) When trying to measure with nanosecond resolution, the more so if a "duration" of something is to be measured, CLOCK_MONOTONIC_RAW ought to be used, avoiding ntp-injected adjustments, astronomy-correcting leap-second injections, et al.
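
A sketch of such a duration measurement; CLOCK_MONOTONIC_RAW is Linux-specific, which matches the test setup:

#include <stdio.h>
#include <time.h>

int main( void ) {
    struct timespec t0, t1;
    clock_gettime( CLOCK_MONOTONIC_RAW, &t0 );          /* immune to ntp slewing / leap seconds  */
    /* ... the work being timed ... */
    clock_gettime( CLOCK_MONOTONIC_RAW, &t1 );
    long ns = ( t1.tv_sec  - t0.tv_sec  ) * 1000000000L
            + ( t1.tv_nsec - t0.tv_nsec );
    printf( "elapsed = %ld [ns]\n", ns );
    return 0;
}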

5 ) The zmq_poll()-strategy: I would not go this way. Using timeout == -1 blocks the whole circus. A practice I strongly discourage in any distributed-computing system, the more so in one with "real-time" ambitions. Spinning the PULL-side up to maximum performance may go via having 1:1 PUSH/PULL threads on either side, or, if striving to challenge the grooming, keep the 5 PUSH-er threads as you have them and collect all ingress messages on a single, zero-copy, well-oiled PULL-er (easier polling; a payload-based indexing helper may be used, carrying the sender-side timestamp next to where the receiver-side timestamp gets placed). In any case, a blocking poller is almost an anti-pattern for challenging any low-latency soft-real-time toy.
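
A sketch of that single-PULL-er fan-in, with a finite zmq_poll() timeout instead of the blocking -1 (the endpoint name and the 1 [ms] timeout are assumptions):

#include <zmq.h>

int main( void ) {
    void *context = zmq_ctx_new();
    void *puller  = zmq_socket( context, ZMQ_PULL );
    zmq_bind( puller, "inproc://collector" );           /* ALL PUSH-ers connect here             */

    zmq_pollitem_t item = { puller, 0, ZMQ_POLLIN, 0 };
    char buffer[10];

    for (;;) {
        int rc = zmq_poll( &item, 1, 1 );               /* 1 [ms] timeout: never blocks forever  */
        if ( rc > 0 && ( item.revents & ZMQ_POLLIN ) ) {
            zmq_recv( puller, buffer, sizeof buffer, 0 );
            /* ... receiver-side timestamp + payload-based indexing would go here ... */
        }
        else break;                                     /* demo only: a real loop keeps spinning */
    }
    zmq_close( puller );
    zmq_ctx_destroy( context );
    return 0;
}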

Anyway, do not hesitate to refactor the code and to use profiling tools to see better where you "acquire" the big_lag-s (my guesses are above):

#include <stdio.h>   /* printf */
#include <stdlib.h>
#include <math.h>
#include <time.h>    /* clock_gettime, nanosleep, time */
#include <zmq.h>
#include <pthread.h>

#define SOCKETS_NUM      5
#define        RUNS 100000

void *context;
int   numbers[SOCKETS_NUM];
struct {
    struct timespec send_time;
    struct timespec recv_time;
} times[SOCKETS_NUM * RUNS],
 *ptimes;

static void *worker_thread( void *dummy ) { //-------------------------- an overhead-expensive one-shot PUSH-based "Hello"-sender & .close()
    
    int   *number       = dummy;
    char   endpoint[]   = "inproc://endpointX";
           endpoint[17] = (char)( '0' + *number );
    int    rnd          = rand() / 3000;
    void  *socket       = zmq_socket( context, ZMQ_PUSH );
            
    struct timespec   remtime,
                    sleeptime;
                    sleeptime.tv_sec  = 0;
                    sleeptime.tv_nsec = rnd;
                    
    zmq_connect( socket, endpoint );
    
    nanosleep( &sleeptime, &remtime ); // anything between < 0 : RAND_MAX/3000 > [ns] ... easily >> 32, as RAND_MAX == 2147483647, so RAND_MAX / 3000 ~ 715 827 [ns]
    
    clock_gettime( CLOCK_REALTIME, &( ptimes[*number].send_time) ); //............................................................................ CLK_set_NEAR_SEND
                                                                    // any CLOCK re-adjustments may and will skew any non-MONOTONIC_CLOCK
    
    zmq_send(  socket, "Hello", 5, 0 );
    zmq_close( socket );
    
    return NULL;
}

static void run_test( zmq_pollitem_t items[] ) { //--------------------- zmq_poll()-blocked zmq_recv()-orchestrator ( called ~ 1E5 x !!! resources' nano-use & setup + termination overheads matter )
    
    char      buffer[10];
    int       to_receive = SOCKETS_NUM;
    pthread_t threads[SOCKETS_NUM];
    
    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ thread-maker ( a per-socket PUSH-er[]-s )
        pthread_create( &threads[i], NULL, worker_thread, &numbers[i] );
    }
    
    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ [SERIAL]-------- [i]-stepping
        
        int rc = zmq_poll( items, SOCKETS_NUM, -1 ); //----------------- INFINITE ??? --- blocks /\/\/\/\/\/\/\/\/\/\/\ --- several may flag ZMQ_POLLIN
        
        for ( int j = 0; j < SOCKETS_NUM; j++ ) { //-------------------- ALL-CHECKED in a loop for an items[j].revents
            
            if ( items[j].revents & ZMQ_POLLIN ) { //------------------- FIND IF IT WAS THIS ONE
                
                clock_gettime( CLOCK_REALTIME, &( ptimes[j].recv_time ) );//...................................................................... CLK_set_NEAR_poll()_POSACK'd R2recv
                
                zmq_recv( items[j].socket, buffer, 10, 0 ); //---------- READ-IN from any POSACK'd by zmq_poll()-er flag(s)
            }
        }
        to_receive -= rc; // ---------------------------------------------------------------------------------------------- SUB rc
        if (to_receive == 0) break;
    }

    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ thread-killer
        
        pthread_join( threads[i], NULL );
    }
}

int main( void ) {
    
                 context = zmq_ctx_new();
    zmq_ctx_set( context, ZMQ_THREAD_SCHED_POLICY, SCHED_FIFO );
    zmq_ctx_set( context, ZMQ_THREAD_PRIORITY, 99 );
    
    void *responders[SOCKETS_NUM];
    char  endpoint[] = "inproc://endpointX";
    
    for ( int i = 0; i < SOCKETS_NUM; i++ ) {
        
        responders[i] = zmq_socket( context, ZMQ_PULL ); // ------------ PULL instances into []
        endpoint[17] = (char)( '0' + i );
        zmq_bind( responders[i], endpoint ); //------------------------- .bind()
        numbers[i] = i;
    }

    time_t tt;
    time_t t = time(&tt);
    srand( (unsigned int)t );

    zmq_pollitem_t poll_items[SOCKETS_NUM];
    
    for ( int i = 0; i < SOCKETS_NUM; i++ ) { //------------------------ zmq_politem_t array[] ---pre-fill---
        poll_items[i].socket = responders[i];
        poll_items[i].events = ZMQ_POLLIN;
    }

    ptimes = times;
    
    for ( int i = 0; i < RUNS; i++ ) { //------------------------------- 1E5 RUNs
        run_test( poll_items ); // -------------------------------------     RUN TEST
        ptimes += SOCKETS_NUM;
    }

    long int lags[SOCKETS_NUM * RUNS];
    long int total_lag = 0;
    long int   max_lag = 0;
    long int   big_lag = 0;
    
    for ( int i = 0; i < SOCKETS_NUM * RUNS; i++ ) {
        lags[i] = (   times[i].recv_time.tv_nsec
                  -   times[i].send_time.tv_nsec
                  + ( times[i].recv_time.tv_sec
                    - times[i].send_time.tv_sec
                      ) * 1000000000
                    ) / 1000; // --------------------------------------- [us]
        if ( lags[i] > max_lag ) max_lag = lags[i];
        total_lag += lags[i];
        if ( lags[i] > 200 )     big_lag++;
    }
    
    long int avg_lag = total_lag / SOCKETS_NUM / RUNS;
    double        SD = 0.0;
    
    for ( int i = 0; i < SOCKETS_NUM * RUNS; ++i ) {
        SD += pow( (double)( lags[i] - avg_lag ), 2 );
    }
    
    double std_lag = sqrt( SD / SOCKETS_NUM / RUNS );
    
    printf("avg lag = %l5d    [us]\n", avg_lag);
    printf("max lag = %l5d    [us]\n", max_lag);
    printf("std dev = %8.2f [us]\n", std_lag);
    printf("big lag = %l5d    x above 200 [us]\n", big_lag);

    for ( int i = 0; i < SOCKETS_NUM; i++ ) {
        zmq_close( responders[i] );
    }
    zmq_ctx_destroy( context );
    
    return 0;
}

Using nanosleep for a randomised sleep (not cardinal, safely outside of any control-loop activity) is a risky luxury, since in earlier kernels it caused problems:

In order to support applications requiring much more precise pauses (e.g., in order to control some time-critical hardware), nanosleep() would handle pauses of up to 2 ms by busy waiting with microsecond precision when called from a thread scheduled under a real-time policy like SCHED_FIFO or SCHED_RR. This special extension was removed in kernel 2.5.39, hence is still present in current 2.4 kernels, but not in 2.6 kernels.
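
If a randomised pause is needed at all, a less kernel-version-sensitive alternative (a sketch, assuming Linux, using an absolute deadline on CLOCK_MONOTONIC) is clock_nanosleep():

#define _POSIX_C_SOURCE 200112L
#include <time.h>

/* sleep until an absolute deadline on CLOCK_MONOTONIC: immune to wall-clock
   re-adjustments and independent of the old nanosleep() busy-wait special case */
static void sleep_ns( long ns ) {                       /* assumes ns < 1E9      */
    struct timespec deadline;
    clock_gettime( CLOCK_MONOTONIC, &deadline );
    deadline.tv_nsec += ns;
    if ( deadline.tv_nsec >= 1000000000L ) {            /* normalise the timespec */
        deadline.tv_nsec -= 1000000000L;
        deadline.tv_sec  += 1;
    }
    while ( clock_nanosleep( CLOCK_MONOTONIC, TIMER_ABSTIME, &deadline, NULL ) != 0 )
        ;                                               /* retry if interrupted by a signal */
}

int main( void ) {
    sleep_ns( 500000 );                                 /* ~0.5 [ms], value is an assumption */
    return 0;
}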