线程读取c中的文件大小(互斥问题)
threads to read file size in c (Mutex problem)
我有一个来自学校的项目,我们将在其中制作一个包含线程的程序,该程序将计算目录中文件的大小。我们需要使用 lstat 来获取文件的块大小。我的解决方案是创建所有线程,主线程将遍历目录并将找到的每个文件添加到队列中。然后线程可以从队列中取出一个文件并使用 lstat 获取块大小。图片显示了我做这个时的想法。
该程序的工作方式符合我的预期。但是我遇到的问题是,对于我添加到程序中的每个线程,完成它所花费的时间对于我添加的每个线程来说都更高。因此,一个线程需要 5 秒,而 2 个线程需要 7 秒。
我将展示我编写的一些代码。我想我想念在这里如何使用互斥锁,因为互斥锁块是添加到队列的主要线程,并且所有线程在添加到队列时都被阻塞。所以如果你看到我想念互斥量的东西请告诉我:)。
/* If -j flag is used and it's more then 1 thread */
if(flag_j == 1 && threads > 1) {
/* Create each thread */
createThreads(th, threads);
/* Make a task of each file and add task to queue */
makeTask(argv);
/* Tell that no more file will be added to the queue */
pthread_mutex_lock(&mutex);
endThreads = 1;
pthread_mutex_unlock(&mutex);
/* Join thread's */
joinThreads(th, threads);
}
/**
* Going to create all the thread's that was given
* with the -j flag at start.
* @param th An array of all the thread's.
* @param threads The number of thread's.
*
*/
void createThreads(pthread_t th[], int threads) {
for(int i = 0; i < threads; i++) {
int *a = malloc(sizeof(a));
*a = i;
/* pthread create */
if(pthread_create(&th[i], NULL, &getQueue, a) != 0) {
perror("Failed to create thread");
exit(EXIT_FAILURE);
}
}
}
/**
* Each thread will get a task from the queue to execute until
* the queue is empty and the conditional variable is set to 1.
* @param arg In this case it's NULL.
*
*/
void *getQueue(void *arg) {
free(arg);
while(1) {
Task task;
/* Lock */
pthread_mutex_lock(&mutex);
if(queueCount > 0) {
/* Get the first task in queue */
task = queue[0];
/* Move all the task in the queue forward one step */
for (int i = 0; i < queueCount - 1; i++)
{
queue[i] = queue[i + 1];
}
queueCount--;
/* execute the task */
pthread_mutex_unlock(&mutex);
execute(&task);
} else if ( endThreads == 1) {
pthread_mutex_unlock(&mutex);
break;
} else {
pthread_mutex_unlock(&mutex);
}
}
/* The thread will just resturn NULL */
return arg;
}
/**
* Find the correct struct in the list of arguments.
* Then add the size of the file to that struct and
* add the struct back in to the list.
* @param task Is the task that's need to be executed.
*
*/
void execute(Task* task) {
int i = 0;
char *path = malloc(1024*sizeof(path) + 1);
/* Get the first name of the path */
while (task->file[i] != '[=10=]')
{
/* Break when it find's the first "/" */
if (task->file[i] == '/' && i != 0)
{
break;
}
path[i] = task->file[i];
i++;
}
path[i] = '[=10=]';
pthread_mutex_lock(&mutex1);
for (int j = 0; j < numOfArg; j++)
{
Arguments entity;
/* Get the first struct in listOfArg */
entity = listOfArg[0];
/* Move all the struct's one step forward */
for (int k = 0; k < numOfArg - 1; k++){
listOfArg[k] = listOfArg[k + 1];
}
/* Check if the struct name is the same name as the path */
if (strcmp(path, entity.name) == 0)
{
/* Add the size of the file to the struct size */
entity.size += getFileSize(task->file);
/* Add the struct back to the list */
listOfArg[numOfArg - 1] = entity;
break;
} else {
/* Add the struct back to the list */
listOfArg[numOfArg - 1] = entity;
}
}
/* Get the size of the file and add it to the total size */
totalSize += getFileSize(task->file);
pthread_mutex_unlock(&mutex1);
free(path);
}
/**
* Will check if the argument is a directory or an file.
* If it is a directory it's going to open and get the file's.
* If it is a file it will make a Task and add the file to it
* and add the task to the queue.
* @param argv An array of the argument's.
*
*/
void makeTask(char *argv[]) {
while(argv[optind] != NULL) {
if (isDirectory(argv[optind]))
{
filesDir(argv[optind]);
} else {
Task t;
t.file[0] = '[=10=]';
strcpy(t.file, argv[optind]);
addTask(t);
}
optind++;
}
}
/**
* Will add the task to the queue of task for the thread's
* to execute.
* @param task The task to add to the queue of task's.
*
*/
void addTask(Task task) {
pthread_mutex_lock(&mutex);
queue[queueCount] = task;
queueCount++;
pthread_mutex_unlock(&mutex);
}
这些评论表达了对多线程可以加速和不能加速哪些类型的操作的合理关注,但它们似乎在很大程度上锁定了对您的观察结果的一个似是而非的解释,而没有考虑其他解释。 I/O 考虑因素 将 限制并行化可实现的收益,但操作系统实施措施以尝试减少慢速 I/O 的影响,例如预读以及数据和元数据缓存。因此,在这种情况下,I/O 瓶颈不一定足以完全克服并行计算的所有加速。你同学的观察在这里不能被忽视,尽管假设他们的程序是正确的也不一定安全。
事实上,您的代码还有多个其他问题导致多线程性能和缩放不佳,包括
忙-等待。当 getQueue()
检查队列并发现它为空时,它会解锁互斥锁,然后立即尝试重新获取它。尽管其他线程正在等待互斥体,例如试图填充队列的线程,但重新获取很有可能成功。添加的工作线程越多,主线程 运行ning makeTask()
就越难排队任务。
这类问题通常借助条件变量来解决。
非常低效的队列实现。每次从队列中移除一个元素时,队列的整个尾部都会向上移动,以便头部位于索引 0 处。所有这些都必须在锁定互斥锁的情况下执行,从而减少了整个工作的比例可以 运行 并行。有多种性能更好的替代方案——例如循环队列或链表。
execute()
中第二个临界区的出现。目前还不清楚 listOfArg
的所有这些操作是关于什么的,但它也限制了并行执行的程序的比例。并且它与操作任务队列所涉及的效率问题类似。
该程序还表现出一些普遍的低效率问题,这些问题可能对加速和缩放没有太大影响,例如通过循环而不是通过 memmove()
一次围绕一个元素移动数组的额外实例,hand - 滚动字符串操作而不是使用内置的、优化的库函数,例如 strchr()
,并执行动态内存分配,自动分配会更好。
我有一个来自学校的项目,我们将在其中制作一个包含线程的程序,该程序将计算目录中文件的大小。我们需要使用 lstat 来获取文件的块大小。我的解决方案是创建所有线程,主线程将遍历目录并将找到的每个文件添加到队列中。然后线程可以从队列中取出一个文件并使用 lstat 获取块大小。图片显示了我做这个时的想法。
该程序的工作方式符合我的预期。但是我遇到的问题是,对于我添加到程序中的每个线程,完成它所花费的时间对于我添加的每个线程来说都更高。因此,一个线程需要 5 秒,而 2 个线程需要 7 秒。
我将展示我编写的一些代码。我想我想念在这里如何使用互斥锁,因为互斥锁块是添加到队列的主要线程,并且所有线程在添加到队列时都被阻塞。所以如果你看到我想念互斥量的东西请告诉我:)。
/* If -j flag is used and it's more then 1 thread */
if(flag_j == 1 && threads > 1) {
/* Create each thread */
createThreads(th, threads);
/* Make a task of each file and add task to queue */
makeTask(argv);
/* Tell that no more file will be added to the queue */
pthread_mutex_lock(&mutex);
endThreads = 1;
pthread_mutex_unlock(&mutex);
/* Join thread's */
joinThreads(th, threads);
}
/**
* Going to create all the thread's that was given
* with the -j flag at start.
* @param th An array of all the thread's.
* @param threads The number of thread's.
*
*/
void createThreads(pthread_t th[], int threads) {
for(int i = 0; i < threads; i++) {
int *a = malloc(sizeof(a));
*a = i;
/* pthread create */
if(pthread_create(&th[i], NULL, &getQueue, a) != 0) {
perror("Failed to create thread");
exit(EXIT_FAILURE);
}
}
}
/**
* Each thread will get a task from the queue to execute until
* the queue is empty and the conditional variable is set to 1.
* @param arg In this case it's NULL.
*
*/
void *getQueue(void *arg) {
free(arg);
while(1) {
Task task;
/* Lock */
pthread_mutex_lock(&mutex);
if(queueCount > 0) {
/* Get the first task in queue */
task = queue[0];
/* Move all the task in the queue forward one step */
for (int i = 0; i < queueCount - 1; i++)
{
queue[i] = queue[i + 1];
}
queueCount--;
/* execute the task */
pthread_mutex_unlock(&mutex);
execute(&task);
} else if ( endThreads == 1) {
pthread_mutex_unlock(&mutex);
break;
} else {
pthread_mutex_unlock(&mutex);
}
}
/* The thread will just resturn NULL */
return arg;
}
/**
* Find the correct struct in the list of arguments.
* Then add the size of the file to that struct and
* add the struct back in to the list.
* @param task Is the task that's need to be executed.
*
*/
void execute(Task* task) {
int i = 0;
char *path = malloc(1024*sizeof(path) + 1);
/* Get the first name of the path */
while (task->file[i] != '[=10=]')
{
/* Break when it find's the first "/" */
if (task->file[i] == '/' && i != 0)
{
break;
}
path[i] = task->file[i];
i++;
}
path[i] = '[=10=]';
pthread_mutex_lock(&mutex1);
for (int j = 0; j < numOfArg; j++)
{
Arguments entity;
/* Get the first struct in listOfArg */
entity = listOfArg[0];
/* Move all the struct's one step forward */
for (int k = 0; k < numOfArg - 1; k++){
listOfArg[k] = listOfArg[k + 1];
}
/* Check if the struct name is the same name as the path */
if (strcmp(path, entity.name) == 0)
{
/* Add the size of the file to the struct size */
entity.size += getFileSize(task->file);
/* Add the struct back to the list */
listOfArg[numOfArg - 1] = entity;
break;
} else {
/* Add the struct back to the list */
listOfArg[numOfArg - 1] = entity;
}
}
/* Get the size of the file and add it to the total size */
totalSize += getFileSize(task->file);
pthread_mutex_unlock(&mutex1);
free(path);
}
/**
* Will check if the argument is a directory or an file.
* If it is a directory it's going to open and get the file's.
* If it is a file it will make a Task and add the file to it
* and add the task to the queue.
* @param argv An array of the argument's.
*
*/
void makeTask(char *argv[]) {
while(argv[optind] != NULL) {
if (isDirectory(argv[optind]))
{
filesDir(argv[optind]);
} else {
Task t;
t.file[0] = '[=10=]';
strcpy(t.file, argv[optind]);
addTask(t);
}
optind++;
}
}
/**
* Will add the task to the queue of task for the thread's
* to execute.
* @param task The task to add to the queue of task's.
*
*/
void addTask(Task task) {
pthread_mutex_lock(&mutex);
queue[queueCount] = task;
queueCount++;
pthread_mutex_unlock(&mutex);
}
这些评论表达了对多线程可以加速和不能加速哪些类型的操作的合理关注,但它们似乎在很大程度上锁定了对您的观察结果的一个似是而非的解释,而没有考虑其他解释。 I/O 考虑因素 将 限制并行化可实现的收益,但操作系统实施措施以尝试减少慢速 I/O 的影响,例如预读以及数据和元数据缓存。因此,在这种情况下,I/O 瓶颈不一定足以完全克服并行计算的所有加速。你同学的观察在这里不能被忽视,尽管假设他们的程序是正确的也不一定安全。
事实上,您的代码还有多个其他问题导致多线程性能和缩放不佳,包括
忙-等待。当
getQueue()
检查队列并发现它为空时,它会解锁互斥锁,然后立即尝试重新获取它。尽管其他线程正在等待互斥体,例如试图填充队列的线程,但重新获取很有可能成功。添加的工作线程越多,主线程 运行ningmakeTask()
就越难排队任务。这类问题通常借助条件变量来解决。
非常低效的队列实现。每次从队列中移除一个元素时,队列的整个尾部都会向上移动,以便头部位于索引 0 处。所有这些都必须在锁定互斥锁的情况下执行,从而减少了整个工作的比例可以 运行 并行。有多种性能更好的替代方案——例如循环队列或链表。
execute()
中第二个临界区的出现。目前还不清楚listOfArg
的所有这些操作是关于什么的,但它也限制了并行执行的程序的比例。并且它与操作任务队列所涉及的效率问题类似。
该程序还表现出一些普遍的低效率问题,这些问题可能对加速和缩放没有太大影响,例如通过循环而不是通过 memmove()
一次围绕一个元素移动数组的额外实例,hand - 滚动字符串操作而不是使用内置的、优化的库函数,例如 strchr()
,并执行动态内存分配,自动分配会更好。