在 Julia 中并行操作大型常量数据结构
Operating in parallel on a large constant datastructure in Julia
我有一个很大的字符串向量向量:
大约有 50,000 个字符串向量,
每个包含 2-15 个长度为 1-20 个字符的字符串。
MyScoringOperation
是一个函数,它对字符串向量(数据)和 returns 10100 个分数的数组(如 Float64s)进行操作。 运行 MyScoringOperation
(取决于数据的长度)
大约需要0.01秒
function MyScoringOperation(state:State, datum::Vector{String})
...
score::Vector{Float64} #Size of score = 10000
我有一个嵌套循环。
外循环通常会 运行s 进行 500 次迭代
data::Vector{Vector{String}} = loaddata()
for ii in 1:500
score_total = zeros(10100)
for datum in data
score_total+=MyScoringOperation(datum)
end
end
在一台计算机上,在 3000(而不是 50,000)的小型测试用例上,每个外循环需要 100-300 秒。
我有 3 台安装了 Julia 3.9 的强大服务器(可以更轻松地获得 3 台,然后可以在下一个规模上获得数百台)。
我对@parallel 有基本的经验,但是它似乎花了很多时间来复制常量(它或多或少挂在较小的测试用例上)
看起来像:
data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500
score_total = @parallel(+) for datum in data
MyScoringOperation(state, datum)
end
state = update(state, score_total)
end
我对这个实现与@parallel 一起工作的方式的理解是:
每个 ii
:
- 将
data
划分为每个工人的卡盘
- 将卡盘发送给每个工人
- 在那里处理所有进程块
- 主程序在结果到达时对结果求和。
我想删除第 2 步,
这样就不用向每个工作人员发送一大块数据,
我只是向每个工作人员发送一系列索引,他们从他们自己的 data
副本中查找它。甚至更好,只给每个人自己的块,并让他们每次都重用它(节省大量 RAM)。
分析支持我对@parellel 功能的看法。
对于类似范围的问题(数据更小),
非并行版本 运行s 在 0.09 秒内,
和并行 运行s
分析器显示几乎所有时间都花费了 185 秒。
Profiler 显示其中几乎 100% 用于与网络 IO 交互。
这应该让你开始:
function get_chunks(data::Vector, nchunks::Int)
base_len, remainder = divrem(length(data),nchunks)
chunk_len = fill(base_len,nchunks)
chunk_len[1:remainder]+=1 #remained will always be less than nchunks
function _it()
for ii in 1:nchunks
chunk_start = sum(chunk_len[1:ii-1])+1
chunk_end = chunk_start + chunk_len[ii] -1
chunk = data[chunk_start: chunk_end]
produce(chunk)
end
end
Task(_it)
end
function r_chunk_data(data::Vector)
all_chuncks = get_chunks(data, nworkers()) |> collect;
remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chuncks[ii]) for (ii,pid) in enumerate(workers())]
#Have to add the type annotation sas otherwise it thinks that, RemoteRef(pid) might return a RemoteValue
end
function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
total = nothing
#TODO: consider strongly wrapping total in a lock, when in 0.4, so that it is garenteed safe
@sync for rr in rem_results
function gather(rr)
res=fetch(rr)
if total===nothing
total=res
else
total=red_acc(total,res)
end
end
@async gather(rr)
end
total
end
function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function)
rem_results = map(r_chunks) do rchunk
function do_mapred()
@assert r_chunk.where==myid()
@pipe r_chunk |> fetch |> map(map_fun,_) |> reduce(red_acc, _)
end
remotecall(r_chunk.where,do_mapred)
end
@pipe rem_results|> convert(Vector{RemoteRef},_) |> fetch_reduce(red_acc, _)
end
rchunk_data
将数据分成块(由 get_chunks
方法定义)并将这些块分别发送给不同的工作人员,并将它们存储在 RemoteRefs 中。
RemoteRefs 是对其他进程(可能还有计算机)内存的引用,
prechunked_map_reduce
对一种 map reduce 做了一个变体,让每个工人首先 运行 map_fun
在它的每个卡盘元素上,然后减少它的卡盘中的所有元素使用 red_acc
(减少累加器函数)。最后,每个工人 returns 都有结果,然后通过使用 red_acc
将它们全部合并在一起,这次使用 fetch_reduce
,这样我们就可以添加第一个完成的。
fetch_reduce
是一个非阻塞的提取和归约操作。我相信它没有竞争条件,尽管这可能是因为 @async
和 @sync
中的实现细节。当 julia 0.4 出来时,很容易加锁使其明显没有竞争条件。
这段代码并没有真正经受住考验。我不相信
您可能还想看看使卡盘大小可调,这样您就可以看到更多数据给更快的工作人员(如果有些人有更好的网络或更快的 cpus)
您需要将您的代码重新表达为 map-reduce 问题,这看起来并不难。
测试:
data = [float([eye(100),eye(100)])[:] for _ in 1:3000] #480Mb
chunk_data(:data, data)
@time prechunked_mapreduce(:data, mean, (+))
分配给 8 个工作人员(其中 none 个与启动器在同一台机器上)时,耗时约 0.03 秒
vs 运行仅在本地:
@time reduce(+,map(mean,data))
用了 ~0.06 秒。
我有一个很大的字符串向量向量: 大约有 50,000 个字符串向量, 每个包含 2-15 个长度为 1-20 个字符的字符串。
MyScoringOperation
是一个函数,它对字符串向量(数据)和 returns 10100 个分数的数组(如 Float64s)进行操作。 运行 MyScoringOperation
(取决于数据的长度)
function MyScoringOperation(state:State, datum::Vector{String})
...
score::Vector{Float64} #Size of score = 10000
我有一个嵌套循环。 外循环通常会 运行s 进行 500 次迭代
data::Vector{Vector{String}} = loaddata()
for ii in 1:500
score_total = zeros(10100)
for datum in data
score_total+=MyScoringOperation(datum)
end
end
在一台计算机上,在 3000(而不是 50,000)的小型测试用例上,每个外循环需要 100-300 秒。
我有 3 台安装了 Julia 3.9 的强大服务器(可以更轻松地获得 3 台,然后可以在下一个规模上获得数百台)。
我对@parallel 有基本的经验,但是它似乎花了很多时间来复制常量(它或多或少挂在较小的测试用例上)
看起来像:
data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500
score_total = @parallel(+) for datum in data
MyScoringOperation(state, datum)
end
state = update(state, score_total)
end
我对这个实现与@parallel 一起工作的方式的理解是:
每个 ii
:
- 将
data
划分为每个工人的卡盘 - 将卡盘发送给每个工人
- 在那里处理所有进程块
- 主程序在结果到达时对结果求和。
我想删除第 2 步,
这样就不用向每个工作人员发送一大块数据,
我只是向每个工作人员发送一系列索引,他们从他们自己的 data
副本中查找它。甚至更好,只给每个人自己的块,并让他们每次都重用它(节省大量 RAM)。
分析支持我对@parellel 功能的看法。 对于类似范围的问题(数据更小), 非并行版本 运行s 在 0.09 秒内, 和并行 运行s 分析器显示几乎所有时间都花费了 185 秒。 Profiler 显示其中几乎 100% 用于与网络 IO 交互。
这应该让你开始:
function get_chunks(data::Vector, nchunks::Int)
base_len, remainder = divrem(length(data),nchunks)
chunk_len = fill(base_len,nchunks)
chunk_len[1:remainder]+=1 #remained will always be less than nchunks
function _it()
for ii in 1:nchunks
chunk_start = sum(chunk_len[1:ii-1])+1
chunk_end = chunk_start + chunk_len[ii] -1
chunk = data[chunk_start: chunk_end]
produce(chunk)
end
end
Task(_it)
end
function r_chunk_data(data::Vector)
all_chuncks = get_chunks(data, nworkers()) |> collect;
remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chuncks[ii]) for (ii,pid) in enumerate(workers())]
#Have to add the type annotation sas otherwise it thinks that, RemoteRef(pid) might return a RemoteValue
end
function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
total = nothing
#TODO: consider strongly wrapping total in a lock, when in 0.4, so that it is garenteed safe
@sync for rr in rem_results
function gather(rr)
res=fetch(rr)
if total===nothing
total=res
else
total=red_acc(total,res)
end
end
@async gather(rr)
end
total
end
function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function)
rem_results = map(r_chunks) do rchunk
function do_mapred()
@assert r_chunk.where==myid()
@pipe r_chunk |> fetch |> map(map_fun,_) |> reduce(red_acc, _)
end
remotecall(r_chunk.where,do_mapred)
end
@pipe rem_results|> convert(Vector{RemoteRef},_) |> fetch_reduce(red_acc, _)
end
rchunk_data
将数据分成块(由 get_chunks
方法定义)并将这些块分别发送给不同的工作人员,并将它们存储在 RemoteRefs 中。
RemoteRefs 是对其他进程(可能还有计算机)内存的引用,
prechunked_map_reduce
对一种 map reduce 做了一个变体,让每个工人首先 运行 map_fun
在它的每个卡盘元素上,然后减少它的卡盘中的所有元素使用 red_acc
(减少累加器函数)。最后,每个工人 returns 都有结果,然后通过使用 red_acc
将它们全部合并在一起,这次使用 fetch_reduce
,这样我们就可以添加第一个完成的。
fetch_reduce
是一个非阻塞的提取和归约操作。我相信它没有竞争条件,尽管这可能是因为 @async
和 @sync
中的实现细节。当 julia 0.4 出来时,很容易加锁使其明显没有竞争条件。
这段代码并没有真正经受住考验。我不相信 您可能还想看看使卡盘大小可调,这样您就可以看到更多数据给更快的工作人员(如果有些人有更好的网络或更快的 cpus)
您需要将您的代码重新表达为 map-reduce 问题,这看起来并不难。
测试:
data = [float([eye(100),eye(100)])[:] for _ in 1:3000] #480Mb
chunk_data(:data, data)
@time prechunked_mapreduce(:data, mean, (+))
分配给 8 个工作人员(其中 none 个与启动器在同一台机器上)时,耗时约 0.03 秒
vs 运行仅在本地:
@time reduce(+,map(mean,data))
用了 ~0.06 秒。