向工人发送数据
Sending data to workers
我正在尝试创建一段并行代码来加速处理一个非常大的(几亿行)数组。为了并行化,我将数据分成 8 份(我的核心数),并尝试向每个工作人员发送 1 份。然而,看看我的 RAM 使用情况,似乎每件作品都发送给了每个工作人员,有效地将我的 RAM 使用量乘以 8。最小工作示例:
A = 1:16;
for ii = 1:8
data{ii} = A(2*ii-1:2*ii);
end
现在,当我使用 parfor
将此数据发送给工作人员时,它似乎发送了完整的单元格,而不仅仅是所需的部分:
output = cell(1,8);
parfor ii = 1:8
output{ii} = data{ii};
end
我实际上在 parfor
循环中使用了一些函数,但这说明了这种情况。 MATLAB 是否真的将完整的单元 data
发送给每个工作人员,如果是这样,如何让它只发送所需的部分?
我建议使用 MATLAB 的 spmd
命令。
您几乎可以像非并行实现一样编写代码,并且还可以通过 labindex
"system" 变量访问当前工作程序。
看这里:
http://www.mathworks.com/help/distcomp/spmd.html
还有关于 spmd
与 parfor
的问题:
SPMD vs. Parfor
@m.s 的评论是正确的 - 当 parfor
切片 一个数组时,每个工作人员只发送循环迭代所需的切片它正在努力。但是,您很可能会看到 RAM 使用量的增加超出了您最初的预期,因为不幸的是,在通过 parfor
通信机制将数据从客户端传递给工作人员时需要数据副本。
如果您只需要有关工作人员的数据,那么最好的解决方案是 create/load/access 如果可能,只需要有关工作人员的数据。听起来您追求的是数据并行性而不是任务并行性,spmd
确实更适合(正如@Kostas 建议的那样)。
根据我的个人经验,我发现使用 parfeval
在内存使用方面比 parfor
更好。此外,您的问题似乎更容易破解,因此您可以使用 parfeval
向 MATLAB worker 提交更多较小的作业。
假设您有 workerCnt
个 MATLAB worker,您将处理 jobCnt
个作业。设 data
是一个大小为 jobCnt x 1
的元胞数组,它的每个元素对应于函数 getOutput
的一个数据输入,该函数对数据进行分析。然后将结果存储在大小为 jobCnt x 1
.
的元胞数组 output
中
在以下代码中,作业在第一个 for
循环中分配,结果在第二个 while
循环中检索。布尔变量 doneJobs
指示完成了哪个作业。
poolObj = parpool(workerCnt);
jobCnt = length(data); % number of jobs
output = cell(jobCnt,1);
for jobNo = 1:jobCnt
future(jobNo) = parfeval(poolObj,@getOutput,...
nargout('getOutput'),data{jobNo});
end
doneJobs = false(jobCnt,1);
while ~all(doneJobs)
[idx,result] = fetchnext(future);
output{idx} = result;
doneJobs(idx) = true;
end
此外,如果您想节省更多内存,可以将此方法更进一步。你可以做的是,在获取完成的作业的结果后,你可以删除 future
的相应成员。原因是这个对象存储了 getOutput
函数的所有输入和输出数据,这可能会很大。但是您需要小心,因为删除 future
的成员会导致索引偏移。
下面是我为这只海豚写的代码。
poolObj = parpool(workerCnt);
jobCnt = length(data); % number of jobs
output = cell(jobCnt,1);
for jobNo = 1:jobCnt
future(jobNo) = parfeval(poolObj,@getOutput,...
nargout('getOutput'),data{jobNo});
end
doneJobs = false(jobCnt,1);
while ~all(doneJobs)
[idx,result] = fetchnext(future);
furure(idx) = []; % remove the done future object
oldIdx = 0;
% find the index offset and correct index accordingly
while oldIdx ~= idx
doneJobsInIdxRange = sum(doneJobs((oldIdx + 1):idx));
oldIdx = idx
idx = idx + doneJobsInIdxRange;
end
output{idx} = result;
doneJobs(idx) = true;
end
我正在尝试创建一段并行代码来加速处理一个非常大的(几亿行)数组。为了并行化,我将数据分成 8 份(我的核心数),并尝试向每个工作人员发送 1 份。然而,看看我的 RAM 使用情况,似乎每件作品都发送给了每个工作人员,有效地将我的 RAM 使用量乘以 8。最小工作示例:
A = 1:16;
for ii = 1:8
data{ii} = A(2*ii-1:2*ii);
end
现在,当我使用 parfor
将此数据发送给工作人员时,它似乎发送了完整的单元格,而不仅仅是所需的部分:
output = cell(1,8);
parfor ii = 1:8
output{ii} = data{ii};
end
我实际上在 parfor
循环中使用了一些函数,但这说明了这种情况。 MATLAB 是否真的将完整的单元 data
发送给每个工作人员,如果是这样,如何让它只发送所需的部分?
我建议使用 MATLAB 的 spmd
命令。
您几乎可以像非并行实现一样编写代码,并且还可以通过 labindex
"system" 变量访问当前工作程序。
看这里:
http://www.mathworks.com/help/distcomp/spmd.html
还有关于 spmd
与 parfor
的问题:
SPMD vs. Parfor
@m.s 的评论是正确的 - 当 parfor
切片 一个数组时,每个工作人员只发送循环迭代所需的切片它正在努力。但是,您很可能会看到 RAM 使用量的增加超出了您最初的预期,因为不幸的是,在通过 parfor
通信机制将数据从客户端传递给工作人员时需要数据副本。
如果您只需要有关工作人员的数据,那么最好的解决方案是 create/load/access 如果可能,只需要有关工作人员的数据。听起来您追求的是数据并行性而不是任务并行性,spmd
确实更适合(正如@Kostas 建议的那样)。
根据我的个人经验,我发现使用 parfeval
在内存使用方面比 parfor
更好。此外,您的问题似乎更容易破解,因此您可以使用 parfeval
向 MATLAB worker 提交更多较小的作业。
假设您有 workerCnt
个 MATLAB worker,您将处理 jobCnt
个作业。设 data
是一个大小为 jobCnt x 1
的元胞数组,它的每个元素对应于函数 getOutput
的一个数据输入,该函数对数据进行分析。然后将结果存储在大小为 jobCnt x 1
.
output
中
在以下代码中,作业在第一个 for
循环中分配,结果在第二个 while
循环中检索。布尔变量 doneJobs
指示完成了哪个作业。
poolObj = parpool(workerCnt);
jobCnt = length(data); % number of jobs
output = cell(jobCnt,1);
for jobNo = 1:jobCnt
future(jobNo) = parfeval(poolObj,@getOutput,...
nargout('getOutput'),data{jobNo});
end
doneJobs = false(jobCnt,1);
while ~all(doneJobs)
[idx,result] = fetchnext(future);
output{idx} = result;
doneJobs(idx) = true;
end
此外,如果您想节省更多内存,可以将此方法更进一步。你可以做的是,在获取完成的作业的结果后,你可以删除 future
的相应成员。原因是这个对象存储了 getOutput
函数的所有输入和输出数据,这可能会很大。但是您需要小心,因为删除 future
的成员会导致索引偏移。
下面是我为这只海豚写的代码。
poolObj = parpool(workerCnt);
jobCnt = length(data); % number of jobs
output = cell(jobCnt,1);
for jobNo = 1:jobCnt
future(jobNo) = parfeval(poolObj,@getOutput,...
nargout('getOutput'),data{jobNo});
end
doneJobs = false(jobCnt,1);
while ~all(doneJobs)
[idx,result] = fetchnext(future);
furure(idx) = []; % remove the done future object
oldIdx = 0;
% find the index offset and correct index accordingly
while oldIdx ~= idx
doneJobsInIdxRange = sum(doneJobs((oldIdx + 1):idx));
oldIdx = idx
idx = idx + doneJobsInIdxRange;
end
output{idx} = result;
doneJobs(idx) = true;
end