使用 Parallel::ForkManager 处理文件
Using Parallel::ForkManager to process file
我想知道使用 Parallel::ForkManager
(或其他并行化工具)来处理我拥有的一些文件是否是个好主意。基本上,我正在处理一个非常大的文件,并将其内容输出到多个文件中。在 64 核服务器中,这通常需要大约 3 个小时。
我想知道的是这个模块的实现是如何收集数据的。例如,如果我这样做
use Parallel::ForkManager;
# Max 30 processes
my $pm = new Parallel::ForkManager(64);
open my $in,"<","D:\myfile.txt";
my @data=<$in>;
close $in;
#gathers unique dataheaders
my @uniqueheaders;
foreach my $line (@data){
my @split=split "\t",$line;
push @uniqueheaders,$split[0] unless (grep{$_=~/$split[0]} @uniqueheaders);
}
foreach my $head (@uniqueheaders) {
$pm->start and next; # do the fork
(my @matches) = grep{$_=~/^$head\t/} @data; #finds all matches in @data started by $head
if($#matches>1){ #prints out if matches are found
open my $out,">",'D:\directory\'."$head".'data';
print $out @matches;
close $out;
}
else{ print "Problem in $head!\n";}
$pm->finish; # do the exit in the child process
}
$pm->wait_all_children;
现在,我的问题是:
- 你觉得这样写脚本有什么问题吗?每个
$head
会一次分配给一个核心,还是我必须注意其他我不知道的事情?
- 如果我想把整个数据处理一次输出怎么办?例如,在最后一个
foreach
循环之前创建一个数组 @gatherstuff
,而不是 print
输出,它会 push @gatherstuff,@matches;
。这像我做的那么简单吗?
仅当您预处理文件以确定要分配给每个工作人员的范围时,对单个输入文件使用 Parallel::ForkManager 可能最终才有意义。而且,只有当您要使用相同的输入多次重复工作时,这才有意义。
即使您可能从使用 Parallel::ForkManager
中获得一些好处,让 30 个进程尝试执行 IO 也不会给您带来任何好处。如果系统没有做任何其他事情,我最推荐的是内核数量的两倍,假设你有很多内存。
操作系统的缓存可能会导致不同的进程在初始预热后实际从内存中读取文件,lead to gains from having multiple processes do the processing。
由于多种原因,写入不太可能从多个进程中获益。进程会遍历内存space,进程需要等待buffer刷新到磁盘等等,这样的话,IO瓶颈肯定会更加突出
在尝试并行编写代码 运行 之前,请尝试看看是否可以高效地将代码优化为 运行 串行。如果这种优化的好处还不够,那么你可以尝试使用Parallel::ForkManager
。您的代码存在的一些问题是:
- 整个文件被读入内存:一次读取如此大量的行会大大增加程序的内存使用量,但也会增加需要执行的时间。内存可能不是问题,但
@data
数组的重复重新分配会占用时间。如果 RAM 数量较少,您将需要大量交换到磁盘,这会耗费更多时间。
grep
用于 'contains' 检查而不是散列: grep
多次对如此大量的记录执行 ping 操作是非常慢而且根本不可扩展。截至目前,提取 headers 的过程具有 O(n^2)
的顺序,其中 n
是输入文件中的记录数。如果您使用散列,则顺序将为 O(n)
,这更易于管理。类似的论点适用于您提取匹配记录的方式。
- 'headers' 在开头提取: 这在您当前 运行 并行代码的方法中可能是必要的,但您可以尝试避免这种情况,因为它会遍历所有记录。
这是我解决问题的方法,无需并行编写代码 运行。您可能需要使用 ulimit -n
命令增加允许打开的文件描述符的数量。
use strict;
use warnings;
my ($input_file, $output_dir) = (@ARGV);
die "Syntax: [=10=] <input_file> <output_dir>"
unless $input_file and $output_dir;
open my $in, '<', $input_file
or die "Could not open input file $input_file: $!";
# map of ID (aka header) -> file handle
my %idfh;
while (my $line = <$in>) {
# extract the ID
$line =~ /^(.+?)\t/;
my $id = ;
# get the open file handle
my $fh = $idfh{$id};
unless ($fh) {
# if there was no file handle for this ID, open a new one
open $fh, '>', "$output_dir/${id}data"
or die "Could not open file for ID $id: $!";
$idfh{$id} = $fh;
}
# print the record to the correct file handle
print $fh $line;
}
# perl automatically closes all file handles
这很简单:
- 遍历文件的每一行。对于每次迭代,执行以下操作:
- 提取 ID。
- 如果我们之前没有看到ID,打开ID对应的文件进行写入。否则,转到步骤 4。
- 将文件句柄存储在以 ID 为键的映射中。
- 如果之前看到了 ID,则从哈希中获取文件句柄。
- 通过文件句柄写入记录。
我想知道使用 Parallel::ForkManager
(或其他并行化工具)来处理我拥有的一些文件是否是个好主意。基本上,我正在处理一个非常大的文件,并将其内容输出到多个文件中。在 64 核服务器中,这通常需要大约 3 个小时。
我想知道的是这个模块的实现是如何收集数据的。例如,如果我这样做
use Parallel::ForkManager;
# Max 30 processes
my $pm = new Parallel::ForkManager(64);
open my $in,"<","D:\myfile.txt";
my @data=<$in>;
close $in;
#gathers unique dataheaders
my @uniqueheaders;
foreach my $line (@data){
my @split=split "\t",$line;
push @uniqueheaders,$split[0] unless (grep{$_=~/$split[0]} @uniqueheaders);
}
foreach my $head (@uniqueheaders) {
$pm->start and next; # do the fork
(my @matches) = grep{$_=~/^$head\t/} @data; #finds all matches in @data started by $head
if($#matches>1){ #prints out if matches are found
open my $out,">",'D:\directory\'."$head".'data';
print $out @matches;
close $out;
}
else{ print "Problem in $head!\n";}
$pm->finish; # do the exit in the child process
}
$pm->wait_all_children;
现在,我的问题是:
- 你觉得这样写脚本有什么问题吗?每个
$head
会一次分配给一个核心,还是我必须注意其他我不知道的事情? - 如果我想把整个数据处理一次输出怎么办?例如,在最后一个
foreach
循环之前创建一个数组@gatherstuff
,而不是print
输出,它会push @gatherstuff,@matches;
。这像我做的那么简单吗?
仅当您预处理文件以确定要分配给每个工作人员的范围时,对单个输入文件使用 Parallel::ForkManager 可能最终才有意义。而且,只有当您要使用相同的输入多次重复工作时,这才有意义。
即使您可能从使用 Parallel::ForkManager
中获得一些好处,让 30 个进程尝试执行 IO 也不会给您带来任何好处。如果系统没有做任何其他事情,我最推荐的是内核数量的两倍,假设你有很多内存。
操作系统的缓存可能会导致不同的进程在初始预热后实际从内存中读取文件,lead to gains from having multiple processes do the processing。
由于多种原因,写入不太可能从多个进程中获益。进程会遍历内存space,进程需要等待buffer刷新到磁盘等等,这样的话,IO瓶颈肯定会更加突出
在尝试并行编写代码 运行 之前,请尝试看看是否可以高效地将代码优化为 运行 串行。如果这种优化的好处还不够,那么你可以尝试使用Parallel::ForkManager
。您的代码存在的一些问题是:
- 整个文件被读入内存:一次读取如此大量的行会大大增加程序的内存使用量,但也会增加需要执行的时间。内存可能不是问题,但
@data
数组的重复重新分配会占用时间。如果 RAM 数量较少,您将需要大量交换到磁盘,这会耗费更多时间。 grep
用于 'contains' 检查而不是散列:grep
多次对如此大量的记录执行 ping 操作是非常慢而且根本不可扩展。截至目前,提取 headers 的过程具有O(n^2)
的顺序,其中n
是输入文件中的记录数。如果您使用散列,则顺序将为O(n)
,这更易于管理。类似的论点适用于您提取匹配记录的方式。- 'headers' 在开头提取: 这在您当前 运行 并行代码的方法中可能是必要的,但您可以尝试避免这种情况,因为它会遍历所有记录。
这是我解决问题的方法,无需并行编写代码 运行。您可能需要使用 ulimit -n
命令增加允许打开的文件描述符的数量。
use strict;
use warnings;
my ($input_file, $output_dir) = (@ARGV);
die "Syntax: [=10=] <input_file> <output_dir>"
unless $input_file and $output_dir;
open my $in, '<', $input_file
or die "Could not open input file $input_file: $!";
# map of ID (aka header) -> file handle
my %idfh;
while (my $line = <$in>) {
# extract the ID
$line =~ /^(.+?)\t/;
my $id = ;
# get the open file handle
my $fh = $idfh{$id};
unless ($fh) {
# if there was no file handle for this ID, open a new one
open $fh, '>', "$output_dir/${id}data"
or die "Could not open file for ID $id: $!";
$idfh{$id} = $fh;
}
# print the record to the correct file handle
print $fh $line;
}
# perl automatically closes all file handles
这很简单:
- 遍历文件的每一行。对于每次迭代,执行以下操作:
- 提取 ID。
- 如果我们之前没有看到ID,打开ID对应的文件进行写入。否则,转到步骤 4。
- 将文件句柄存储在以 ID 为键的映射中。
- 如果之前看到了 ID,则从哈希中获取文件句柄。
- 通过文件句柄写入记录。