使用 POSIX 信号量的可重用屏障实现
Reusable barrier implementation using POSIX semaphores
需要一个创建 5 个 pthread 的解决方案。每个 pthread 执行一个函数,该函数涉及循环迭代 10 次。在循环的每次迭代中,线程将 int 从 0 递增到 0.9*MAX_INT,然后打印迭代编号。确保 5 个线程中的每一个都完成循环的第 i 次迭代,然后才能开始第 (i+1) 次迭代(即所有线程 synchronize/rendezvous 接近每次迭代的末尾)。我需要使用使用 POSIX 信号量实现的两相屏障来强制执行同步约束
我写了下面的代码对吗?
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
int thread_count;
void* MyThread(void* rank);
int main()
{
long thread;
pthread_t* thread_handles;
thread_count = 5;
thread_handles = malloc (thread_count*sizeof(pthread_t));
for (thread = 0; thread < thread_count; thread++)
pthread_create(&thread_handles[thread],NULL,MyThread,(void*) thread);
for (thread = 0; thread < thread_count; thread++)
pthread_join(thread_handles[thread], NULL);
free(thread_handles);
return 0;
}
void* Hello(void* rank)
{
long my_rank = (long) rank;
int a,i;
a=0;
for(i=0;i<10;i++)
{
int n = 5;
int count = 0;
pthread_mutex_t mutex = Semaphore(1)
barrier = Semaphore(0)
a = a + 0.9*MAX_INT;
printf("this is %d iteration\n",i);
mutex.wait()
count = count + 1
mutex.signal()
if count == n: barrier.signal() # unblock ONE thread
barrier.wait()
barrier.signal()
}
}
编辑:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <semaphore.h>
typedef struct {
int n;
int count;
sem_t mutex;
sem_t turnstyle;
sem_t turnstyle2;
} barrier_t;
void init_barrier(barrier_t *barrier, int n)
{
barrier->n = n;
barrier->count = 0;
sem_init(&barrier->mutex, 0, 1);
sem_init(&barrier->turnstyle, 0, 0);
sem_init(&barrier->turnstyle2, 0, 0);
}
void phase1_barrier(barrier_t *barrier)
{
sem_wait(&barrier->mutex);
if (++barrier->count == barrier->n) {
int i;
for (i = 0; i < barrier->n; i++) {
sem_post(&barrier->turnstyle);
}
}
sem_post(&barrier->mutex);
sem_wait(&barrier->turnstyle);
}
void phase2_barrier(barrier_t *barrier)
{
sem_wait(&barrier->mutex);
if (--barrier->count == 0) {
int i;
for (i = 0; i < barrier->n; i++) {
sem_post(&barrier->turnstyle2);
}
}
sem_post(&barrier->mutex);
sem_wait(&barrier->turnstyle2);
}
void wait_barrier(barrier_t *barrier)
{
phase1_barrier(barrier);
phase2_barrier(barrier);
}
#define NUM_THREADS 5
void *myThread(void *);
int main(int argc, char **argv)
{
pthread_t threads[NUM_THREADS];
barrier_t barrier;
int i;
init_barrier(&barrier, NUM_THREADS);
for (i = 0; i < NUM_THREADS; i++) {
pthread_create(&threads[i], NULL, myThread, &barrier);
}
for (i = 0; i < NUM_THREADS, i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
void *myThread(void *arg)
{
barrier_t *barrier = arg;
int i,a;
for(i=0;i<10;i++)
{
a = a + 0.9*MAX_INT;
printf("this is %d iteration\n",i);
}
return NULL;
}
好的,如果我们检查第 3.7.7 节 "The Little Book of Semaphores" 中的 Barrier
对象,我们会发现我们需要一个 mutex
和 2 个名为 turnstile
的信号量和 turnstile2
(互斥锁可以是初始化为 1 的信号量)。
由于我们必须使用 POSIX 信号量、pthreads 和 INT_MAX
,因此我们首先包含必要的头文件:
#include <pthread.h>
#include <semaphore.h>
#include <limits.h>
这本书使 Barrier
成为一个对象;然而,在 C 中,我们并没有真正的对象,但我们可以创建一个 struct
并带有一些函数来对其进行操作:
typedef struct {
int n;
int count;
sem_t mutex;
sem_t turnstile;
sem_t turnstile2;
} barrier_t;
我们可以创建一个函数来初始化屏障:
void init_barrier(barrier_t *barrier, int n)
{
barrier->n = n;
barrier->count = 0;
sem_init(&barrier->mutex, 0, 1);
sem_init(&barrier->turnstile, 0, 0);
sem_init(&barrier->turnstile2, 0, 0);
}
并实现phase1
函数,如书中所述:
void phase1_barrier(barrier_t *barrier)
{
sem_wait(&barrier->mutex);
if (++barrier->count == barrier->n) {
int i;
for (i = 0; i < barrier->n; i++) {
sem_post(&barrier->turnstile);
}
}
sem_post(&barrier->mutex);
sem_wait(&barrier->turnstile);
}
请注意,sem_post
函数仅执行一次 post,因此需要循环 post turnstile
n
次。
phase2
函数也同样直接跟随:
void phase2_barrier(barrier_t *barrier)
{
sem_wait(&barrier->mutex);
if (--barrier->count == 0) {
int i;
for (i = 0; i < barrier->n; i++) {
sem_post(&barrier->turnstile2);
}
}
sem_post(&barrier->mutex);
sem_wait(&barrier->turnstile2);
}
终于可以实现wait
功能了:
void wait_barrier(barrier_t *barrier)
{
phase1_barrier(barrier);
phase2_barrier(barrier);
}
现在,在您的 main
函数中,您可以分配和初始化屏障并将其传递给您生成的线程:
#define NUM_THREADS 5
void *myThread(void *);
int main(int argc, char **argv)
{
pthread_t threads[NUM_THREADS];
barrier_t barrier;
int i;
init_barrier(&barrier, NUM_THREADS);
for (i = 0; i < NUM_THREADS; i++) {
pthread_create(&threads[i], NULL, myThread, &barrier);
}
for (i = 0; i < NUM_THREADS, i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
最后,实现线程:
void *myThread(void *arg)
{
barrier_t *barrier = arg;
int i;
int a;
for (i = 0; i < 10; i++) {
for (a = 0; a < 0.9*INT_MAX; a++);
printf("this is %d iteration\n", i);
wait_barrier(barrier);
}
return NULL;
}
需要一个创建 5 个 pthread 的解决方案。每个 pthread 执行一个函数,该函数涉及循环迭代 10 次。在循环的每次迭代中,线程将 int 从 0 递增到 0.9*MAX_INT,然后打印迭代编号。确保 5 个线程中的每一个都完成循环的第 i 次迭代,然后才能开始第 (i+1) 次迭代(即所有线程 synchronize/rendezvous 接近每次迭代的末尾)。我需要使用使用 POSIX 信号量实现的两相屏障来强制执行同步约束
我写了下面的代码对吗?
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
int thread_count;
void* MyThread(void* rank);
int main()
{
long thread;
pthread_t* thread_handles;
thread_count = 5;
thread_handles = malloc (thread_count*sizeof(pthread_t));
for (thread = 0; thread < thread_count; thread++)
pthread_create(&thread_handles[thread],NULL,MyThread,(void*) thread);
for (thread = 0; thread < thread_count; thread++)
pthread_join(thread_handles[thread], NULL);
free(thread_handles);
return 0;
}
void* Hello(void* rank)
{
long my_rank = (long) rank;
int a,i;
a=0;
for(i=0;i<10;i++)
{
int n = 5;
int count = 0;
pthread_mutex_t mutex = Semaphore(1)
barrier = Semaphore(0)
a = a + 0.9*MAX_INT;
printf("this is %d iteration\n",i);
mutex.wait()
count = count + 1
mutex.signal()
if count == n: barrier.signal() # unblock ONE thread
barrier.wait()
barrier.signal()
}
}
编辑:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#include <time.h>
#include <semaphore.h>
typedef struct {
int n;
int count;
sem_t mutex;
sem_t turnstyle;
sem_t turnstyle2;
} barrier_t;
void init_barrier(barrier_t *barrier, int n)
{
barrier->n = n;
barrier->count = 0;
sem_init(&barrier->mutex, 0, 1);
sem_init(&barrier->turnstyle, 0, 0);
sem_init(&barrier->turnstyle2, 0, 0);
}
void phase1_barrier(barrier_t *barrier)
{
sem_wait(&barrier->mutex);
if (++barrier->count == barrier->n) {
int i;
for (i = 0; i < barrier->n; i++) {
sem_post(&barrier->turnstyle);
}
}
sem_post(&barrier->mutex);
sem_wait(&barrier->turnstyle);
}
void phase2_barrier(barrier_t *barrier)
{
sem_wait(&barrier->mutex);
if (--barrier->count == 0) {
int i;
for (i = 0; i < barrier->n; i++) {
sem_post(&barrier->turnstyle2);
}
}
sem_post(&barrier->mutex);
sem_wait(&barrier->turnstyle2);
}
void wait_barrier(barrier_t *barrier)
{
phase1_barrier(barrier);
phase2_barrier(barrier);
}
#define NUM_THREADS 5
void *myThread(void *);
int main(int argc, char **argv)
{
pthread_t threads[NUM_THREADS];
barrier_t barrier;
int i;
init_barrier(&barrier, NUM_THREADS);
for (i = 0; i < NUM_THREADS; i++) {
pthread_create(&threads[i], NULL, myThread, &barrier);
}
for (i = 0; i < NUM_THREADS, i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
void *myThread(void *arg)
{
barrier_t *barrier = arg;
int i,a;
for(i=0;i<10;i++)
{
a = a + 0.9*MAX_INT;
printf("this is %d iteration\n",i);
}
return NULL;
}
好的,如果我们检查第 3.7.7 节 "The Little Book of Semaphores" 中的 Barrier
对象,我们会发现我们需要一个 mutex
和 2 个名为 turnstile
的信号量和 turnstile2
(互斥锁可以是初始化为 1 的信号量)。
由于我们必须使用 POSIX 信号量、pthreads 和 INT_MAX
,因此我们首先包含必要的头文件:
#include <pthread.h>
#include <semaphore.h>
#include <limits.h>
这本书使 Barrier
成为一个对象;然而,在 C 中,我们并没有真正的对象,但我们可以创建一个 struct
并带有一些函数来对其进行操作:
typedef struct {
int n;
int count;
sem_t mutex;
sem_t turnstile;
sem_t turnstile2;
} barrier_t;
我们可以创建一个函数来初始化屏障:
void init_barrier(barrier_t *barrier, int n)
{
barrier->n = n;
barrier->count = 0;
sem_init(&barrier->mutex, 0, 1);
sem_init(&barrier->turnstile, 0, 0);
sem_init(&barrier->turnstile2, 0, 0);
}
并实现phase1
函数,如书中所述:
void phase1_barrier(barrier_t *barrier)
{
sem_wait(&barrier->mutex);
if (++barrier->count == barrier->n) {
int i;
for (i = 0; i < barrier->n; i++) {
sem_post(&barrier->turnstile);
}
}
sem_post(&barrier->mutex);
sem_wait(&barrier->turnstile);
}
请注意,sem_post
函数仅执行一次 post,因此需要循环 post turnstile
n
次。
phase2
函数也同样直接跟随:
void phase2_barrier(barrier_t *barrier)
{
sem_wait(&barrier->mutex);
if (--barrier->count == 0) {
int i;
for (i = 0; i < barrier->n; i++) {
sem_post(&barrier->turnstile2);
}
}
sem_post(&barrier->mutex);
sem_wait(&barrier->turnstile2);
}
终于可以实现wait
功能了:
void wait_barrier(barrier_t *barrier)
{
phase1_barrier(barrier);
phase2_barrier(barrier);
}
现在,在您的 main
函数中,您可以分配和初始化屏障并将其传递给您生成的线程:
#define NUM_THREADS 5
void *myThread(void *);
int main(int argc, char **argv)
{
pthread_t threads[NUM_THREADS];
barrier_t barrier;
int i;
init_barrier(&barrier, NUM_THREADS);
for (i = 0; i < NUM_THREADS; i++) {
pthread_create(&threads[i], NULL, myThread, &barrier);
}
for (i = 0; i < NUM_THREADS, i++) {
pthread_join(threads[i], NULL);
}
return 0;
}
最后,实现线程:
void *myThread(void *arg)
{
barrier_t *barrier = arg;
int i;
int a;
for (i = 0; i < 10; i++) {
for (a = 0; a < 0.9*INT_MAX; a++);
printf("this is %d iteration\n", i);
wait_barrier(barrier);
}
return NULL;
}