慢线程消费者
Slow pthread consumer
我已经使用 pthreads 和信号量在 C 中实现了生产者/消费者问题的解决方案。
我的主线程是生产者,我启动了N个消费者线程。
我的代码是:
typedef struct
{
int buf[BUFSIZE]; /* shared var */
int in; /* buf[in%BUFSIZE] is the first empty slot */
int out; /* buf[out%BUFSIZE] is the first full slot */
sem_t full; /* keep track of the number of full spots */
sem_t empty; /* keep track of the number of empty spots */
pthread_mutex_t mutex; /* enforce mutual exclusion to shared data */
} CONSUMER_STRUCT;
CONSUMER_STRUCT shared;
这是我的每个消费者线程的代码:
void *Consumer(void *arg)
{
int fd, workerID, i, hit=0;
workerID = *(int *)arg;
for (;;) {
sem_wait(&shared.full);
pthread_mutex_lock(&shared.mutex);
fd = shared.buf[shared.out];
printf("\n[C%d] Consumed. I got %d ...Valor do buffer: %d na posição %d\n\n\n", workerID, fd, shared.buf[shared.out], shared.out);
ftp(fd, hit);
shared.buf[shared.out] = 0;
shared.out = (shared.out+1)%BUFSIZE;
fflush(stdout);
printf("\n\n\n\nEstado do buffer:\n\n\n\n");
for (i = 0; i < BUFSIZE; i++) {
//printf("%d ", shared.buf[i]);
}
/* Release the buffer */
pthread_mutex_unlock(&shared.mutex);
/* Increment the number of full slots */
sem_post(&shared.empty);
hit++;
}
return NULL;
}
这是我的生产者线程的代码:
item = socketfd;
sem_wait(&shared.empty);
pthread_mutex_lock(&shared.mutex);
shared.buf[shared.in] = item;
shared.in = (shared.in + 1) % BUFSIZE;
fflush(stdout);
pthread_mutex_unlock(&shared.mutex);
sem_post(&shared.full);
一切正常,但处理 22 个文件大约需要 20 秒,而为每个请求创建一个线程大约需要 2 秒!这似乎是一次执行一个线程,我想执行所有线程"at the same time"。
我的实施方法有问题吗?
对于那些可能遇到类似问题的人,这里是解决方法。
感谢@Martin James 和@EOF。
void *Consumer(void *arg)
{
int fd, workerID, i, hit=0;
workerID = *(int *)arg;
for (;;) {
sem_wait(&shared.full);
pthread_mutex_lock(&shared.mutex);
fd = shared.buf[shared.out];
shared.buf[shared.out] = 0;
shared.out = (shared.out+1)%BUFSIZE;
pthread_mutex_unlock(&shared.mutex);
printf("\n[C%d] Consumed. I got %d ...Valor do buffer: %d na posição %d\n\n\n", workerID, fd, shared.buf[shared.out], shared.out);
ftp(fd, hit);
fflush(stdout);
printf("\n\n\n\nEstado do buffer:\n\n\n\n");
for (i = 0; i < BUFSIZE; i++) {
//printf("%d ", shared.buf[i]);
}
/* Release the buffer */
/* Increment the number of full slots */
sem_post(&shared.empty);
hit++;
}
return NULL;
}
问题是我锁定了互斥体,执行了一个函数,然后解锁了互斥体。这就是导致执行延迟的原因。
我已经使用 pthreads 和信号量在 C 中实现了生产者/消费者问题的解决方案。
我的主线程是生产者,我启动了N个消费者线程。
我的代码是:
typedef struct
{
int buf[BUFSIZE]; /* shared var */
int in; /* buf[in%BUFSIZE] is the first empty slot */
int out; /* buf[out%BUFSIZE] is the first full slot */
sem_t full; /* keep track of the number of full spots */
sem_t empty; /* keep track of the number of empty spots */
pthread_mutex_t mutex; /* enforce mutual exclusion to shared data */
} CONSUMER_STRUCT;
CONSUMER_STRUCT shared;
这是我的每个消费者线程的代码:
void *Consumer(void *arg)
{
int fd, workerID, i, hit=0;
workerID = *(int *)arg;
for (;;) {
sem_wait(&shared.full);
pthread_mutex_lock(&shared.mutex);
fd = shared.buf[shared.out];
printf("\n[C%d] Consumed. I got %d ...Valor do buffer: %d na posição %d\n\n\n", workerID, fd, shared.buf[shared.out], shared.out);
ftp(fd, hit);
shared.buf[shared.out] = 0;
shared.out = (shared.out+1)%BUFSIZE;
fflush(stdout);
printf("\n\n\n\nEstado do buffer:\n\n\n\n");
for (i = 0; i < BUFSIZE; i++) {
//printf("%d ", shared.buf[i]);
}
/* Release the buffer */
pthread_mutex_unlock(&shared.mutex);
/* Increment the number of full slots */
sem_post(&shared.empty);
hit++;
}
return NULL;
}
这是我的生产者线程的代码:
item = socketfd;
sem_wait(&shared.empty);
pthread_mutex_lock(&shared.mutex);
shared.buf[shared.in] = item;
shared.in = (shared.in + 1) % BUFSIZE;
fflush(stdout);
pthread_mutex_unlock(&shared.mutex);
sem_post(&shared.full);
一切正常,但处理 22 个文件大约需要 20 秒,而为每个请求创建一个线程大约需要 2 秒!这似乎是一次执行一个线程,我想执行所有线程"at the same time"。
我的实施方法有问题吗?
对于那些可能遇到类似问题的人,这里是解决方法。
感谢@Martin James 和@EOF。
void *Consumer(void *arg)
{
int fd, workerID, i, hit=0;
workerID = *(int *)arg;
for (;;) {
sem_wait(&shared.full);
pthread_mutex_lock(&shared.mutex);
fd = shared.buf[shared.out];
shared.buf[shared.out] = 0;
shared.out = (shared.out+1)%BUFSIZE;
pthread_mutex_unlock(&shared.mutex);
printf("\n[C%d] Consumed. I got %d ...Valor do buffer: %d na posição %d\n\n\n", workerID, fd, shared.buf[shared.out], shared.out);
ftp(fd, hit);
fflush(stdout);
printf("\n\n\n\nEstado do buffer:\n\n\n\n");
for (i = 0; i < BUFSIZE; i++) {
//printf("%d ", shared.buf[i]);
}
/* Release the buffer */
/* Increment the number of full slots */
sem_post(&shared.empty);
hit++;
}
return NULL;
}
问题是我锁定了互斥体,执行了一个函数,然后解锁了互斥体。这就是导致执行延迟的原因。