当一个进程一次写入 FIFO 时,使用命名管道的 C Linux 程序按预期工作,但在更多实例写入时中断
C Linux program using named pipes works as expected when one process at a time writes into the FIFO, but breaks when more instances do it
我的程序有一个消费者和多个生产者。每个生产者读取一个不同的文件,并将其内容以 N 大小的块写入 FIFO,并带有供消费者解释的前导参数。
消费者应该获取这些块并组成一个输出文件,其中每一行对应一个生产者。来自块的前导参数用于确定块的所有者以及将其写入的位置(它是输出文件中的行号)。
我的问题是,即使在只有一个制作人的情况下它基本上可以正常工作,但如果再多的话,生成的文件就会变得一团糟。还有一些意想不到的过度\n
,但它们并不重要。
这是我的预期输出:
aaaaa1a aaaaaaa2a aaa3a aaaaaaaaaaa4a
bbbbbbbbbbb1b bbbbbbb2b bbbbbbbbbbbbbb3b bbbbbbb4b bbbbbbbbbb5b bb6b
cccccccccc1c cccc2c cccccccc3c ccccc4c ccccccccc5c ccccccccccccc6c
但这就是我得到的:
aaaaa1a aaaaaaa2a aaa3a aaaaaaaaaaa4a2 bbbbbbb43 cccccccc53 cccccccc2 bbbbbbbb2 b5b bb6b3 cccc6c2
bbbbbbbbbbb1b bbbbbbb2b bbbbbbbbbbbbbb3b
cccccccccc1c cccc2c cccccccc3c ccccc4c c
后几行意外中断,块变得混乱。
我认为我处理命名管道的方式存在问题,因为我在进一步处理之前打印“原始输入”并且我可以看到我正在从管道读取无效数据。但是 AFAIK Linux 对 FIFO 的小块数据有原子写入。也许读取不关心写入,这就是问题所在?
消费者代码:
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
char *filename;
size_t getFileSize(FILE *fp) {
fseek(fp, 0, SEEK_END);
size_t len = ftell(fp);
rewind(fp);
printf("length %ld \n", len);
return len;
}
int nthFunctionCall = 1;
void printFile(FILE *file) {
char *fileContent = NULL;
if(file != NULL) {
size_t size = getFileSize(file);
fileContent = malloc((size /* + 1*/) * sizeof(char));
fread(fileContent, 1, size, file);
//fileContent[size + 1] = '[=13=]'; ?
}
printf("FILE CONTENT: \n%s\n", fileContent);
}
void writeToFile(long targetLineNumber, char *text) {
FILE *temp = fopen("temp", "w");
if(temp == NULL) {
perror("can't create temp");
exit(-1);
}
char *fileContents = NULL;
FILE *file = fopen(filename, "r");
if(file != NULL) {
size_t size = getFileSize(file);
fileContents = malloc((size + 1) * sizeof(char));
fread(fileContents, 1, size, file);
fileContents[size] = '[=13=]'; // tbh, I don't know whether I should do this or not.
fclose(file);
}
char *fileContentsCpy = fileContents;
printf("FILE CONTENT:\n %s\n", fileContents);
printf("%d Text to save %s\n", nthFunctionCall, text);
char *currentLineFromFile;
size_t processedLineNumber;
for (processedLineNumber = 1; (currentLineFromFile = strsep(&fileContents, "\n")) != NULL; processedLineNumber++) {
printf("%d targetLineNumber %ld processedLineNumber %ld \n", nthFunctionCall, targetLineNumber, processedLineNumber);
printf("%d copy the current line into temp: %s\n", nthFunctionCall, currentLineFromFile);
fputs(currentLineFromFile, temp);
if(processedLineNumber == targetLineNumber) {
printf("%d add text to line %ld: %s\n", nthFunctionCall, processedLineNumber, text);
fputs(text, temp);
}
fputs("\n", temp);
fflush(temp);
}
printf("%d Finished loop with: targetLineNumber %ld processedLineNumber %ld \n", nthFunctionCall, targetLineNumber, processedLineNumber);
if(targetLineNumber >= processedLineNumber) {
for (int j = 0; j < (targetLineNumber - processedLineNumber); ++j) {
fputs("\n", temp);
}
printf("%d added text: %s\n", nthFunctionCall, text);
fputs(text, temp);
fflush(temp);
}
fclose(temp);
if(fileContentsCpy != NULL) free(fileContentsCpy);
nthFunctionCall++;
remove(filename);
rename("temp", filename);
printf("One iteration end\n");
}
int numberLength(size_t number) {
int len = 0;
while(number > 0) {
number /= 10;
len++;
}
return len;
}
int main(int argc, char **argv)
{
if (argc < 4) {
fprintf(stderr, "testConsument <fifo_path> <file_to_save_in> <chunk size>\n");
exit(-1);
}
char *myfifo = argv[1];
filename = argv[2];
int numberToRead = atoi(argv[3]);
int fd = open(myfifo, O_RDONLY);
perror("sdada test consument");
char *str1 = calloc(100, sizeof(char));
while (read(fd, str1, numberToRead + 3) > 0) {
long lineNumber;
printf("length: %ld raw input: %s\n", strlen(str1), str1);
sscanf(str1, "%ld", &lineNumber);
char* content = str1 + numberLength(lineNumber) + 1; // lines should be of the format "<number> <chunk-sized-word>[=13=]"
printf("add to line %ld content : %s \n", lineNumber, content);
writeToFile(lineNumber, content);
sleep(1);
free(str1);
str1 = calloc(100, sizeof(char));
printf("#################\n");
}
free(str1);
close(fd);
FILE *res = fopen(filename, "r");
printFile(res);
fclose(res);
return 0;
}
生产者代码:
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
size_t getFileSize(FILE *fp) {
fseek(fp, 0, SEEK_END);
size_t len = ftell(fp);
rewind(fp);
return len;
}
int main(int argc, char **argv) {
if (argc < 5) {
fprintf(stderr, "producer <fifo_path> <line_number_to_save_in> <input_file> <chunk_size>\n");
exit(-1);
}
char *myfifo = argv[1];
char *lineNumber = argv[2];
int numberToRead = atoi(argv[4]);
mkfifo(myfifo, 0666);
int fd = open(myfifo, O_WRONLY);
char *someFilePath = argv[3];
FILE *somFile = fopen(someFilePath, "r");
char *buf = calloc(numberToRead, sizeof(char));
size_t size = 1;
while ((fread(buf, size, numberToRead, somFile) > 0)) {
char *buf2 = calloc((numberToRead + 3), sizeof(char));
strcat(buf2, lineNumber); strcat(buf2, " "); strcat(buf2, buf); strcat(buf2, "[=14=]");
while (strstr(buf2, "\n")) {
buf2[strcspn(buf2, "\n")] = ' ';
}
printf("SENDING: %s\n", buf2);
fflush(stdout);
write(fd, buf2, numberToRead + 3);
sleep(2);
free(buf);
free(buf2);
buf = calloc(numberToRead, sizeof(char));
}
write(fd, lineNumber, 2);
close(fd);
return 0;
}
在 运行 生产者和消费者之后,通信应该开始工作,一段时间后应该会有一个输出文件。在每次这样的执行之后你必须手动删除文件,因为我没有真的考虑它之前存在的情况。
示例开始(每行应该在不同的终端):
./producer '/tmp/fifo3' 3 'file1' 10
./producer '/tmp/fifo3' 2 'file1' 10
./producer '/tmp/fifo3' 1 'file1' 10
./testConsument '/tmp/fifo3' 'output' 10
有很多调试打印,我不确定它们是否有帮助,但我将它们留在里面。
线路问题
write(fd, lineNumber, 2);
生产者计划接近尾声。它发送了毫无意义且未正确解释的不必要数据。
删除它后,程序按预期工作(除了意外的新行,但它们并没有那么糟糕,而且以前发生过)。
您面临的问题是,将多个生产者连接到共享资源(fifo),您需要控制访问的方式,以便能够控制消费者以正确的顺序获取数据。您从内核获得的唯一帮助是在 write(2)
系统调用级别(内核在执行系统调用期间锁定目标 fifo 的索引节点)因此,如果您进行短写,最简单的方法是将您要放入 fifo 的所有数据组合在一起,然后 write(2)
将它们全部放在一个写入调用中。
如果您选择更复杂的解决方案,那么您需要使用某种 mutex/semaphore/whatever 来控制谁可以独占访问 fifo 进行写入,因为其他进程必须等待它释放锁在开始写作之前。
此外,如果您正在使用这种方法,请不要尝试使用 stdio
。 stdio
包仅在刷新缓冲区时写入数据,输出终端与 fifo 的情况不同,这取决于它使用的 actua 缓冲区大小,您对当它发生时。这意味着您不能使用 fprintf(3)
和好友。
最后,如果您使用 write(2)
的原子性,请记住 fifo 是一种有限的资源,它可以缓冲数据,如果您尝试这样做,将会中断 write(2)
调用一次写入大量数据(这意味着单个 write(2)
调用)您可以获得部分写入并且您将无法从中恢复,因为与此同时其他生产者可以访问 fifo并在上面写(这会破坏你的写作结构)根据经验,尝试将你的消息减少到少量千字节(4kb 或 8kb 是一个很好的上限,可以移植到不同的 unices)
我的程序有一个消费者和多个生产者。每个生产者读取一个不同的文件,并将其内容以 N 大小的块写入 FIFO,并带有供消费者解释的前导参数。
消费者应该获取这些块并组成一个输出文件,其中每一行对应一个生产者。来自块的前导参数用于确定块的所有者以及将其写入的位置(它是输出文件中的行号)。
我的问题是,即使在只有一个制作人的情况下它基本上可以正常工作,但如果再多的话,生成的文件就会变得一团糟。还有一些意想不到的过度\n
,但它们并不重要。
这是我的预期输出:
aaaaa1a aaaaaaa2a aaa3a aaaaaaaaaaa4a
bbbbbbbbbbb1b bbbbbbb2b bbbbbbbbbbbbbb3b bbbbbbb4b bbbbbbbbbb5b bb6b
cccccccccc1c cccc2c cccccccc3c ccccc4c ccccccccc5c ccccccccccccc6c
但这就是我得到的:
aaaaa1a aaaaaaa2a aaa3a aaaaaaaaaaa4a2 bbbbbbb43 cccccccc53 cccccccc2 bbbbbbbb2 b5b bb6b3 cccc6c2
bbbbbbbbbbb1b bbbbbbb2b bbbbbbbbbbbbbb3b
cccccccccc1c cccc2c cccccccc3c ccccc4c c
后几行意外中断,块变得混乱。
我认为我处理命名管道的方式存在问题,因为我在进一步处理之前打印“原始输入”并且我可以看到我正在从管道读取无效数据。但是 AFAIK Linux 对 FIFO 的小块数据有原子写入。也许读取不关心写入,这就是问题所在?
消费者代码:
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdlib.h>
char *filename;
size_t getFileSize(FILE *fp) {
fseek(fp, 0, SEEK_END);
size_t len = ftell(fp);
rewind(fp);
printf("length %ld \n", len);
return len;
}
int nthFunctionCall = 1;
void printFile(FILE *file) {
char *fileContent = NULL;
if(file != NULL) {
size_t size = getFileSize(file);
fileContent = malloc((size /* + 1*/) * sizeof(char));
fread(fileContent, 1, size, file);
//fileContent[size + 1] = '[=13=]'; ?
}
printf("FILE CONTENT: \n%s\n", fileContent);
}
void writeToFile(long targetLineNumber, char *text) {
FILE *temp = fopen("temp", "w");
if(temp == NULL) {
perror("can't create temp");
exit(-1);
}
char *fileContents = NULL;
FILE *file = fopen(filename, "r");
if(file != NULL) {
size_t size = getFileSize(file);
fileContents = malloc((size + 1) * sizeof(char));
fread(fileContents, 1, size, file);
fileContents[size] = '[=13=]'; // tbh, I don't know whether I should do this or not.
fclose(file);
}
char *fileContentsCpy = fileContents;
printf("FILE CONTENT:\n %s\n", fileContents);
printf("%d Text to save %s\n", nthFunctionCall, text);
char *currentLineFromFile;
size_t processedLineNumber;
for (processedLineNumber = 1; (currentLineFromFile = strsep(&fileContents, "\n")) != NULL; processedLineNumber++) {
printf("%d targetLineNumber %ld processedLineNumber %ld \n", nthFunctionCall, targetLineNumber, processedLineNumber);
printf("%d copy the current line into temp: %s\n", nthFunctionCall, currentLineFromFile);
fputs(currentLineFromFile, temp);
if(processedLineNumber == targetLineNumber) {
printf("%d add text to line %ld: %s\n", nthFunctionCall, processedLineNumber, text);
fputs(text, temp);
}
fputs("\n", temp);
fflush(temp);
}
printf("%d Finished loop with: targetLineNumber %ld processedLineNumber %ld \n", nthFunctionCall, targetLineNumber, processedLineNumber);
if(targetLineNumber >= processedLineNumber) {
for (int j = 0; j < (targetLineNumber - processedLineNumber); ++j) {
fputs("\n", temp);
}
printf("%d added text: %s\n", nthFunctionCall, text);
fputs(text, temp);
fflush(temp);
}
fclose(temp);
if(fileContentsCpy != NULL) free(fileContentsCpy);
nthFunctionCall++;
remove(filename);
rename("temp", filename);
printf("One iteration end\n");
}
int numberLength(size_t number) {
int len = 0;
while(number > 0) {
number /= 10;
len++;
}
return len;
}
int main(int argc, char **argv)
{
if (argc < 4) {
fprintf(stderr, "testConsument <fifo_path> <file_to_save_in> <chunk size>\n");
exit(-1);
}
char *myfifo = argv[1];
filename = argv[2];
int numberToRead = atoi(argv[3]);
int fd = open(myfifo, O_RDONLY);
perror("sdada test consument");
char *str1 = calloc(100, sizeof(char));
while (read(fd, str1, numberToRead + 3) > 0) {
long lineNumber;
printf("length: %ld raw input: %s\n", strlen(str1), str1);
sscanf(str1, "%ld", &lineNumber);
char* content = str1 + numberLength(lineNumber) + 1; // lines should be of the format "<number> <chunk-sized-word>[=13=]"
printf("add to line %ld content : %s \n", lineNumber, content);
writeToFile(lineNumber, content);
sleep(1);
free(str1);
str1 = calloc(100, sizeof(char));
printf("#################\n");
}
free(str1);
close(fd);
FILE *res = fopen(filename, "r");
printFile(res);
fclose(res);
return 0;
}
生产者代码:
#include <stdio.h>
#include <string.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#include <stdlib.h>
size_t getFileSize(FILE *fp) {
fseek(fp, 0, SEEK_END);
size_t len = ftell(fp);
rewind(fp);
return len;
}
int main(int argc, char **argv) {
if (argc < 5) {
fprintf(stderr, "producer <fifo_path> <line_number_to_save_in> <input_file> <chunk_size>\n");
exit(-1);
}
char *myfifo = argv[1];
char *lineNumber = argv[2];
int numberToRead = atoi(argv[4]);
mkfifo(myfifo, 0666);
int fd = open(myfifo, O_WRONLY);
char *someFilePath = argv[3];
FILE *somFile = fopen(someFilePath, "r");
char *buf = calloc(numberToRead, sizeof(char));
size_t size = 1;
while ((fread(buf, size, numberToRead, somFile) > 0)) {
char *buf2 = calloc((numberToRead + 3), sizeof(char));
strcat(buf2, lineNumber); strcat(buf2, " "); strcat(buf2, buf); strcat(buf2, "[=14=]");
while (strstr(buf2, "\n")) {
buf2[strcspn(buf2, "\n")] = ' ';
}
printf("SENDING: %s\n", buf2);
fflush(stdout);
write(fd, buf2, numberToRead + 3);
sleep(2);
free(buf);
free(buf2);
buf = calloc(numberToRead, sizeof(char));
}
write(fd, lineNumber, 2);
close(fd);
return 0;
}
在 运行 生产者和消费者之后,通信应该开始工作,一段时间后应该会有一个输出文件。在每次这样的执行之后你必须手动删除文件,因为我没有真的考虑它之前存在的情况。
示例开始(每行应该在不同的终端):
./producer '/tmp/fifo3' 3 'file1' 10
./producer '/tmp/fifo3' 2 'file1' 10
./producer '/tmp/fifo3' 1 'file1' 10
./testConsument '/tmp/fifo3' 'output' 10
有很多调试打印,我不确定它们是否有帮助,但我将它们留在里面。
线路问题
write(fd, lineNumber, 2);
生产者计划接近尾声。它发送了毫无意义且未正确解释的不必要数据。
删除它后,程序按预期工作(除了意外的新行,但它们并没有那么糟糕,而且以前发生过)。
您面临的问题是,将多个生产者连接到共享资源(fifo),您需要控制访问的方式,以便能够控制消费者以正确的顺序获取数据。您从内核获得的唯一帮助是在 write(2)
系统调用级别(内核在执行系统调用期间锁定目标 fifo 的索引节点)因此,如果您进行短写,最简单的方法是将您要放入 fifo 的所有数据组合在一起,然后 write(2)
将它们全部放在一个写入调用中。
如果您选择更复杂的解决方案,那么您需要使用某种 mutex/semaphore/whatever 来控制谁可以独占访问 fifo 进行写入,因为其他进程必须等待它释放锁在开始写作之前。
此外,如果您正在使用这种方法,请不要尝试使用 stdio
。 stdio
包仅在刷新缓冲区时写入数据,输出终端与 fifo 的情况不同,这取决于它使用的 actua 缓冲区大小,您对当它发生时。这意味着您不能使用 fprintf(3)
和好友。
最后,如果您使用 write(2)
的原子性,请记住 fifo 是一种有限的资源,它可以缓冲数据,如果您尝试这样做,将会中断 write(2)
调用一次写入大量数据(这意味着单个 write(2)
调用)您可以获得部分写入并且您将无法从中恢复,因为与此同时其他生产者可以访问 fifo并在上面写(这会破坏你的写作结构)根据经验,尝试将你的消息减少到少量千字节(4kb 或 8kb 是一个很好的上限,可以移植到不同的 unices)