从 fifo 读取时重叠输出:如何 fix/avoid 这个?
overlapping output while reading from fifo: how to fix/avoid this?
我正在尝试聚合来自 2 个文件的数据,因此我决定通过单独的编写器进程将数据发送到命名的 fifo,并启动一个单独的 reader 进程来读取和处理聚合数据。所有 reading/writing 都发生在 ramdisk (/dev/shm) 上,该磁盘非常方便,大约 100 GB。
这个工作文件,我确保写入 fifo 的每个数据行都小于 512 字节,因此管道可以保留其原子行为。
但在尝试多次运行后,我开始观察到 reader 进程正在接收重叠输出,当我尝试从每个进程传输超过 1000 万行时,这种情况就开始发生。我的每条数据线都以换行结束。
我正在以“+< fifo”模式打开 fifo 进行读取,以“>> fifo”进行写入。这里没有使用系统调用,只是使用正常打开来获取文件句柄并尝试逐行处理数据。
我怎样才能开始调查这个。有什么想法吗?
非常感谢。
2019 年更新/APR/29:
请注意,我的循环现在正在使用系统调用。以前我没有使用它们,但最终决定使用它们。
同样的事情也可以通过让 2 个进程写入一个文件来实现,但需要小心,因为这只适用于 POSIX 兼容的文件系统,或者如果没有 -可以将日志文件(多个进程将在其中执行写入)保存在 RAMDISK 中,因为它也可以工作。 NFS 驱动器不在范围内,因为它不符合 POSIX 并且此技术不适用于它。
因此,如果我们谈论 FIFO 与文本文件 - 文件的多个进程 reading/writing 比 FIFO 的多个进程 reading/writing 更快。
为了即将到来的 readers,这是我的作者和 reader 流程代码。如何设计代码以合并这些子例程取决于您。有很多方法可以做到。
希望有用。
写入进程
write_log => sub {
my ($filehandle, $log_message) = @_;
select $filehandle ; $|++;
syswrite ($filehandle, $log_message, length($log_message))
or die "write_log: syswrite fail!\n";
},
reader 进程:
read_log => sub
{
# In my endless reading loop,
# if I detect keyword END 2 times (as
# i have 2 processes), I exit the reading loop
# and do further operations.
#
my ($end_check_value) = @_;
sysopen (FH,$logfile, O_CREAT|O_RDONLY)
or die "($$) read_log: Failed to sysopen\n";
my ($h, $end) = (undef,0);
select FH ; $|++ ;
print STDOUT get_ts().'|'."($$) read_log: now tailing logfile with check count $end_check_value\n";
for (;;)
{
while (my $line = <FH>)
{
chomp $line;
$end++ if $line =~ m/END/g;
last if $end == $end_check_value;
my $key = (split(/\s/,$line))[0];
$h->{$key}++;
}
sleep(1) ; seek (FH,0,1);
# break out of for loop if we
# have collected the 'END' tags
# from all worker processes
if ($end == $end_check_value)
{
print STDOUT get_ts().'|'."($$) read_log: breaking for loop ",
"with end_check: $end_check_value\n";
last;
}
} close (FH);
},
性能统计:
这是多个进程写入 RAMDISK 上的单个文件的性能统计信息。平均而言,写入 150,000,000 行(1.5 亿)然后读入哈希大约需要 10 分钟加负 20 秒。
test string is 238 bytes long
20190429-12:34:50.637|(11139) PARENT: each child will write (75000000) to (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11139) trunc_log_file: truncated (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11149) process no. (2) launched!
20190429-12:34:54.399|(11150) process no. (1) launched!
20190429-12:34:55.400|(11139) read_log: now tailing logfile with check count 2
20190429-12:44:21.565|(11150) process exiting with status code 0
20190429-12:44:34.164|(11149) process exiting with status code 0
20190429-12:45:03.956|(11139) read_log: breaking for loop with end_check: 2
20190429-12:45:03.957|(11139) read_log: Collected counts:
(11139) (11149):75000000
(11139) (11150):75000000
---------------
(11139) Finished!
real **10m13.466s**
user 9m31.627s
sys 0m39.650s
这是 FIFO 的性能统计数据,其中多个进程分别向 FIFO 写入 25,000,000 行,然后 reader 进程将它们读回哈希。平均大约需要 25-30 分钟。它比写入文件的进程慢。
test string is 141 bytes long
20190426-10:25:13.455|28342|2-test-fifo.pl: Starting..
20190426-10:25:13.456|28345|CHILD starting (read_and_hash)
20190426-10:25:13.456|28345|READ_AND_HASH now hashing files
20190426-10:25:14.458|28346|CHILD starting (s1_data_gather)
20190426-10:25:14.458|28346|Working on sit1 data..
20190426-10:25:14.458|28347|CHILD starting (s2_data_gather)
20190426-10:25:14.458|28347|Working on sit2 data..
20190426-10:48:48.454|28346|Finished working on S1 data..
20190426-10:48:48.457|28342|Reaped 28346
20190426-10:48:48.462|28345|read LAST line from S2 data
20190426-10:48:52.657|28347|Finished working on s2 data..
20190426-10:48:52.660|28342|Reaped 28347
20190426-10:48:52.669|28345|read LAST line from S2 data
20190426-10:48:53.130|28345|READ_AND_HASH finished hashing files
(read_n_hash): finished hashing. keys count
s1 = 25000000
s2 = 25000000
20190426-10:48:53.130|28345|starting comparison. doing source to target
20190426-10:49:49.566|28345|finished comparing source to target. now comparing target to source
20190426-10:50:45.578|28345|comparing target to source ends. finished
20190426-10:51:57.220|28342|Reaped 28345
20190426-10:51:57.220|28342|2-test-fifo.pl: Ending..
您可能需要为正在写入的文件打开自动刷新。如果您使用 open() 函数而不是通过像 IO::File 这样的 OO 接口打开文件,那么在您成功打开文件后(例如 $fifo),您需要这样的代码。
select $fifo;
$| = 1;
请注意,select() 为打印选择输出文件句柄等,不指定特定的文件句柄。如果你想恢复到以 STDOUT 为目标,那么在上面之后 select STDOUT
,或者,迂腐:
my $oldfh = select $fifo;
$| = 1;
select $oldfh;
我认为文件模式('+<' 等)与它没有任何关系,因为 "clobbering" 和 "appending" 等概念不适用于 FIFO。您可能会用简单的“>”和“<”做同样的事情。
您在这里看到的可能是并发的简单产物。您假设 reader 及时从 FIFO 中提取数据。如果两位作家都有机会在 reader 再次尝试阅读之前写几张唱片怎么办?如果 FIFO 在写操作中途达到容量怎么办? writer 将在写入过程中阻塞一部分,然后 reader 将有机会清空队列,但不能保证写入部分行的 writer 将是下一个写入者。这将导致交错行。
如果我关于自动刷新的回答没有解决您的问题,您可能不得不考虑以这种方式交错写入的可能性。
如上面的评论所述,您最好使用数据报套接字 (SOCK_DGRAM) 而不是 FIFO。这样,每条消息都是一个原子单元,没有交织的机会。
我正在尝试聚合来自 2 个文件的数据,因此我决定通过单独的编写器进程将数据发送到命名的 fifo,并启动一个单独的 reader 进程来读取和处理聚合数据。所有 reading/writing 都发生在 ramdisk (/dev/shm) 上,该磁盘非常方便,大约 100 GB。
这个工作文件,我确保写入 fifo 的每个数据行都小于 512 字节,因此管道可以保留其原子行为。
但在尝试多次运行后,我开始观察到 reader 进程正在接收重叠输出,当我尝试从每个进程传输超过 1000 万行时,这种情况就开始发生。我的每条数据线都以换行结束。
我正在以“+< fifo”模式打开 fifo 进行读取,以“>> fifo”进行写入。这里没有使用系统调用,只是使用正常打开来获取文件句柄并尝试逐行处理数据。
我怎样才能开始调查这个。有什么想法吗?
非常感谢。
2019 年更新/APR/29:
请注意,我的循环现在正在使用系统调用。以前我没有使用它们,但最终决定使用它们。
同样的事情也可以通过让 2 个进程写入一个文件来实现,但需要小心,因为这只适用于 POSIX 兼容的文件系统,或者如果没有 -可以将日志文件(多个进程将在其中执行写入)保存在 RAMDISK 中,因为它也可以工作。 NFS 驱动器不在范围内,因为它不符合 POSIX 并且此技术不适用于它。
因此,如果我们谈论 FIFO 与文本文件 - 文件的多个进程 reading/writing 比 FIFO 的多个进程 reading/writing 更快。
为了即将到来的 readers,这是我的作者和 reader 流程代码。如何设计代码以合并这些子例程取决于您。有很多方法可以做到。
希望有用。
写入进程
write_log => sub {
my ($filehandle, $log_message) = @_;
select $filehandle ; $|++;
syswrite ($filehandle, $log_message, length($log_message))
or die "write_log: syswrite fail!\n";
},
reader 进程:
read_log => sub
{
# In my endless reading loop,
# if I detect keyword END 2 times (as
# i have 2 processes), I exit the reading loop
# and do further operations.
#
my ($end_check_value) = @_;
sysopen (FH,$logfile, O_CREAT|O_RDONLY)
or die "($$) read_log: Failed to sysopen\n";
my ($h, $end) = (undef,0);
select FH ; $|++ ;
print STDOUT get_ts().'|'."($$) read_log: now tailing logfile with check count $end_check_value\n";
for (;;)
{
while (my $line = <FH>)
{
chomp $line;
$end++ if $line =~ m/END/g;
last if $end == $end_check_value;
my $key = (split(/\s/,$line))[0];
$h->{$key}++;
}
sleep(1) ; seek (FH,0,1);
# break out of for loop if we
# have collected the 'END' tags
# from all worker processes
if ($end == $end_check_value)
{
print STDOUT get_ts().'|'."($$) read_log: breaking for loop ",
"with end_check: $end_check_value\n";
last;
}
} close (FH);
},
性能统计:
这是多个进程写入 RAMDISK 上的单个文件的性能统计信息。平均而言,写入 150,000,000 行(1.5 亿)然后读入哈希大约需要 10 分钟加负 20 秒。
test string is 238 bytes long
20190429-12:34:50.637|(11139) PARENT: each child will write (75000000) to (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11139) trunc_log_file: truncated (/dev/shm/multi_proc_test_logfile.log)
20190429-12:34:54.399|(11149) process no. (2) launched!
20190429-12:34:54.399|(11150) process no. (1) launched!
20190429-12:34:55.400|(11139) read_log: now tailing logfile with check count 2
20190429-12:44:21.565|(11150) process exiting with status code 0
20190429-12:44:34.164|(11149) process exiting with status code 0
20190429-12:45:03.956|(11139) read_log: breaking for loop with end_check: 2
20190429-12:45:03.957|(11139) read_log: Collected counts:
(11139) (11149):75000000
(11139) (11150):75000000
---------------
(11139) Finished!
real **10m13.466s**
user 9m31.627s
sys 0m39.650s
这是 FIFO 的性能统计数据,其中多个进程分别向 FIFO 写入 25,000,000 行,然后 reader 进程将它们读回哈希。平均大约需要 25-30 分钟。它比写入文件的进程慢。
test string is 141 bytes long
20190426-10:25:13.455|28342|2-test-fifo.pl: Starting..
20190426-10:25:13.456|28345|CHILD starting (read_and_hash)
20190426-10:25:13.456|28345|READ_AND_HASH now hashing files
20190426-10:25:14.458|28346|CHILD starting (s1_data_gather)
20190426-10:25:14.458|28346|Working on sit1 data..
20190426-10:25:14.458|28347|CHILD starting (s2_data_gather)
20190426-10:25:14.458|28347|Working on sit2 data..
20190426-10:48:48.454|28346|Finished working on S1 data..
20190426-10:48:48.457|28342|Reaped 28346
20190426-10:48:48.462|28345|read LAST line from S2 data
20190426-10:48:52.657|28347|Finished working on s2 data..
20190426-10:48:52.660|28342|Reaped 28347
20190426-10:48:52.669|28345|read LAST line from S2 data
20190426-10:48:53.130|28345|READ_AND_HASH finished hashing files
(read_n_hash): finished hashing. keys count
s1 = 25000000
s2 = 25000000
20190426-10:48:53.130|28345|starting comparison. doing source to target
20190426-10:49:49.566|28345|finished comparing source to target. now comparing target to source
20190426-10:50:45.578|28345|comparing target to source ends. finished
20190426-10:51:57.220|28342|Reaped 28345
20190426-10:51:57.220|28342|2-test-fifo.pl: Ending..
您可能需要为正在写入的文件打开自动刷新。如果您使用 open() 函数而不是通过像 IO::File 这样的 OO 接口打开文件,那么在您成功打开文件后(例如 $fifo),您需要这样的代码。
select $fifo;
$| = 1;
请注意,select() 为打印选择输出文件句柄等,不指定特定的文件句柄。如果你想恢复到以 STDOUT 为目标,那么在上面之后 select STDOUT
,或者,迂腐:
my $oldfh = select $fifo;
$| = 1;
select $oldfh;
我认为文件模式('+<' 等)与它没有任何关系,因为 "clobbering" 和 "appending" 等概念不适用于 FIFO。您可能会用简单的“>”和“<”做同样的事情。
您在这里看到的可能是并发的简单产物。您假设 reader 及时从 FIFO 中提取数据。如果两位作家都有机会在 reader 再次尝试阅读之前写几张唱片怎么办?如果 FIFO 在写操作中途达到容量怎么办? writer 将在写入过程中阻塞一部分,然后 reader 将有机会清空队列,但不能保证写入部分行的 writer 将是下一个写入者。这将导致交错行。
如果我关于自动刷新的回答没有解决您的问题,您可能不得不考虑以这种方式交错写入的可能性。
如上面的评论所述,您最好使用数据报套接字 (SOCK_DGRAM) 而不是 FIFO。这样,每条消息都是一个原子单元,没有交织的机会。