[Thread sync]让一个打印线程等待n个线程完成一个循环的执行,然后通知n个线程完成另一个循环,重复

[Thread sync]Make a print thread wait for n threads to finish a single cycle of execution, then signal the n threads to complete another cycle, repeat

如何让单个线程等待 n 个线程完成单个执行周期,然后重复 i 个周期。该解决方案需要使用信号量向单个线程发出所有 n 个线程都已完成执行周期的信号。然后,单个线程应向所有 n 个线程发出信号,表明它们可以继续执行另一次执行,...重复。

一些我无法解决的挑战:

:

//these would have been read in at run time
int n = 4;
int control = 4;

void *worker(void* args){
    int num = (int)args;
    printf("Worker id: %d\n", num);
    int proceed = 5; // simulates "5" jobs in this workers queue
    while(proceed > 0){
        sem_wait(&execute);
        printf("Report from: %d\n", num);
        sem_post(&reports);
        proceed--;
    }
    control--;
} 

void *print(){
    while(control > 0){
        for(int i = 0; i < n; i++) {
            sem_wait(&reports);
        }
        printf("All reported\n");
        for(int i = 0; i < n; i++) {
            sem_post(&execute);
        }
    }
}

void main(){
    // control/n would have been scanned and passed to threads, the global control var 
    would be set after reading in n
    sem_init(&execute,0,n);    //initialization of the first semaphore
    sem_init(&reports,0,0);    //initialization of the second semaphore

    pthread_t printer;
    pthread_create(&printer,NULL, &print,NULL);

    pthread_t workers[n];
    for(int i = 0; i < n; i++) {
        int* num = malloc(sizeof(int));
        num = i;
        pthread_create(&(workers[i]),NULL, &worker, (void*)num);
    }
    for(int i = 0; i < n; i++) {
        pthread_join(workers[i],NULL);
    }

    pthread_join(printer,NULL);
}

输出:

[rnitz]$ gcc -Wall -Werror -w -g example2.c -std=gnu99 -lpthread 
[rnitz]$ ./a.out
Worker id: 0
Worker id: 3
Report from: 3
Report from: 3
Worker id: 2
Worker id: 1
Report from: 3
Report from: 0
All reported
Report from: 1
Report from: 0
Report from: 2
Report from: 3
All reported
Report from: 0
Report from: 1
Report from: 3
Report from: 2
All reported
Report from: 0
Report from: 0
Report from: 1
Report from: 2
All reported
Report from: 2
Report from: 1
Report from: 1
Report from: 2
All reported
[rnitz]$ 

非常明显的问题,一些工作人员正在吃掉信号量中所有可用的“循环”空间。

注意:本题基于模拟cpu,其中n是cpu个线程的数量,它们将在自己的独立作业队列上工作;而单个打印线程处理打印当前正在处理的给定 cpu 周期的作业。打印线程将:

  1. 等待所有 cpu 完成一个周期
  2. 在每个 cpu
  3. 上打印当前作业
  4. 向 n cpu 发送信号以完成另一个周期。

解决方案:sem_t* 在 运行 时间 malloc 的信号量数组。我在任何地方都找不到这样的例子,但最终尝试并让它工作:)

我保留这个问题是因为我在 SOF 上找不到任何类似的东西,可以理解地等待 n 个线程在每个周期同步可能并不常见

为此,您可以使用信号量数组而不是 incrementing/decrementing 单个信号量。至少你需要至少一个信号量数组,这个解决方案使用 2.

如果每个工作线程都有不同数量的作业要完成,则可以扩展此解决方案,您可以有一个全局 int 在线程完成时递减,然后在 for-loops 打印机线程。这样,如果 n 个线程中的 1 个由于工作较少而提前完成,则可以 sem_wait()/sem_post n-1 次。

sem_t* execute_arr;
sem_t* reports_arr;

//this both need to be the same value
int n = 4;
int control = 4;

void *worker(void* args){
    int num = (int)args;
    //printf("init arg: %d\n", num);
    int proceed = 5;
    while(proceed > 0){
        sem_wait(&execute_arr[num]);
        printf("Report from: %d\n", num);
        sem_post(&reports_arr[num]);
        proceed--;
    }
    control--;
    //free memory and/or destroy sems if needed
} 

void *print(){
    while(control > 0){
        for(int i = 0; i < n; i++) {
            sem_wait(&reports_arr[i]);
        }
        printf("All reported\n");
        for(int i = 0; i < n; i++) {
            sem_post(&execute_arr[i]);
        }
    }
}

void main(){
    execute_arr = (sem_t*)malloc(n * sizeof(sem_t));
    reports_arr = (sem_t*)malloc(n * sizeof(sem_t));
    for(int i = 0; i < n; i++) {
        sem_init(&execute_arr[i], 0, 1);
        sem_init(&reports_arr[i], 0, 0);
    }

    pthread_t printer;
    pthread_create(&printer,NULL, &print,NULL);

    pthread_t workers[n];
    for(int i = 0; i < n; i++) {
        int* num = malloc(sizeof(int));
        num = i;
        pthread_create(&(workers[i]),NULL, &worker, (void*)num);
    }
    for(int i = 0; i < n; i++) {
        pthread_join(workers[i],NULL);
    }

    pthread_join(printer,NULL);
    //free memory and/or destroy sems if needed
}

固定输出:

[rnitz]$ ./a.out
Report from: 1
Report from: 0
Report from: 2
Report from: 3
All reported
Report from: 1
Report from: 3
Report from: 0
Report from: 2
All reported
Report from: 1
Report from: 3
Report from: 2
Report from: 0
All reported
Report from: 1
Report from: 3
Report from: 2
Report from: 0
All reported
Report from: 1
Report from: 3
Report from: 0
Report from: 2
All reported
[rnitz]$ 

与其使用系统信号量,不如使用互斥量和几个条件变量来解决此类问题。

我将 main 未完成的工作留给您作为练习来初始化代码示例中的全局互斥锁和条件变量。

pthread_mutex mutex;
pthread_cond_t cv_thread;
pthread_cond_t cv_main;

int job_number = 0;
int completions = 0;
int exit_condition = 0;
int n = 4; //number of worker threads

void *worker(void* args){
    int num = (int)args;
    int lastJobNumber = 0;
    int must_exit = 0;

    while (1) {    

        // wait for the main thread to indicate a new job is ready
        pthread_mutex_lock(&mutex);

            while ((lastJobNumber >= job_number) && !exit_condition) {
              pthread_cond_wait(&cv_thread, &mut);  // wait for job_number to change or for exit_conditon to be set
            }

            must_exit = exit_condition;

            if (!must_exit) {
                lastJobNumber = job_number;  // take on the new job!
            }

        pthread_mutex_unlock(&mutex);

        if (must_exit) {
            break;
        }

        printf("Report from: %d.  This thread is executing job %d\n", num, lastJobNumber);

        pthread_mutex_lock(&mutex);
            completions++;
        pthread_mutex_unlock(&mutex);
        pthread_cond_broadcast(&cv_main); // signal to main to wake up
    }
} 


void *print() {
   for (int i = 0; i < 5; i++) {
        pthread_mutex_lock(&mutex);
            job_number++;
            completions = 0;

            // signal to threads a new job is ready
            pthread_cond_broadcast(&cv_thread);

            // wait for all threads to indicate completion
            while (completions < n) {
                pthread_cond_wait(&cv_main, &mutex); 
            }

        pthread_mutex_unlock(&mutex);

        printf("All reported\n");
   }

    // signal all threads to exit
    pthread_mutex_lock(&mutex);
        exit_condition = 1;
    pthread_cond_broadcast(&cv_thread);
    pthread_mutex_unlock(&mutex);
}