当一个进程一次写入 FIFO 时,使用命名管道的 C Linux 程序按预期工作,但在更多实例写入时中断

C Linux program using named pipes works as expected when one process at a time writes into the FIFO, but breaks when more instances do it

我的程序有一个消费者和多个生产者。每个生产者读取一个不同的文件,并将其内容以 N 大小的块写入 FIFO,并带有供消费者解释的前导参数。

消费者应该获取这些块并组成一个输出文件,其中每一行对应一个生产者。来自块的前导参数用于确定块的所有者以及将其写入的位置(它是输出文件中的行号)。

我的问题是,即使在只有一个制作人的情况下它基本上可以正常工作,但如果再多的话,生成的文件就会变得一团糟。还有一些意想不到的过度\n,但它们并不重要。

这是我的预期输出:

aaaaa1a aaaaaaa2a aaa3a aaaaaaaaaaa4a
bbbbbbbbbbb1b bbbbbbb2b bbbbbbbbbbbbbb3b bbbbbbb4b bbbbbbbbbb5b bb6b
cccccccccc1c cccc2c cccccccc3c ccccc4c ccccccccc5c ccccccccccccc6c

但这就是我得到的:

aaaaa1a aaaaaaa2a aaa3a aaaaaaaaaaa4a2  bbbbbbb43 cccccccc53  cccccccc2  bbbbbbbb2 b5b bb6b3 cccc6c2
bbbbbbbbbbb1b bbbbbbb2b bbbbbbbbbbbbbb3b
cccccccccc1c cccc2c cccccccc3c ccccc4c c

后几行意外中断,块变得混乱。

我认为我处理命名管道的方式存在问题,因为我在进一步处理之前打印“原始输入”并且我可以看到我正在从管道读取无效数据。但是 AFAIK Linux 对 FIFO 的小块数据有原子写入。也许读取不关心写入,这就是问题所在?

消费者代码:

#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>

char *filename;

size_t getFileSize(FILE *fp) {
    fseek(fp, 0, SEEK_END);
    size_t len = ftell(fp);
    rewind(fp);
    printf("length %ld \n", len);
    return len;
}
int nthFunctionCall = 1;

void printFile(FILE *file) {
    char *fileContent = NULL;
    if(file != NULL) {
        size_t size = getFileSize(file);
        fileContent = malloc((size /* + 1*/) * sizeof(char));
        fread(fileContent, 1, size, file);
        //fileContent[size + 1] = '[=13=]'; ?
    }
    printf("FILE CONTENT: \n%s\n", fileContent);
}

void writeToFile(long targetLineNumber, char *text) {
    FILE *temp = fopen("temp", "w");
    if(temp == NULL) {
        perror("can't create temp");
        exit(-1);
    }
    char *fileContents = NULL;
    FILE *file = fopen(filename, "r");
    if(file != NULL) {
        size_t size = getFileSize(file);
        fileContents = malloc((size + 1) * sizeof(char));
        fread(fileContents, 1, size, file);
        fileContents[size] = '[=13=]'; // tbh, I don't know whether I should do this or not.
        fclose(file);
    }
    char *fileContentsCpy = fileContents;

    printf("FILE CONTENT:\n %s\n", fileContents);

    printf("%d Text to save %s\n", nthFunctionCall, text);

    char *currentLineFromFile;
    size_t processedLineNumber;
    for (processedLineNumber = 1; (currentLineFromFile = strsep(&fileContents, "\n")) != NULL; processedLineNumber++) {
        printf("%d targetLineNumber %ld processedLineNumber %ld \n", nthFunctionCall, targetLineNumber, processedLineNumber);
        printf("%d copy the current line into temp: %s\n", nthFunctionCall, currentLineFromFile);
        fputs(currentLineFromFile, temp);
        if(processedLineNumber == targetLineNumber) {
            printf("%d add text to line %ld: %s\n", nthFunctionCall, processedLineNumber, text);
            fputs(text, temp);
        }
        fputs("\n", temp);
        fflush(temp);
    }

    printf("%d Finished loop with: targetLineNumber %ld processedLineNumber %ld \n", nthFunctionCall, targetLineNumber, processedLineNumber);

    if(targetLineNumber >= processedLineNumber) {
        for (int j = 0; j < (targetLineNumber - processedLineNumber); ++j) {
            fputs("\n", temp);
        }
        printf("%d added text: %s\n", nthFunctionCall, text);
        fputs(text, temp);
        fflush(temp);
    }
    fclose(temp);

    if(fileContentsCpy != NULL) free(fileContentsCpy);
    nthFunctionCall++;
    remove(filename);
    rename("temp", filename);
    printf("One iteration end\n");
}

int numberLength(size_t number) {
    int len = 0;
    while(number > 0) {
        number /= 10;
        len++;
    }
    return len;
}

int main(int argc, char **argv)
{
    if (argc < 4) {
        fprintf(stderr, "testConsument <fifo_path> <file_to_save_in> <chunk size>\n");
        exit(-1);
    }

    char *myfifo = argv[1];
    filename = argv[2];
    int numberToRead = atoi(argv[3]);

    int fd = open(myfifo, O_RDONLY);
    perror("sdada test consument");
    char *str1 = calloc(100, sizeof(char));
    while (read(fd, str1, numberToRead + 3) > 0) {
        long lineNumber;
        printf("length: %ld raw input: %s\n", strlen(str1), str1);
        sscanf(str1, "%ld", &lineNumber);
        char* content = str1 + numberLength(lineNumber) + 1; // lines should be of the format "<number> <chunk-sized-word>[=13=]"
        printf("add to line %ld content : %s \n", lineNumber, content);
        writeToFile(lineNumber, content);
        sleep(1);
        free(str1);
        str1 = calloc(100, sizeof(char));
        printf("#################\n");
    }
    free(str1);
    close(fd);

    FILE *res = fopen(filename, "r");
    printFile(res);
    fclose(res);
    return 0;
}

生产者代码:

#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>

size_t getFileSize(FILE *fp) {
    fseek(fp, 0, SEEK_END);
    size_t len = ftell(fp);
    rewind(fp);
    return len;
}

int main(int argc, char **argv) {
    if (argc < 5) {
        fprintf(stderr, "producer <fifo_path> <line_number_to_save_in> <input_file> <chunk_size>\n");
        exit(-1);
    }

    char *myfifo = argv[1];
    char *lineNumber = argv[2];
    int numberToRead = atoi(argv[4]);

    mkfifo(myfifo, 0666);
    int fd = open(myfifo, O_WRONLY);

    char *someFilePath = argv[3];
    FILE *somFile = fopen(someFilePath, "r");
    char *buf = calloc(numberToRead, sizeof(char));
    size_t size = 1;
    while ((fread(buf, size, numberToRead, somFile) > 0)) {
        char *buf2 = calloc((numberToRead + 3), sizeof(char));
        strcat(buf2, lineNumber); strcat(buf2, " "); strcat(buf2, buf); strcat(buf2, "[=14=]");
        while (strstr(buf2, "\n")) {
            buf2[strcspn(buf2, "\n")] = ' ';
        }
        printf("SENDING: %s\n", buf2);
        fflush(stdout);
        write(fd, buf2, numberToRead + 3);
        sleep(2);
        free(buf);
        free(buf2);
        buf = calloc(numberToRead, sizeof(char));
    }
    write(fd, lineNumber, 2);
    close(fd);
    return 0;
}

在 运行 生产者和消费者之后,通信应该开始工作,一段时间后应该会有一个输出文件。在每次这样的执行之后你必须手动删除文件,因为我没有真的考虑它之前存在的情况。

示例开始(每行应该在不同的终端):

./producer '/tmp/fifo3' 3 'file1' 10
./producer '/tmp/fifo3' 2 'file1' 10
./producer '/tmp/fifo3' 1 'file1' 10
./testConsument '/tmp/fifo3' 'output' 10

有很多调试打印,我不确定它们是否有帮助,但我将它们留在里面。

线路问题

    write(fd, lineNumber, 2);

生产者计划接近尾声。它发送了毫无意义且未正确解释的不必要数据。

删除它后,程序按预期工作(除了意外的新行,但它们并没有那么糟糕,而且以前发生过)。

您面临的问题是,将多个生产者连接到共享资源(fifo),您需要控制访问的方式,以便能够控制消费者以正确的顺序获取数据。您从内核获得的唯一帮助是在 write(2) 系统调用级别(内核在执行系统调用期间锁定目标 fifo 的索引节点)因此,如果您进行短写,最简单的方法是将您要放入 fifo 的所有数据组合在一起,然后 write(2) 将它们全部放在一个写入调用中。

如果您选择更复杂的解决方案,那么您需要使用某种 mutex/semaphore/whatever 来控制谁可以独占访问 fifo 进行写入,因为其他进程必须等待它释放锁在开始写作之前。

此外,如果您正在使用这种方法,请不要尝试使用 stdiostdio 包仅在刷新缓冲区时写入数据,输出终端与 fifo 的情况不同,这取决于它使用的 actua 缓冲区大小,您对当它发生时。这意味着您不能使用 fprintf(3) 和好友。

最后,如果您使用 write(2) 的原子性,请记住 fifo 是一种有限的资源,它可以缓冲数据,如果您尝试这样做,将会中断 write(2) 调用一次写入大量数据(这意味着单个 write(2) 调用)您可以获得部分写入并且您将无法从中恢复,因为与此同时其他生产者可以访问 fifo并在上面写(这会破坏你的写作结构)根据经验,尝试将你的消息减少到少量千字节(4kb 或 8kb 是一个很好的上限,可以移植到不同的 unices)