当调度策略为 SCHED_RR 时,pthread 临界区中运行时间峰值的原因可能是什么?

What may be the cause of the elapsed-time spikes in a pthread critical section when the scheduling policy is SCHED_RR?

我正在 Linux 中进行一些时间计算测试。我的内核是 Preempt-RT(但是 vanilla 内核在我的测试中给出了类似的结果...)

我有两个 pthread,运行同时在同一个处理器中(给定亲和力)。它们是实时线程(prio 99)。

我有一个受自旋锁保护的临界区,其中两个线程竞争锁。在关键部分内,我有一个增量操作,我尝试计算该操作的运行时间。

带有 __rdtsc 的代码示例:

pthread_spin_lock(&lock);

start_time = __rdtsc();
++cnt; //shared ram variable, type is unsigned long long
stop_time = __rdtsc();

pthread_spin_unlock(&lock);

带有计时器的代码示例:

pthread_spin_lock(&lock);

auto _start_time = std::chrono::high_resolution_clock::now();
++cnt; //shared ram variable, type is unsigned long long
auto _stop_time = std::chrono::high_resolution_clock::now();

pthread_spin_unlock(&lock);

线程 运行 循环几百万次然后终止。解锁自旋锁后,我记录了平均耗用时间和最大耗用时间。

现在,事情变得有趣了(至少对我而言):

测试 1:线程的调度策略为 SCHED_RR:

线程号:0,最大时间:34124,平均时间:28.114271,运行 Cnt:10000000

线程号:1,最长时间:339256976,平均时间:74.781960,运行 Cnt:10000000


测试 2:线程的调度策略为 SCHED_FIFO:

线程号:0,最大时间:33114,平均时间:48.414173,运行 Cnt:10000000

线程号:1,最大时间:38637,平均时间:24.327742,运行 Cnt:10000000


测试三:只有单线程,调度策略为SCHED_RR:

线程号:0,最大时间:34584,平均时间:54.165470,运行 Cnt:10000000


注意:主线程是非 rt 线程,它在单独的处理器中具有亲和力。这里不重要。

注意 1:所有测试给出大约。每次我 运行 他们都会得到类似的结果。

注2:给出的结果是rdtsc的输出。然而,计时计时器的结果与这些几乎相似。

所以我想我可能对scheduler有一个误解,所以我需要问这些问题:

  1. 在测试 1 中,巨大的最大时间峰值是如何出现的?测试 2 和 3 的表现并不像它...
  2. 为什么最大值和平均值的计算差距很大?是什么原因造成的,像定时器这样的中断?

我所有的测试代码是:

#include <stdio.h>
#include <stdlib.h>
#include "stdint.h"
#include <float.h>
#include <pthread.h>
#include <cxxabi.h>
#include <limits.h>
#include <sched.h>
#include <sys/mman.h>
#include <unistd.h> 
#include <sys/time.h> 
#include <sys/resource.h> 
#include <malloc.h>
#include <chrono>

/********* TEST CONFIG ************/

#define TEST_PTHREAD_RUN_CNT    10000000    //1000000000
#define NUM_OF_TEST_PTHREADS    2
#define MAIN_THREAD_CORE_INDEX  0
#define TEST_PTHREAD_PRIO       99
#define TEST_PTHREAD_POLICY     SCHED_RR

#define TIME_RDTSC              1
#define TIME_CHRONO             0
/**********************************/

/**********************************/
struct param_list_s
 {
    unsigned int thread_no;
 };
/**********************************/

/********* PROCESS RAM ************/
pthread_t threads[NUM_OF_TEST_PTHREADS];
struct param_list_s param_list[NUM_OF_TEST_PTHREADS];
unsigned long long max_time[NUM_OF_TEST_PTHREADS];
unsigned long long _max_time[NUM_OF_TEST_PTHREADS];
unsigned long long tot_time[NUM_OF_TEST_PTHREADS];
unsigned long long _tot_time[NUM_OF_TEST_PTHREADS];
unsigned long long run_cnt[NUM_OF_TEST_PTHREADS];
unsigned long long cnt;
pthread_spinlock_t lock;
/**********************************/

/*Proto*/
static void configureMemoryBehavior(void);
void create_rt_pthread(unsigned int thread_no);

/*
* Date............: 
* Function........: main
* Description.....: 
*/
int main(void)
{
    cpu_set_t  mask;
    int i;

    for (i = 0; i < NUM_OF_TEST_PTHREADS; ++i)
     {
        max_time[i] = 0;
        tot_time[i] = 0;
        run_cnt[i] = 0;

        _max_time[i] = 0;
        _tot_time[i] = 0;
     }
    cnt = 0;

    printf("\nSetting scheduler affinity for the process...");
    CPU_ZERO(&mask);
    CPU_SET(MAIN_THREAD_CORE_INDEX, &mask);
    sched_setaffinity(0, sizeof(mask), &mask);
    printf("done.\n");

    configureMemoryBehavior();

    pthread_spin_init(&lock, PTHREAD_PROCESS_PRIVATE);

    for (i = 0; i < NUM_OF_TEST_PTHREADS; ++i)
     {
        create_rt_pthread(i);
     }

    printf("Waiting threads to join\n...\n");
    for (i = 0; i < NUM_OF_TEST_PTHREADS; i++)
    {
        pthread_join(threads[i], NULL);
        #if(TIME_RDTSC == 1)
        printf("Thread no: %d, Max Time: %llu, Avg Time: %f, Run Cnt: %llu\n", i, max_time[i], (float)((float)tot_time[i] / run_cnt[i]), run_cnt[i]);
        #endif

        #if(TIME_CHRONO == 1)
        printf("Thread no: %d, Max Time: %lu, Avg Time: %f, Run Cnt: %lu\n", i, _max_time[i], (float)((float)_tot_time[i] / run_cnt[i]), run_cnt[i]);
        #endif
    }
    printf("All threads joined\n");
    printf("Shared Cnt: %llu\n", cnt);

    return 0;
}


/*
* Date............:
* Function........: thread_func
* Description.....:
*/
void *thread_func(void *argv)
{

    unsigned long long i, start_time, stop_time, latency = 0;
    unsigned int thread_no;

    thread_no = ((struct param_list_s *)argv)->thread_no;
    i = 0;
    while (1)
     {
        #if(TIME_RDTSC == 1)
        pthread_spin_lock(&lock);
        start_time = __rdtsc();
        ++cnt;
        stop_time = __rdtsc();
        pthread_spin_unlock(&lock);

        if (stop_time > start_time)
        {
            latency = stop_time - start_time;
            ++run_cnt[thread_no];

            tot_time[thread_no] += latency;
            if (latency > max_time[thread_no])
                max_time[thread_no] = latency;
        }
        #endif

        #if(TIME_CHRONO == 1)
        pthread_spin_lock(&lock);

        auto _start_time = std::chrono::high_resolution_clock::now();
        ++cnt;
        auto _stop_time = std::chrono::high_resolution_clock::now();

        pthread_spin_unlock(&lock);

        auto __start_time = std::chrono::duration_cast<std::chrono::nanoseconds>(_start_time.time_since_epoch()).count();
        auto __stop_time = std::chrono::duration_cast<std::chrono::nanoseconds>(_stop_time.time_since_epoch()).count();
        auto __latency = __stop_time - __start_time;

        if (__stop_time > __start_time)
        {
            _tot_time[thread_no] += __latency;
            ++run_cnt[thread_no];
            if (__latency > _max_time[thread_no])
            {
                _max_time[thread_no] = __latency;
            }
        }
        #endif

        if (++i >= TEST_PTHREAD_RUN_CNT)
            break;
     }

    return 0;
}


/*
* Date............:
* Function........: create_rt_pthread
* Description.....:
*/
void create_rt_pthread(unsigned int thread_no)
{

    struct sched_param  param;
    pthread_attr_t      attr;

    printf("Creating a new real-time thread\n");
    /* Initialize pthread attributes (default values) */
    pthread_attr_init(&attr);

    /* Set a specific stack size  */
    pthread_attr_setstacksize(&attr, PTHREAD_STACK_MIN);

    /* Set scheduler policy and priority of pthread */
    pthread_attr_setschedpolicy(&attr, TEST_PTHREAD_POLICY);
    param.sched_priority = TEST_PTHREAD_PRIO;
    pthread_attr_setschedparam(&attr, &param);

    /* Set the processor affinity*/
    cpu_set_t cpuset;
    CPU_ZERO(&cpuset);
    CPU_SET(1, &cpuset);

    pthread_attr_setaffinity_np(&attr, sizeof(cpu_set_t), &cpuset);

    /* Use scheduling parameters of attr */
    pthread_attr_setinheritsched(&attr, PTHREAD_EXPLICIT_SCHED);

    param_list[thread_no].thread_no = thread_no;

    if(pthread_create(&threads[thread_no], &attr, thread_func, (void *)&param_list[thread_no]) != 0)
     {
        printf("Thread could not be created.\n");
        exit(-1);
     }
}


/*
* Date............:
* Function........: configureMemoryBehavior
* Description.....:
*/
static void configureMemoryBehavior(void)
{
    printf("\nLocking memory...");
    /* Now lock all current and future pages
       from preventing of being paged */
    if (mlockall(MCL_CURRENT | MCL_FUTURE))
        perror("mlockall failed:");

    /* Turn off malloc trimming.*/
    mallopt(M_TRIM_THRESHOLD, -1);

    /* Turn off mmap usage. */
    mallopt(M_MMAP_MAX, 0);
    printf("done.\n");
}

当您 运行 和 SCHED_FIFO 时,您的一个线程开始 运行ning。然后它 运行s 直到它完成——因为这就是 SCHED_FIFO 的工作方式——没有任何东西会抢占它。因此,它在自旋锁中花费的时间是相对一致的。然后,在第一个线程完成后,第二个线程 运行s 完成而不争用锁。所以它也有一个更一致的时间。由于中断等原因,两者仍然存在一些抖动,但这两者之间相当一致。

当您 运行 与 SCHED_RR 时,您的一个线程 运行 会持续一段时间。在一个时间片的末尾,它被抢占,另一个将到达 运行——因为这就是 SCHED_RR 的工作方式。现在,它很有可能在持有自旋锁 时被抢占。所以,现在另一个线程正在 运行ning,它立即尝试获取自旋锁,但失败了——因为另一个线程持有锁。但它只是一直尝试直到时间片结束(因为自旋锁就是这样工作的——它永远不会阻塞等待获取锁)。当然,它在此期间什么也做不了。最终,时间片结束,持有锁的线程再次到达 运行。但是归因于该单个增量操作的时间现在包括等待另一个线程在其整个时间片中旋转的所有时间。

我认为如果您增加最大计数 (TEST_PTHREAD_RUN_CNT),您会发现 SCHED_RR 行为随着线程的 both 变得均匀最终会受到这种影响。现在,我猜一个线程很有可能在一两个时间片内完成。

如果你想在同一处理器上锁定另一个线程 运行 一个同等优先级,你可能应该使用 pthread_mutex_t。在成功获取的情况下,这与自旋锁的作用几乎相同,但在无法获取锁时会阻塞。

但请注意:这样做的结果很可能会将 SCHED_RR 行为转变为 SCHED_FIFO 行为:大多数情况下,抢占将在一个线程持有锁时发生,因此另一个将到达 运行 执行一些指令,直到它尝试获取锁,然后它将阻塞并且第一个将再次到达 运行 一个完整的时间片。

总的来说,在一个处理器上尝试 运行 两个 RT 优先级线程真的很冒险,因为它们都应该 运行 很长一段时间。 RT 优先级在您将每个线程锁定到其自己的核心或需要立即调度 RT 线程但只会 运行 再次阻塞之前的短时间内最有效。