当线程数增加时,多线程文件 IO 程序的行为不可预测
Multithreading File IO program behaves unpredictably when number of thread is increased
尝试通过写入不同的块大小和不同的线程数来创建 1Mb(1048576Byte) 的文件。当 int NUM_THREADS = 2
或 int NUM_THREADS = 1
时,创建的文件大小与给定的大小相同,即 10MB 。
然而,当我将线程数增加到 4 时,创建的文件大小约为 400MB;为什么会出现这种异常情况?
#include <pthread.h>
#include <string>
#include <iostream>
#define TenGBtoByte 1048576
#define fileToWrite "/tmp/schatterjee.txt"
using namespace std;
pthread_mutex_t mutexsum;
struct workDetails {
int threadcount;
int chunkSize;
char *data;
};
void *SPWork(void *threadarg) {
struct workDetails *thisWork;
thisWork = (struct workDetails *) threadarg;
int threadcount = thisWork->threadcount;
int chunkSize = thisWork->chunkSize;
char *data = thisWork->data;
long noOfWrites = (TenGBtoByte / (threadcount * chunkSize));
FILE *f = fopen(fileToWrite, "a+");
for (long i = 0; i < noOfWrites; ++i) {
pthread_mutex_lock(&mutexsum);
fprintf(f, "%s", data);
fflush (f);
pthread_mutex_unlock(&mutexsum);
}
fclose(f);
pthread_exit((void *) NULL);
}
int main(int argc, char *argv[]) {
int blocksize[] = {1024};
int NUM_THREADS = 2;
for (int BLOCKSIZE: blocksize) {
char *data = new char[BLOCKSIZE];
fill_n(data, BLOCKSIZE, 'x');
pthread_t thread[NUM_THREADS];
workDetails detail[NUM_THREADS];
pthread_attr_t attr;
int rc;
long threadNo;
void *status;
/* Initialize and set thread detached attribute */
pthread_mutex_init(&mutexsum, NULL);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
detail[threadNo].threadcount = NUM_THREADS;
detail[threadNo].chunkSize = BLOCKSIZE;
detail[threadNo].data = data;
rc = pthread_create(&thread[threadNo], &attr, SPWork, (void *) &detail[threadNo]);
if (rc) exit(-1);
}
pthread_attr_destroy(&attr);
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
rc = pthread_join(thread[threadNo], &status);
if (rc) exit(-1);
}
pthread_mutex_destroy(&mutexsum);
delete[] data;
}
pthread_exit(NULL);
}
N.B。 -
1)这是一个基准测试任务,所以按照他们的要求去做。
2) long noOfWrites = (TenGBtoByte / (threadcount * chunkSize));
基本上计算每个线程应该写入多少次以获得 10MB 的组合大小。
4)我试着把互斥锁放在不同的位置。所有产量都相同
也欢迎对程序的其他改动提出建议
您正在像这样分配和初始化数据数组:
char *data = new char[BLOCKSIZE];
fill_n(data, BLOCKSIZE, 'x');
然后您使用 fprintf
:
将其写入文件
fprintf(f, "%s", data);
函数 fprintf
期望 data
是一个 null-terminated 字符串。这已经是未定义的行为。如果这适用于少量线程,那是因为内存块之后的内存恰好包含零字节。
除此之外,您程序中的互斥锁没有任何用处,可以将其删除。文件锁定也是多余的,因此您可以使用 fwrite_unlocked
和 fflush_unlocked
来写入数据,因为每个线程都使用单独的 FILE
对象。基本上你程序中的所有同步都是在内核中执行的,而不是在用户空间中。
即使在删除互斥锁并使用 _unlocked
函数后,您的程序仍能可靠地创建 1 MB 的文件,而不管线程数如何。因此无效的文件写入似乎是您遇到的唯一问题。
@Ivan 是的!是的!是的! .你是完全正确的我的朋友。除了一个小事实。互斥体是必要的。这是最终代码。尝试删除互斥锁,文件大小会有所不同。
#include <pthread.h>
#include <string>
#include <iostream>
#define TenGBtoByte 1048576
#define fileToWrite "/tmp/schatterjee.txt"
using namespace std;
pthread_mutex_t mutexsum;;
struct workDetails {
int threadcount;
int chunkSize;
char *data;
};
void *SPWork(void *threadarg) {
struct workDetails *thisWork;
thisWork = (struct workDetails *) threadarg;
int threadcount = thisWork->threadcount;
int chunkSize = thisWork->chunkSize;
char *data = thisWork->data;
long noOfWrites = (TenGBtoByte / (threadcount * chunkSize));
FILE *f = fopen(fileToWrite, "a+");
for (long i = 0; i < noOfWrites; ++i) {
pthread_mutex_lock(&mutexsum);
fprintf(f, "%s", data);
fflush (f);
pthread_mutex_unlock(&mutexsum);
}
fclose(f);
pthread_exit((void *) NULL);
}
int main(int argc, char *argv[]) {
int blocksize[] = {1024};
int NUM_THREADS = 128;
for (int BLOCKSIZE: blocksize) {
char *data = new char[BLOCKSIZE+1];
fill_n(data, BLOCKSIZE, 'x');
data[BLOCKSIZE] = NULL;
pthread_t thread[NUM_THREADS];
workDetails detail[NUM_THREADS];
pthread_attr_t attr;
int rc;
long threadNo;
void *status;
pthread_mutex_init(&mutexsum, NULL);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
detail[threadNo].threadcount = NUM_THREADS;
detail[threadNo].chunkSize = BLOCKSIZE;
detail[threadNo].data = data;
rc = pthread_create(&thread[threadNo], &attr, SPWork, (void *) &detail[threadNo]);
if (rc) exit(-1);
}
pthread_attr_destroy(&attr);
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
rc = pthread_join(thread[threadNo], &status);
if (rc) exit(-1);
}
pthread_mutex_destroy(&mutexsum);
delete[] data;
}
pthread_exit(NULL);
}
尝试通过写入不同的块大小和不同的线程数来创建 1Mb(1048576Byte) 的文件。当 int NUM_THREADS = 2
或 int NUM_THREADS = 1
时,创建的文件大小与给定的大小相同,即 10MB 。
然而,当我将线程数增加到 4 时,创建的文件大小约为 400MB;为什么会出现这种异常情况?
#include <pthread.h>
#include <string>
#include <iostream>
#define TenGBtoByte 1048576
#define fileToWrite "/tmp/schatterjee.txt"
using namespace std;
pthread_mutex_t mutexsum;
struct workDetails {
int threadcount;
int chunkSize;
char *data;
};
void *SPWork(void *threadarg) {
struct workDetails *thisWork;
thisWork = (struct workDetails *) threadarg;
int threadcount = thisWork->threadcount;
int chunkSize = thisWork->chunkSize;
char *data = thisWork->data;
long noOfWrites = (TenGBtoByte / (threadcount * chunkSize));
FILE *f = fopen(fileToWrite, "a+");
for (long i = 0; i < noOfWrites; ++i) {
pthread_mutex_lock(&mutexsum);
fprintf(f, "%s", data);
fflush (f);
pthread_mutex_unlock(&mutexsum);
}
fclose(f);
pthread_exit((void *) NULL);
}
int main(int argc, char *argv[]) {
int blocksize[] = {1024};
int NUM_THREADS = 2;
for (int BLOCKSIZE: blocksize) {
char *data = new char[BLOCKSIZE];
fill_n(data, BLOCKSIZE, 'x');
pthread_t thread[NUM_THREADS];
workDetails detail[NUM_THREADS];
pthread_attr_t attr;
int rc;
long threadNo;
void *status;
/* Initialize and set thread detached attribute */
pthread_mutex_init(&mutexsum, NULL);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
detail[threadNo].threadcount = NUM_THREADS;
detail[threadNo].chunkSize = BLOCKSIZE;
detail[threadNo].data = data;
rc = pthread_create(&thread[threadNo], &attr, SPWork, (void *) &detail[threadNo]);
if (rc) exit(-1);
}
pthread_attr_destroy(&attr);
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
rc = pthread_join(thread[threadNo], &status);
if (rc) exit(-1);
}
pthread_mutex_destroy(&mutexsum);
delete[] data;
}
pthread_exit(NULL);
}
N.B。 -
1)这是一个基准测试任务,所以按照他们的要求去做。
2) long noOfWrites = (TenGBtoByte / (threadcount * chunkSize));
基本上计算每个线程应该写入多少次以获得 10MB 的组合大小。
4)我试着把互斥锁放在不同的位置。所有产量都相同
也欢迎对程序的其他改动提出建议
您正在像这样分配和初始化数据数组:
char *data = new char[BLOCKSIZE];
fill_n(data, BLOCKSIZE, 'x');
然后您使用 fprintf
:
fprintf(f, "%s", data);
函数 fprintf
期望 data
是一个 null-terminated 字符串。这已经是未定义的行为。如果这适用于少量线程,那是因为内存块之后的内存恰好包含零字节。
除此之外,您程序中的互斥锁没有任何用处,可以将其删除。文件锁定也是多余的,因此您可以使用 fwrite_unlocked
和 fflush_unlocked
来写入数据,因为每个线程都使用单独的 FILE
对象。基本上你程序中的所有同步都是在内核中执行的,而不是在用户空间中。
即使在删除互斥锁并使用 _unlocked
函数后,您的程序仍能可靠地创建 1 MB 的文件,而不管线程数如何。因此无效的文件写入似乎是您遇到的唯一问题。
@Ivan 是的!是的!是的! .你是完全正确的我的朋友。除了一个小事实。互斥体是必要的。这是最终代码。尝试删除互斥锁,文件大小会有所不同。
#include <pthread.h>
#include <string>
#include <iostream>
#define TenGBtoByte 1048576
#define fileToWrite "/tmp/schatterjee.txt"
using namespace std;
pthread_mutex_t mutexsum;;
struct workDetails {
int threadcount;
int chunkSize;
char *data;
};
void *SPWork(void *threadarg) {
struct workDetails *thisWork;
thisWork = (struct workDetails *) threadarg;
int threadcount = thisWork->threadcount;
int chunkSize = thisWork->chunkSize;
char *data = thisWork->data;
long noOfWrites = (TenGBtoByte / (threadcount * chunkSize));
FILE *f = fopen(fileToWrite, "a+");
for (long i = 0; i < noOfWrites; ++i) {
pthread_mutex_lock(&mutexsum);
fprintf(f, "%s", data);
fflush (f);
pthread_mutex_unlock(&mutexsum);
}
fclose(f);
pthread_exit((void *) NULL);
}
int main(int argc, char *argv[]) {
int blocksize[] = {1024};
int NUM_THREADS = 128;
for (int BLOCKSIZE: blocksize) {
char *data = new char[BLOCKSIZE+1];
fill_n(data, BLOCKSIZE, 'x');
data[BLOCKSIZE] = NULL;
pthread_t thread[NUM_THREADS];
workDetails detail[NUM_THREADS];
pthread_attr_t attr;
int rc;
long threadNo;
void *status;
pthread_mutex_init(&mutexsum, NULL);
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
detail[threadNo].threadcount = NUM_THREADS;
detail[threadNo].chunkSize = BLOCKSIZE;
detail[threadNo].data = data;
rc = pthread_create(&thread[threadNo], &attr, SPWork, (void *) &detail[threadNo]);
if (rc) exit(-1);
}
pthread_attr_destroy(&attr);
for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
rc = pthread_join(thread[threadNo], &status);
if (rc) exit(-1);
}
pthread_mutex_destroy(&mutexsum);
delete[] data;
}
pthread_exit(NULL);
}