Multithreaded file I/O program behaves unpredictably when the number of threads is increased

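I am trying to create a 1 MB (1048576-byte) file by writing chunks of various sizes from different numbers of threads. When int NUM_THREADS = 2 or int NUM_THREADS = 1, the created file has the expected size, i.e. 1 MB.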

However, when I increase the thread count to 4, it creates a file of around 400 MB. Why does this anomaly occur?

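#include <pthread.h>
#include <algorithm>  // fill_n
#include <cstdio>     // fopen, fprintf, fflush, fclose
#include <cstdlib>    // exit
#include <string>
#include <iostream>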
#define TenGBtoByte 1048576
#define fileToWrite "/tmp/schatterjee.txt"

using namespace std;
pthread_mutex_t mutexsum;
struct workDetails {
    int threadcount;
    int chunkSize;
    char *data;
};

void *SPWork(void *threadarg) {
    struct workDetails *thisWork;
    thisWork = (struct workDetails *) threadarg;
    int threadcount = thisWork->threadcount;
    int chunkSize = thisWork->chunkSize;
    char *data = thisWork->data;
    long noOfWrites = (TenGBtoByte / (threadcount * chunkSize));
    FILE *f = fopen(fileToWrite, "a+");
    for (long i = 0; i < noOfWrites; ++i) {
        pthread_mutex_lock(&mutexsum);
        fprintf(f, "%s", data);
        fflush (f);
        pthread_mutex_unlock(&mutexsum);
    }
    fclose(f);
    pthread_exit((void *) NULL);
}

int main(int argc, char *argv[]) {
    int blocksize[] = {1024};
    int NUM_THREADS = 2;
    for (int BLOCKSIZE: blocksize) {
        char *data = new char[BLOCKSIZE];
        fill_n(data, BLOCKSIZE, 'x');

        pthread_t thread[NUM_THREADS];
        workDetails detail[NUM_THREADS];
        pthread_attr_t attr;
        int rc;
        long threadNo;
        void *status;

        /* Initialize and set thread detached attribute */
        pthread_mutex_init(&mutexsum, NULL);
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
        for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
            detail[threadNo].threadcount = NUM_THREADS;
            detail[threadNo].chunkSize = BLOCKSIZE;
            detail[threadNo].data = data;
            rc = pthread_create(&thread[threadNo], &attr, SPWork, (void *) &detail[threadNo]);
            if (rc) exit(-1);
        }
        pthread_attr_destroy(&attr);
        for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
            rc = pthread_join(thread[threadNo], &status);
            if (rc) exit(-1);
        }
        pthread_mutex_destroy(&mutexsum);
        delete[] data;
    }
    pthread_exit(NULL);
}

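N.B.: 1) This is a benchmarking task, so I am doing what it requires. 2) long noOfWrites = (TenGBtoByte / (threadcount * chunkSize)); computes how many times each thread should write so that the combined size is 1 MB. 3) I tried placing the mutex lock at various positions. All of them yield the same result.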
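The calculation in point 2 uses integer division, so the combined size only reaches the target when threadcount * chunkSize divides it evenly. A minimal sketch of that arithmetic follows; the function name totalBytesWritten is hypothetical and not part of the original program.

```cpp
#include <cassert>

// Hypothetical helper: total bytes produced when each of `threadCount`
// threads performs totalBytes / (threadCount * chunkSize) writes of
// exactly `chunkSize` bytes, mirroring the noOfWrites formula above.
long totalBytesWritten(long totalBytes, int threadCount, int chunkSize) {
    assert(threadCount > 0 && chunkSize > 0);  // guard against division by zero
    long noOfWrites = totalBytes / (static_cast<long>(threadCount) * chunkSize);
    return noOfWrites * threadCount * chunkSize;
}
```

With 1048576 bytes and 1024-byte chunks, 2, 4, or 128 threads divide evenly and give exactly 1048576 bytes. 3 threads would give 341 writes each, i.e. 1047552 bytes, slightly short of the target.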

Suggestions for other changes to the program are also welcome.

You are allocating and initializing your data array like this:

char *data = new char[BLOCKSIZE];
fill_n(data, BLOCKSIZE, 'x');

Then you write it to the file using fprintf:

fprintf(f, "%s", data);

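With the %s format, fprintf expects data to be a null-terminated string. Your buffer has no terminator, so fprintf keeps reading past the end of the array until it happens to hit a zero byte. That is already undefined behavior, and it means a single call can write far more than chunkSize bytes. If it works with a small number of threads, that is only because the memory right after the buffer happens to contain a zero byte.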
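One way to sidestep the terminator requirement is to pass the length explicitly with fwrite. The following is a minimal sketch, assuming the goal is to write exactly one chunk of a given size; the function name writeChunk and its parameters are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <vector>

// Hypothetical helper: writes exactly `size` bytes of 'x' to `path`
// and returns the number of bytes fwrite reports as written.
// The buffer is deliberately NOT null-terminated; fwrite takes the
// length as an argument and never looks for a terminator.
std::size_t writeChunk(const char *path, std::size_t size) {
    assert(path != nullptr);
    std::vector<char> data(size, 'x');
    FILE *f = std::fopen(path, "wb");
    if (!f) return 0;
    std::size_t written = std::fwrite(data.data(), 1, data.size(), f);
    std::fclose(f);
    return written;
}
```

Because the byte count is explicit, the amount written does not depend on whatever happens to sit in memory after the buffer.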

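Other than that, the mutex in your program serves no purpose and can be removed. The stdio file locking is also redundant, so you can use fwrite_unlocked and fflush_unlocked to write your data, since every thread uses a separate FILE object. Essentially, all synchronization in your program is performed in the kernel, not in userspace.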

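Even after removing the mutex and switching to the _unlocked functions, your program reliably creates a 1 MB file regardless of the number of threads. So the invalid file write appears to be the only issue you have.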
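The mutex-free variant described above can be sketched roughly as follows. This is an illustration under assumptions, not the benchmark's required implementation: it assumes a Linux-like system where each append-mode fflush of a small chunk results in one write that the kernel positions at the end of the file, it uses std::thread instead of pthreads for brevity, and the names appendChunks and writeWithThreads are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdio>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical worker: opens its own FILE in append mode and writes
// `writesPerThread` chunks of exactly chunk.size() bytes. No mutex.
void appendChunks(const char *path, const std::vector<char> &chunk, long writesPerThread) {
    FILE *f = std::fopen(path, "a");
    if (!f) return;
    for (long i = 0; i < writesPerThread; ++i) {
        std::fwrite(chunk.data(), 1, chunk.size(), f);  // explicit length, no terminator needed
        std::fflush(f);                                 // push each chunk to the kernel
    }
    std::fclose(f);
}

// Hypothetical driver: returns the final file size in bytes, or -1 on error.
long writeWithThreads(const char *path, int threadCount, std::size_t chunkSize, long totalBytes) {
    assert(threadCount > 0 && chunkSize > 0);
    std::remove(path);  // start from an empty file
    const std::vector<char> chunk(chunkSize, 'x');
    const long writesPerThread =
        totalBytes / (static_cast<long>(threadCount) * static_cast<long>(chunkSize));

    std::vector<std::thread> workers;
    for (int t = 0; t < threadCount; ++t)
        workers.emplace_back(appendChunks, path, std::cref(chunk), writesPerThread);
    for (auto &w : workers) w.join();

    FILE *f = std::fopen(path, "rb");
    if (!f) return -1;
    std::fseek(f, 0, SEEK_END);
    long size = std::ftell(f);
    std::fclose(f);
    return size;
}
```

Under those assumptions, a call such as writeWithThreads("/tmp/demo.bin", 4, 1024, 1048576) should report 1048576 bytes. If the file size still varies on a given platform, that points at the append behavior of the filesystem rather than at the buffer contents.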

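@Ivan Yes! Yes! Yes! You are absolutely right, my friend, except for one small fact: the mutex is necessary. This is the final code. Try removing the mutex and the file size will vary.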

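#include <pthread.h>
#include <algorithm>  // fill_n
#include <cstdio>     // fopen, fprintf, fflush, fclose
#include <cstdlib>    // exit
#include <string>
#include <iostream>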
#define TenGBtoByte 1048576
#define fileToWrite "/tmp/schatterjee.txt"

using namespace std;
pthread_mutex_t mutexsum;
struct workDetails {
    int threadcount;
    int chunkSize;
    char *data;
};

void *SPWork(void *threadarg) {

    struct workDetails *thisWork;
    thisWork = (struct workDetails *) threadarg;
    int threadcount = thisWork->threadcount;
    int chunkSize = thisWork->chunkSize;
    char *data = thisWork->data;
    long noOfWrites = (TenGBtoByte / (threadcount * chunkSize));
    FILE *f = fopen(fileToWrite, "a+");

    for (long i = 0; i < noOfWrites; ++i) {
        pthread_mutex_lock(&mutexsum);
        fprintf(f, "%s", data);
        fflush (f);
        pthread_mutex_unlock(&mutexsum);
    }
    fclose(f);
    pthread_exit((void *) NULL);
}

int main(int argc, char *argv[]) {
    int blocksize[] = {1024};
    int NUM_THREADS = 128;
    for (int BLOCKSIZE: blocksize) {
        char *data = new char[BLOCKSIZE+1];
        fill_n(data, BLOCKSIZE, 'x');
        data[BLOCKSIZE] = '\0';  // null-terminate so fprintf("%s") stops after BLOCKSIZE bytes

        pthread_t thread[NUM_THREADS];
        workDetails detail[NUM_THREADS];
        pthread_attr_t attr;
        int rc;
        long threadNo;
        void *status;
        pthread_mutex_init(&mutexsum, NULL);
        pthread_attr_init(&attr);
        pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE);
        for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
            detail[threadNo].threadcount = NUM_THREADS;
            detail[threadNo].chunkSize = BLOCKSIZE;
            detail[threadNo].data = data;
            rc = pthread_create(&thread[threadNo], &attr, SPWork, (void *) &detail[threadNo]);
            if (rc) exit(-1);
        }
        pthread_attr_destroy(&attr);
        for (threadNo = 0; threadNo < NUM_THREADS; threadNo++) {
            rc = pthread_join(thread[threadNo], &status);
            if (rc) exit(-1);
        }
        pthread_mutex_destroy(&mutexsum);
        delete[] data;
    }
    pthread_exit(NULL);
}