Julia:将 pmap 与 Arrays 和 SharedArrays 结合使用
Julia: use of pmap with Arrays vs SharedArrays
我已经在 Julia 工作了几个月,我有兴趣并行编写我的一些代码。我正在解决一个问题,我使用 1 个模型为几个不同的接收器生成数据(每个接收器的数据是一个向量)。每个接收器的数据都可以独立计算,这让我相信我应该能够使用 pmap 函数。我的计划是将数据初始化为 2D SharedArray(每列代表 1 个接收器的数据),然后让 pmap 循环遍历每一列。但是我发现将 SharedArray 与 pmap 一起使用并不比使用 map 串行工作快。我写了下面的虚拟代码来说明这一点。
@everywhere function Dummy(icol,model,data,A,B)
nx = 250
nz = 250
nh = 50
for ih = 1:nh
for ix = 1:nx
for iz = 1:nz
data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
end
end
end
end
function main()
nx = 250
nz = 250
nh = 50
nt = 500
ncol = 100
model1 = rand(nz,nx,nh)
model2 = copy(model1)
model3 = convert(SharedArray,model1)
data1 = zeros(Float64,nt,ncol)
data2 = SharedArray(Float64,nt,ncol)
data3 = SharedArray(Float64,nt,ncol)
A1 = rand(nz,nx,nh)
A2 = copy(A1)
A3 = convert(SharedArray,A1)
B1 = rand(nz,nx,nh)
B2 = copy(B1)
B3 = convert(SharedArray,B1)
@time map((arg)->Dummy(arg,model1,data1,A1,B1),[icol for icol = 1:ncol])
@time pmap((arg)->Dummy(arg,model2,data2,A2,B2),[icol for icol = 1:ncol])
@time pmap((arg)->Dummy(arg,model3,data3,A3,B3),[icol for icol = 1:ncol])
println(data1==data2)
println(data1==data3)
end
main()
我使用 Julia -p 3
和 运行 脚本启动 Julia 会话。 3次测试的时间分别为1.4s、4.7s和1.6s。与带有 map (1.4s) 的常规数组相比,将 SharedArrays 与 pmap (1.6s 运行time) 一起使用并没有提供任何速度上的改进。我也很困惑为什么第二种情况(数据作为 SharedArray,所有其他输入作为带有 pmap 的常规数组)如此缓慢。我需要更改什么才能从并行工作中获益?
前言:是的,您的问题确实有解决方案。请参阅底部的代码。但是,在我到达那里之前,我会进行一些解释。
我认为问题的根源在于内存访问。首先,虽然我没有严格调查它,但我怀疑可以对 Julia 的底层代码进行一定数量的改进,以改进它在并行处理中处理内存访问的方式。尽管如此,在这种情况下,我 怀疑 基本代码的任何潜在问题,如果确实存在的话,并不是什么大错。相反,我认为仔细考虑您的代码中到底发生了什么以及它对内存访问意味着什么是有用的。
在 Julia 中工作时要牢记的一个关键事项是它以列优先顺序存储数组。也就是说,它将它们存储为彼此堆叠的列堆栈。这也推广到 > 2 的维度。有关详细信息,请参阅 this 非常有用的 Julia 性能提示部分。这意味着在单个列中一行接一行地访问是很快的。但是,如果您需要在列之间跳来跳去,那么您就会遇到麻烦。是的,访问 ram 内存可能相对较快,但访问缓存内存要快得多,因此如果您的代码允许将单个列左右从 ram 加载到缓存中然后继续处理,那么您将做很多事情比需要在 ram 和缓存之间进行大量交换要好。在您的代码中,您在计算之间从一列切换到另一列,这与任何人无关。例如,在您的 pmap
中,每个进程都会获得共享数组的不同列来处理。然后,每个都沿着该列的行向下移动并修改其中的值。但是,由于它们试图彼此并行工作,并且整个阵列太大而无法放入您的缓存中,因此 ram 和缓存之间会发生大量交换,这确实会减慢您的速度。从理论上讲,也许可以设计一个足够聪明的底层内存管理系统来解决这个问题,但我真的不知道——这超出了我的薪水等级。当然,同样的事情也发生在您对其他对象的访问上。
并行化时通常要记住的另一件事是触发器(即计算机计算)与 read/write 操作的比率。触发器倾向于很好地并行化,你可以有不同的内核、进程等,对它们保存在它们的小缓存中的自己的数据位进行自己的小计算。但是,read/write 操作不能很好地并行化。可以做一些事情来设计硬件系统来改进这一点。但总的来说,如果你有一个给定的计算机系统,比如说,两个内核,然后再添加四个内核,你执行失败的能力将增加三倍,但你 read/write 数据的能力 to/from ram不会真的提升那么多。 (注意:这是过于简单化,很大程度上取决于您的系统)。然而,一般来说,触发器与 read/write 的比率越高,并行性带来的好处就越多。在你的情况下,你的代码涉及相当数量的 read/writes (所有这些访问你的不同数组)对于相对较少数量的触发器(一些乘法和加法)。这只是要记住的事情。
幸运的是,如果编写正确,您的案例可以通过并行性得到一些很好的加速。根据我使用 Julia 的经验,我所有最成功的并行性都来自于我可以分解数据并让工作人员分别处理数据块。您的情况恰好适合于此。下面是我编写的一些代码的示例。您可以看到,从一个处理器到三个处理器,速度几乎提高了 3 倍。该代码在某些地方有点粗糙,但它至少展示了如何处理此类问题的一般思路。之后我对代码给出一些评论。
addprocs(3)
nx = 250;
nz = 250;
nh = 50;
nt = 250;
@everywhere ncol = 100;
model = rand(nz,nx,nh);
data = SharedArray(Float64,nt,ncol);
A = rand(nz,nx,nh);
B = rand(nz,nx,nh);
function distribute_data(X, obj_name_on_worker::Symbol, dim)
size_per_worker = floor(Int,size(X,1) / nworkers())
StartIdx = 1
EndIdx = size_per_worker
for (idx, pid) in enumerate(workers())
if idx == nworkers()
EndIdx = size(X,1)
end
println(StartIdx:EndIdx)
if dim == 3
@spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:,:])))
elseif dim == 2
@spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:])))
end
StartIdx = EndIdx + 1
EndIdx = EndIdx + size_per_worker - 1
end
end
distribute_data(model, :model, 3)
distribute_data(A, :A, 3)
distribute_data(B, :B, 3)
distribute_data(data, :data, 2)
@everywhere function Dummy(icol,model,data,A,B)
nx = size(model, 2)
nz = size(A,1)
nh = size(model, 3)
for ih = 1:nh
for ix = 1:nx
for iz = 1:nz
data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
end
end
end
end
regular_test() = map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
function parallel_test()
@everywhere begin
if myid() != 1
map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
end
end
end
@time regular_test(); # 2.120631 seconds (307 allocations: 11.313 KB)
@time parallel_test(); # 0.918850 seconds (5.70 k allocations: 337.250 KB)
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
function recombine_data(Data::Symbol)
Results = cell(nworkers())
for (idx, pid) in enumerate(workers())
Results[idx] = getfrom(pid, Data)
end
return vcat(Results...)
end
@time P_Data = recombine_data(:data); # 0.003132 seconds
P_Data == data ## true
评论
这里使用SharedArray
是多余的。我只是使用它,因为它很容易就地修改,这就是您的代码最初的编写方式。这让我可以更直接地根据您编写的内容进行工作,而无需对其进行太多修改。
我没有在计时赛中包含恢复数据的步骤,但如您所见,在这种情况下,这是非常微不足道的时间。在其他情况下,它可能不那么微不足道,但数据移动只是您面临的并行性问题之一。
一般进行时间试验时,最好的做法是 运行 函数一次(以便编译代码)然后再次 运行 得到时代。这就是我在这里所做的。
查看此 SO post 了解我在此处使用的其中一些功能的灵感来源。
我已经在 Julia 工作了几个月,我有兴趣并行编写我的一些代码。我正在解决一个问题,我使用 1 个模型为几个不同的接收器生成数据(每个接收器的数据是一个向量)。每个接收器的数据都可以独立计算,这让我相信我应该能够使用 pmap 函数。我的计划是将数据初始化为 2D SharedArray(每列代表 1 个接收器的数据),然后让 pmap 循环遍历每一列。但是我发现将 SharedArray 与 pmap 一起使用并不比使用 map 串行工作快。我写了下面的虚拟代码来说明这一点。
@everywhere function Dummy(icol,model,data,A,B)
nx = 250
nz = 250
nh = 50
for ih = 1:nh
for ix = 1:nx
for iz = 1:nz
data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
end
end
end
end
function main()
nx = 250
nz = 250
nh = 50
nt = 500
ncol = 100
model1 = rand(nz,nx,nh)
model2 = copy(model1)
model3 = convert(SharedArray,model1)
data1 = zeros(Float64,nt,ncol)
data2 = SharedArray(Float64,nt,ncol)
data3 = SharedArray(Float64,nt,ncol)
A1 = rand(nz,nx,nh)
A2 = copy(A1)
A3 = convert(SharedArray,A1)
B1 = rand(nz,nx,nh)
B2 = copy(B1)
B3 = convert(SharedArray,B1)
@time map((arg)->Dummy(arg,model1,data1,A1,B1),[icol for icol = 1:ncol])
@time pmap((arg)->Dummy(arg,model2,data2,A2,B2),[icol for icol = 1:ncol])
@time pmap((arg)->Dummy(arg,model3,data3,A3,B3),[icol for icol = 1:ncol])
println(data1==data2)
println(data1==data3)
end
main()
我使用 Julia -p 3
和 运行 脚本启动 Julia 会话。 3次测试的时间分别为1.4s、4.7s和1.6s。与带有 map (1.4s) 的常规数组相比,将 SharedArrays 与 pmap (1.6s 运行time) 一起使用并没有提供任何速度上的改进。我也很困惑为什么第二种情况(数据作为 SharedArray,所有其他输入作为带有 pmap 的常规数组)如此缓慢。我需要更改什么才能从并行工作中获益?
前言:是的,您的问题确实有解决方案。请参阅底部的代码。但是,在我到达那里之前,我会进行一些解释。
我认为问题的根源在于内存访问。首先,虽然我没有严格调查它,但我怀疑可以对 Julia 的底层代码进行一定数量的改进,以改进它在并行处理中处理内存访问的方式。尽管如此,在这种情况下,我 怀疑 基本代码的任何潜在问题,如果确实存在的话,并不是什么大错。相反,我认为仔细考虑您的代码中到底发生了什么以及它对内存访问意味着什么是有用的。
在 Julia 中工作时要牢记的一个关键事项是它以列优先顺序存储数组。也就是说,它将它们存储为彼此堆叠的列堆栈。这也推广到 > 2 的维度。有关详细信息,请参阅 this 非常有用的 Julia 性能提示部分。这意味着在单个列中一行接一行地访问是很快的。但是,如果您需要在列之间跳来跳去,那么您就会遇到麻烦。是的,访问 ram 内存可能相对较快,但访问缓存内存要快得多,因此如果您的代码允许将单个列左右从 ram 加载到缓存中然后继续处理,那么您将做很多事情比需要在 ram 和缓存之间进行大量交换要好。在您的代码中,您在计算之间从一列切换到另一列,这与任何人无关。例如,在您的
pmap
中,每个进程都会获得共享数组的不同列来处理。然后,每个都沿着该列的行向下移动并修改其中的值。但是,由于它们试图彼此并行工作,并且整个阵列太大而无法放入您的缓存中,因此 ram 和缓存之间会发生大量交换,这确实会减慢您的速度。从理论上讲,也许可以设计一个足够聪明的底层内存管理系统来解决这个问题,但我真的不知道——这超出了我的薪水等级。当然,同样的事情也发生在您对其他对象的访问上。并行化时通常要记住的另一件事是触发器(即计算机计算)与 read/write 操作的比率。触发器倾向于很好地并行化,你可以有不同的内核、进程等,对它们保存在它们的小缓存中的自己的数据位进行自己的小计算。但是,read/write 操作不能很好地并行化。可以做一些事情来设计硬件系统来改进这一点。但总的来说,如果你有一个给定的计算机系统,比如说,两个内核,然后再添加四个内核,你执行失败的能力将增加三倍,但你 read/write 数据的能力 to/from ram不会真的提升那么多。 (注意:这是过于简单化,很大程度上取决于您的系统)。然而,一般来说,触发器与 read/write 的比率越高,并行性带来的好处就越多。在你的情况下,你的代码涉及相当数量的 read/writes (所有这些访问你的不同数组)对于相对较少数量的触发器(一些乘法和加法)。这只是要记住的事情。
幸运的是,如果编写正确,您的案例可以通过并行性得到一些很好的加速。根据我使用 Julia 的经验,我所有最成功的并行性都来自于我可以分解数据并让工作人员分别处理数据块。您的情况恰好适合于此。下面是我编写的一些代码的示例。您可以看到,从一个处理器到三个处理器,速度几乎提高了 3 倍。该代码在某些地方有点粗糙,但它至少展示了如何处理此类问题的一般思路。之后我对代码给出一些评论。
addprocs(3)
nx = 250;
nz = 250;
nh = 50;
nt = 250;
@everywhere ncol = 100;
model = rand(nz,nx,nh);
data = SharedArray(Float64,nt,ncol);
A = rand(nz,nx,nh);
B = rand(nz,nx,nh);
function distribute_data(X, obj_name_on_worker::Symbol, dim)
size_per_worker = floor(Int,size(X,1) / nworkers())
StartIdx = 1
EndIdx = size_per_worker
for (idx, pid) in enumerate(workers())
if idx == nworkers()
EndIdx = size(X,1)
end
println(StartIdx:EndIdx)
if dim == 3
@spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:,:])))
elseif dim == 2
@spawnat(pid, eval(Main, Expr(:(=), obj_name_on_worker, X[StartIdx:EndIdx,:])))
end
StartIdx = EndIdx + 1
EndIdx = EndIdx + size_per_worker - 1
end
end
distribute_data(model, :model, 3)
distribute_data(A, :A, 3)
distribute_data(B, :B, 3)
distribute_data(data, :data, 2)
@everywhere function Dummy(icol,model,data,A,B)
nx = size(model, 2)
nz = size(A,1)
nh = size(model, 3)
for ih = 1:nh
for ix = 1:nx
for iz = 1:nz
data[iz,icol] += A[iz,ix,ih]*B[iz,ix,ih]*model[iz,ix,ih]
end
end
end
end
regular_test() = map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
function parallel_test()
@everywhere begin
if myid() != 1
map((arg)->Dummy(arg,model,data,A,B),[icol for icol = 1:ncol])
end
end
end
@time regular_test(); # 2.120631 seconds (307 allocations: 11.313 KB)
@time parallel_test(); # 0.918850 seconds (5.70 k allocations: 337.250 KB)
getfrom(p::Int, nm::Symbol; mod=Main) = fetch(@spawnat(p, getfield(mod, nm)))
function recombine_data(Data::Symbol)
Results = cell(nworkers())
for (idx, pid) in enumerate(workers())
Results[idx] = getfrom(pid, Data)
end
return vcat(Results...)
end
@time P_Data = recombine_data(:data); # 0.003132 seconds
P_Data == data ## true
评论
这里使用
SharedArray
是多余的。我只是使用它,因为它很容易就地修改,这就是您的代码最初的编写方式。这让我可以更直接地根据您编写的内容进行工作,而无需对其进行太多修改。我没有在计时赛中包含恢复数据的步骤,但如您所见,在这种情况下,这是非常微不足道的时间。在其他情况下,它可能不那么微不足道,但数据移动只是您面临的并行性问题之一。
一般进行时间试验时,最好的做法是 运行 函数一次(以便编译代码)然后再次 运行 得到时代。这就是我在这里所做的。
查看此 SO post 了解我在此处使用的其中一些功能的灵感来源。