在 C 上使用 pthreads 实现屏障
Implement barrier with pthreads on C
我正在尝试并行化合并排序算法。我正在做的是为每个线程划分输入数组,然后合并线程结果。我尝试合并结果的方式是这样的:
thread 0 | thread 1 | thread 2 | thread 3
sort(A0) | sort(A1) | sort(A2) | sort(A3)
merge(A0,A1) | | merge(A2,A3) |
merge(A0A1, A2A3) | | |
因此,在函数 sortManager
的末尾,我调用了应该实现上述逻辑的函数 mergeThreadResults
。在其中,我遍历对以合并相应的线程。然后,如果需要,我将最后的项目合并到线程 0 中。它看起来像这样:
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %d\n", myRank, nextThread);
if (myRank % iter != 0) {
break;
}
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(3); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
if (myRank == 0 && nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
它似乎按预期工作。问题是,我正在使用 sleep
函数来同步线程,这远不是最好的方法。所以我正在尝试使用 pthread 实现屏障。
在其中,我尝试计算该循环需要多少次迭代并将其作为 breakpoint
传递。当所有线程都处于同一点时,我释放合并功能并在新循环中再次等待。这是我试过的:
pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
但它没有按预期工作。一些 merge
在最后一个周期完全结束之前触发,给我留下了一个部分排序的数组。
这是我的测试代码的一个小例子:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
// Initialize global variables
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;
struct ThreadTask {
long rank;
int size;
int threads;
};
void merge(int arr[], int left, int mid, int right) {
/* Merge arrays */
int i, j, k;
int n1 = mid - left + 1;
int n2 = right - mid;
// Alocate temp arrays
int *L = malloc((n1 + 2) * sizeof(int));
int *R = malloc((n2 + 2) * sizeof(int));
if (L == NULL || R == NULL) {
fprintf(stderr, "Fatal: failed to allocate memory fo temp arrays.");
exit(EXIT_FAILURE);
}
// Populate temp arrays
for (i = 1; i <= n1; i++) {
L[i] = arr[left + i - 1];
}
for (j = 1; j <= n2; j++) {
R[j] = arr[mid + j];
}
L[n1 + 1] = INT_MAX;
R[n2 + 1] = INT_MAX;
i = 1;
j = 1;
// Merge arrays
for (k = left; k <= right; k++) {
if (L[i] <= R[j]) {
arr[k] = L[i];
i++;
} else {
arr[k] = R[j];
j++;
}
}
free(L);
free(R);
}
void mergeSort(int arr[], int left, int right) {
/* Sort and then merge arrays */
if (left < right) {
int mid = left + (right - left) / 2;
mergeSort(arr, left, mid);
mergeSort(arr, mid + 1, right);
merge(arr, left, mid, right);
}
}
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %d\n", myRank, nextThread);
if (myRank % iter != 0) {
break;
}
// barrier
pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(2); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
if (myRank == 0 && nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
void *sortManager(void *threadInfo) {
/* Manage mergeSort between threads */
struct ThreadTask *currentTask = threadInfo;
// Get task arguments
long rank = currentTask->rank;
int left= rank * ((float)currentTask->size / (float)currentTask->threads);
int right = (rank + 1) * ((float)currentTask->size / (float)currentTask->threads) - 1;
int mid = left + (right - left) / 2;
// Execute merge for task division
if (left < right) {
mergeSort(sortingArray, left, mid);
mergeSort(sortingArray, mid + 1, right);
merge(sortingArray, left, mid, right);
}
// Merge thread results
if (rank % 2 == 0) {
mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
}
return 0;
}
struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
/* Create threads with each task info */
struct ThreadTask *threadTask;
for (long thread = 0; thread < threads; thread++){
threadTask = &tasksHolder[thread];
threadTask->rank = thread;
threadTask->size = size;
threadTask->threads = threads;
pthread_create(&thread_handles[thread], NULL, sortManager, (void*) threadTask);
}
return tasksHolder;
}
void printArray(int arr[], int size) {
/* Print array */
for (int arrayIndex = 0; arrayIndex < size; arrayIndex++)
printf("%d ", arr[arrayIndex]);
printf("\n");
}
int main(int argc, char *argv[]) {
// Initialize arguments
int arraySize = 20;
int totalThreads = 16;
// Display input
printf("\nInput array:\n");
printArray(sortingArray, arraySize);
// Initialize threads
pthread_t *thread_handles;
thread_handles = malloc(totalThreads * sizeof(pthread_t));
// Create threads
struct ThreadTask threadTasksHolder[totalThreads];
*threadTasksHolder = *threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);
// Execute merge sort in each thread
for (long thread = 0; thread < totalThreads; thread++) {
pthread_join(thread_handles[thread], NULL);
}
free(thread_handles);
// Display output
printf("\nSorted array:\n");
printArray(sortingArray, arraySize);
return 0;
}
I'm trying to parallelize a merge-sort algorithm. What I'm doing is
dividing the input array for each thread, then merging the threads
results.
好的,但是您的方法是不必要的困难。在合并过程的每一步,您希望一半线程等待另一半线程完成完成,一个线程等待另一个线程完成的最自然方式是使用pthread_join()
。如果您希望所有线程在同步后继续进行更多工作,那将是不同的,但在这种情况下,那些不负责更多合并的线程将无事可做。
This is what I've tried:
pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
这有几个问题,但最大的问题是障碍是工作的错误工具。在屏障达到顶峰后,所有被阻塞的线程都会继续运行。您希望 half 线程继续执行合并,但其他线程(应该)没有更多工作要做。您对 breakpoint
的计算假设下半场不会 return 到达障碍,这确实是他们不应该做的。如果你坚持使用屏障,那么没有合并执行的线程应该在通过屏障后终止。
此外,从 iter
2 开始是不正确的。如果您使用屏障方法,则 所有 每次迭代中的活动线程必须在任何线程之前到达屏障继续,但如果 iter
从 2 开始,那么在第一次迭代时,只有一半的线程必须在障碍通过之前到达障碍。
此外,您的 CV 使用不符合习惯并且容易出现问题。 pthread_cond_wait()
的已记录失败原因中的 None 可以通过尝试再次等待来挽救,因此您可能需要在出错时终止程序。另请注意,pthread_mutex_lock()
、pthread_mutex_unlock()
和 pthread_cond_broadcast()
都可能会失败。
另一方面,CV 容易受到(非常罕见的)虚假唤醒的影响,因此在等待成功 return 后,您需要在继续之前再次检查条件,并可能再次等待。更像是这样的:
if (pthread_mutex_lock(&mutex) != 0) {
perror("pthread_mutex_lock");
abort();
}
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
if (pthread_cond_broadcast(&cond_var) != 0) {
perror("pthread_cond_broadcast");
abort();
}
} else {
do {
if (pthread_cond_wait(&cond_var, &mutex) != 0) {
perror("pthread_cond_wait");
abort();
}
} while (counter < breakpoint);
}
if (pthread_mutex_unlock(&mutex) != 0) {
perror("pthread_mutex_unlock");
abort();
}
// some threads must terminate at this point
正如@John Bollinger 所说,您的方法看起来不必要地困难,解决方案也同样复杂。但是如果你想实现一个屏障,我建议你把它放在 mergeThreadResults
中的 merge
之后。这样,您可以等待所有在该周期内工作的线程完成,然后再转到下一个周期。
要创建它,您需要在每次迭代中通过一个新的障碍。因为在每个循环中,执行合并的线程数都会减少。所以开始宣布一些全球障碍:
int mergeCycleFlag = 0;
pthread_mutex_t mutex;
pthread_barrier_t *mergeBarrier;
该标志用于为每次迭代创建一个屏障,我们将需要为每个循环使用多个 mergeBarrier。不要忘记在 main
函数中初始化它,迭代次数为:mergeBarrier = realloc(mergeBarrier, howManyIterations);
然后我们可以像这样创建一个屏障:
pthread_mutex_lock(&mutex);
if (mergeCycleFlag != iter) {
mergeCycleFlag = iter;
int mergesInLoop = threads % iter== 0 ? threads/iter: threads/iter+1;
pthread_barrier_init(&mergeBarrier[iter], NULL, mergesInLoop);
}
pthread_mutex_unlock(&mutex);
... MERGE ...
// Wait everyone finnish merging
pthread_barrier_wait (&mergeBarrier[iter]);
请注意,我使用 lock
来创建屏障,因为我们不希望两个线程同时在此处乱来。如果没有为此 iter
设置障碍,我们将创建一个具有现在应该工作的线程数的障碍。此外,我已经更改了您的 breakpoint
语句以适合计算我们期望执行 merge
.
的线程数
经过一些调整后,您的 mergeThreadResults
应该是这样的:
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread, nextThreadRight;
int groupSize = 2;
while (groupSize <= threads) {
if (myRank % groupSize != 0) { // Release threads that no long perform merges
break;
}
nextThread = (myRank+1*groupSize) < threads ? (myRank+1*groupSize) : threads;
nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %d\n", myRank, nextThread-1);
// Init barrier with number of threads you will wait merging
pthread_mutex_lock(&mutex); // Just one thread can set the barrier
if (mergeCycleFlag != groupSize) {
mergeCycleFlag = groupSize;
int mergesInLoop = threads % groupSize == 0 ? threads/groupSize : threads/groupSize+1; // Calculate threads working in this step
pthread_barrier_init(&mergeBarrier[groupSize], NULL, mergesInLoop); // set barrier
}
pthread_mutex_unlock(&mutex);
// Merge thread group with neighbour group
merge(sortingArray, myLeft, myRight, nextThreadRight);
// Wait everyone finnish merging
pthread_barrier_wait (&mergeBarrier[groupSize]);
myRight = nextThreadRight;
groupSize = groupSize * 2;
}
// Merge thread 0
if (myRank == 0 && nextThread < threads-1) {
nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
最后,为了让这个解决方案起作用,您需要每个线程在合并结果之前完成它们的工作。因此,您需要在 main
中的 join
之后调用它,或者在 sortManager
上调用 mergeThreadResults
之前对所有线程实施另一个屏障。
此外,更好的方法是线程仅等待它们将合并的其他线程。比如,线程 0 只等待 1。然后等待 2...等等
我正在尝试并行化合并排序算法。我正在做的是为每个线程划分输入数组,然后合并线程结果。我尝试合并结果的方式是这样的:
thread 0 | thread 1 | thread 2 | thread 3
sort(A0) | sort(A1) | sort(A2) | sort(A3)
merge(A0,A1) | | merge(A2,A3) |
merge(A0A1, A2A3) | | |
因此,在函数 sortManager
的末尾,我调用了应该实现上述逻辑的函数 mergeThreadResults
。在其中,我遍历对以合并相应的线程。然后,如果需要,我将最后的项目合并到线程 0 中。它看起来像这样:
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %d\n", myRank, nextThread);
if (myRank % iter != 0) {
break;
}
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(3); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
if (myRank == 0 && nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
它似乎按预期工作。问题是,我正在使用 sleep
函数来同步线程,这远不是最好的方法。所以我正在尝试使用 pthread 实现屏障。
在其中,我尝试计算该循环需要多少次迭代并将其作为 breakpoint
传递。当所有线程都处于同一点时,我释放合并功能并在新循环中再次等待。这是我试过的:
pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
但它没有按预期工作。一些 merge
在最后一个周期完全结束之前触发,给我留下了一个部分排序的数组。
这是我的测试代码的一个小例子:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <limits.h>
#include <string.h>
#include <time.h>
#include <pthread.h>
#include <unistd.h>
// Initialize global variables
int sortingArray[20] = {5,-4,3,-1,-2,3,1,2,-2,-1,-2,-1,-2,-3,4,1234,534,123,87,123};
int counter = 0;
pthread_mutex_t mutex;
pthread_cond_t cond_var;
struct ThreadTask {
long rank;
int size;
int threads;
};
void merge(int arr[], int left, int mid, int right) {
/* Merge arrays */
int i, j, k;
int n1 = mid - left + 1;
int n2 = right - mid;
// Alocate temp arrays
int *L = malloc((n1 + 2) * sizeof(int));
int *R = malloc((n2 + 2) * sizeof(int));
if (L == NULL || R == NULL) {
fprintf(stderr, "Fatal: failed to allocate memory fo temp arrays.");
exit(EXIT_FAILURE);
}
// Populate temp arrays
for (i = 1; i <= n1; i++) {
L[i] = arr[left + i - 1];
}
for (j = 1; j <= n2; j++) {
R[j] = arr[mid + j];
}
L[n1 + 1] = INT_MAX;
R[n2 + 1] = INT_MAX;
i = 1;
j = 1;
// Merge arrays
for (k = left; k <= right; k++) {
if (L[i] <= R[j]) {
arr[k] = L[i];
i++;
} else {
arr[k] = R[j];
j++;
}
}
free(L);
free(R);
}
void mergeSort(int arr[], int left, int right) {
/* Sort and then merge arrays */
if (left < right) {
int mid = left + (right - left) / 2;
mergeSort(arr, left, mid);
mergeSort(arr, mid + 1, right);
merge(arr, left, mid, right);
}
}
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread;
int iter = 2;
while (iter <= threads) {
int nextThread = (myRank+1*iter) < threads ? (myRank+1*iter) : threads;
int nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %d\n", myRank, nextThread);
if (myRank % iter != 0) {
break;
}
// barrier
pthread_mutex_lock(&mutex);
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
pthread_cond_broadcast(&cond_var);
} else {
while (pthread_cond_wait(&cond_var, &mutex) != 0);
}
pthread_mutex_unlock(&mutex);
merge(sortingArray, myLeft, myRight, nextThreadRight);
sleep(2); // <- sleep
myRight = nextThreadRight;
iter = iter * 2;
}
if (myRank == 0 && nextThread < threads-1) {
int nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
void *sortManager(void *threadInfo) {
/* Manage mergeSort between threads */
struct ThreadTask *currentTask = threadInfo;
// Get task arguments
long rank = currentTask->rank;
int left= rank * ((float)currentTask->size / (float)currentTask->threads);
int right = (rank + 1) * ((float)currentTask->size / (float)currentTask->threads) - 1;
int mid = left + (right - left) / 2;
// Execute merge for task division
if (left < right) {
mergeSort(sortingArray, left, mid);
mergeSort(sortingArray, mid + 1, right);
merge(sortingArray, left, mid, right);
}
// Merge thread results
if (rank % 2 == 0) {
mergeThreadResults(rank, left, right, currentTask->size, currentTask->threads);
}
return 0;
}
struct ThreadTask *threadCreator(int size, int threads, pthread_t *thread_handles, struct ThreadTask *tasksHolder) {
/* Create threads with each task info */
struct ThreadTask *threadTask;
for (long thread = 0; thread < threads; thread++){
threadTask = &tasksHolder[thread];
threadTask->rank = thread;
threadTask->size = size;
threadTask->threads = threads;
pthread_create(&thread_handles[thread], NULL, sortManager, (void*) threadTask);
}
return tasksHolder;
}
void printArray(int arr[], int size) {
/* Print array */
for (int arrayIndex = 0; arrayIndex < size; arrayIndex++)
printf("%d ", arr[arrayIndex]);
printf("\n");
}
int main(int argc, char *argv[]) {
// Initialize arguments
int arraySize = 20;
int totalThreads = 16;
// Display input
printf("\nInput array:\n");
printArray(sortingArray, arraySize);
// Initialize threads
pthread_t *thread_handles;
thread_handles = malloc(totalThreads * sizeof(pthread_t));
// Create threads
struct ThreadTask threadTasksHolder[totalThreads];
*threadTasksHolder = *threadCreator(arraySize, totalThreads, thread_handles, threadTasksHolder);
// Execute merge sort in each thread
for (long thread = 0; thread < totalThreads; thread++) {
pthread_join(thread_handles[thread], NULL);
}
free(thread_handles);
// Display output
printf("\nSorted array:\n");
printArray(sortingArray, arraySize);
return 0;
}
I'm trying to parallelize a merge-sort algorithm. What I'm doing is dividing the input array for each thread, then merging the threads results.
好的,但是您的方法是不必要的困难。在合并过程的每一步,您希望一半线程等待另一半线程完成完成,一个线程等待另一个线程完成的最自然方式是使用pthread_join()
。如果您希望所有线程在同步后继续进行更多工作,那将是不同的,但在这种情况下,那些不负责更多合并的线程将无事可做。
This is what I've tried:
pthread_mutex_lock(&mutex); counter++; int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1; if(counter >= breakpoint ) { counter = 0; pthread_cond_broadcast(&cond_var); } else { while (pthread_cond_wait(&cond_var, &mutex) != 0); } pthread_mutex_unlock(&mutex);
这有几个问题,但最大的问题是障碍是工作的错误工具。在屏障达到顶峰后,所有被阻塞的线程都会继续运行。您希望 half 线程继续执行合并,但其他线程(应该)没有更多工作要做。您对 breakpoint
的计算假设下半场不会 return 到达障碍,这确实是他们不应该做的。如果你坚持使用屏障,那么没有合并执行的线程应该在通过屏障后终止。
此外,从 iter
2 开始是不正确的。如果您使用屏障方法,则 所有 每次迭代中的活动线程必须在任何线程之前到达屏障继续,但如果 iter
从 2 开始,那么在第一次迭代时,只有一半的线程必须在障碍通过之前到达障碍。
此外,您的 CV 使用不符合习惯并且容易出现问题。 pthread_cond_wait()
的已记录失败原因中的 None 可以通过尝试再次等待来挽救,因此您可能需要在出错时终止程序。另请注意,pthread_mutex_lock()
、pthread_mutex_unlock()
和 pthread_cond_broadcast()
都可能会失败。
另一方面,CV 容易受到(非常罕见的)虚假唤醒的影响,因此在等待成功 return 后,您需要在继续之前再次检查条件,并可能再次等待。更像是这样的:
if (pthread_mutex_lock(&mutex) != 0) {
perror("pthread_mutex_lock");
abort();
}
counter++;
int breakpoint = threads % 2 == 0 ? threads/iter : threads/iter+1;
if(counter >= breakpoint ) {
counter = 0;
if (pthread_cond_broadcast(&cond_var) != 0) {
perror("pthread_cond_broadcast");
abort();
}
} else {
do {
if (pthread_cond_wait(&cond_var, &mutex) != 0) {
perror("pthread_cond_wait");
abort();
}
} while (counter < breakpoint);
}
if (pthread_mutex_unlock(&mutex) != 0) {
perror("pthread_mutex_unlock");
abort();
}
// some threads must terminate at this point
正如@John Bollinger 所说,您的方法看起来不必要地困难,解决方案也同样复杂。但是如果你想实现一个屏障,我建议你把它放在 mergeThreadResults
中的 merge
之后。这样,您可以等待所有在该周期内工作的线程完成,然后再转到下一个周期。
要创建它,您需要在每次迭代中通过一个新的障碍。因为在每个循环中,执行合并的线程数都会减少。所以开始宣布一些全球障碍:
int mergeCycleFlag = 0;
pthread_mutex_t mutex;
pthread_barrier_t *mergeBarrier;
该标志用于为每次迭代创建一个屏障,我们将需要为每个循环使用多个 mergeBarrier。不要忘记在 main
函数中初始化它,迭代次数为:mergeBarrier = realloc(mergeBarrier, howManyIterations);
然后我们可以像这样创建一个屏障:
pthread_mutex_lock(&mutex);
if (mergeCycleFlag != iter) {
mergeCycleFlag = iter;
int mergesInLoop = threads % iter== 0 ? threads/iter: threads/iter+1;
pthread_barrier_init(&mergeBarrier[iter], NULL, mergesInLoop);
}
pthread_mutex_unlock(&mutex);
... MERGE ...
// Wait everyone finnish merging
pthread_barrier_wait (&mergeBarrier[iter]);
请注意,我使用 lock
来创建屏障,因为我们不希望两个线程同时在此处乱来。如果没有为此 iter
设置障碍,我们将创建一个具有现在应该工作的线程数的障碍。此外,我已经更改了您的 breakpoint
语句以适合计算我们期望执行 merge
.
经过一些调整后,您的 mergeThreadResults
应该是这样的:
void mergeThreadResults(long myRank, int myLeft, int myRight, int size, int threads) {
int nextThread, nextThreadRight;
int groupSize = 2;
while (groupSize <= threads) {
if (myRank % groupSize != 0) { // Release threads that no long perform merges
break;
}
nextThread = (myRank+1*groupSize) < threads ? (myRank+1*groupSize) : threads;
nextThreadRight = nextThread * ((float)size / (float)threads) - 1;
printf("Merging threads %ld to %d\n", myRank, nextThread-1);
// Init barrier with number of threads you will wait merging
pthread_mutex_lock(&mutex); // Just one thread can set the barrier
if (mergeCycleFlag != groupSize) {
mergeCycleFlag = groupSize;
int mergesInLoop = threads % groupSize == 0 ? threads/groupSize : threads/groupSize+1; // Calculate threads working in this step
pthread_barrier_init(&mergeBarrier[groupSize], NULL, mergesInLoop); // set barrier
}
pthread_mutex_unlock(&mutex);
// Merge thread group with neighbour group
merge(sortingArray, myLeft, myRight, nextThreadRight);
// Wait everyone finnish merging
pthread_barrier_wait (&mergeBarrier[groupSize]);
myRight = nextThreadRight;
groupSize = groupSize * 2;
}
// Merge thread 0
if (myRank == 0 && nextThread < threads-1) {
nextThreadRight = threads * ((float)size / (float)threads) - 1;
merge(sortingArray, myLeft, myRight, nextThreadRight);
}
}
最后,为了让这个解决方案起作用,您需要每个线程在合并结果之前完成它们的工作。因此,您需要在 main
中的 join
之后调用它,或者在 sortManager
上调用 mergeThreadResults
之前对所有线程实施另一个屏障。
此外,更好的方法是线程仅等待它们将合并的其他线程。比如,线程 0 只等待 1。然后等待 2...等等