在 Julia 中并行操作大型常量数据结构

Operating in parallel on a large constant datastructure in Julia

我有一个很大的字符串向量向量: 大约有 50,000 个字符串向量, 每个包含 2-15 个长度为 1-20 个字符的字符串。

MyScoringOperation 是一个函数,它对字符串向量(数据)和 returns 10100 个分数的数组(如 Float64s)进行操作。 运行 MyScoringOperation(取决于数据的长度)

大约需要0.01秒
function MyScoringOperation(state:State, datum::Vector{String})
      ...
      score::Vector{Float64} #Size of score = 10000

我有一个嵌套循环。 外循环通常会 运行s 进行 500 次迭代

data::Vector{Vector{String}} = loaddata()
for ii in 1:500 
    score_total = zeros(10100)
    for datum in data
         score_total+=MyScoringOperation(datum)
    end
end

在一台计算机上,在 3000(而不是 50,000)的小型测试用例上,每个外循环需要 100-300 秒。

我有 3 台安装了 Julia 3.9 的强大服务器(可以更轻松地获得 3 台,然后可以在下一个规模上获得数百台)。


我对@parallel 有基本的经验,但是它似乎花了很多时间来复制常量(它或多或少挂在较小的测试用例上)

看起来像:

data::Vector{Vector{String}} = loaddata()
state = init_state()
for ii in 1:500 

    score_total = @parallel(+) for datum in data
         MyScoringOperation(state, datum)
    end
    state = update(state, score_total)
end

我对这个实现与@parallel 一起工作的方式的理解是:

每个 ii:

  1. data 划分为每个工人的卡盘
  2. 将卡盘发送给每个工人
  3. 在那里处理所有进程块
  4. 主程序在结果到达时对结果求和。

我想删除第 2 步, 这样就不用向每个工作人员发送一大块数据, 我只是向每个工作人员发送一系列索引,他们从他们自己的 data 副本中查找它。甚至更好,只给每个人自己的块,并让他们每次都重用它(节省大量 RAM)。


分析支持我对@parellel 功能的看法。 对于类似范围的问题(数据更小), 非并行版本 运行s 在 0.09 秒内, 和并行 运行s 分析器显示几乎所有时间都花费了 185 秒。 Profiler 显示其中几乎 100% 用于与网络 IO 交互。

这应该让你开始:

function get_chunks(data::Vector, nchunks::Int)
    base_len, remainder = divrem(length(data),nchunks)
    chunk_len = fill(base_len,nchunks)
    chunk_len[1:remainder]+=1 #remained will always be less than nchunks
    function _it() 
        for ii in 1:nchunks
            chunk_start = sum(chunk_len[1:ii-1])+1
            chunk_end = chunk_start + chunk_len[ii] -1
            chunk = data[chunk_start: chunk_end]
            produce(chunk)
        end
    end
    Task(_it)
end

function r_chunk_data(data::Vector)
    all_chuncks = get_chunks(data, nworkers()) |> collect;
    remote_chunks = [put!(RemoteRef(pid)::RemoteRef, all_chuncks[ii]) for (ii,pid) in enumerate(workers())]
    #Have to add the type annotation sas otherwise it thinks that, RemoteRef(pid) might return a RemoteValue
end



function fetch_reduce(red_acc::Function, rem_results::Vector{RemoteRef})
    total = nothing 
    #TODO: consider strongly wrapping total in a lock, when in 0.4, so that it is garenteed safe 
    @sync for rr in rem_results
        function gather(rr)
            res=fetch(rr)
            if total===nothing
                total=res
            else 
                total=red_acc(total,res)
            end
        end
        @async gather(rr)
    end
    total
end

function prechunked_mapreduce(r_chunks::Vector{RemoteRef}, map_fun::Function, red_acc::Function)
    rem_results = map(r_chunks) do rchunk
        function do_mapred()
            @assert r_chunk.where==myid()
            @pipe r_chunk |> fetch |> map(map_fun,_) |> reduce(red_acc, _)
        end
        remotecall(r_chunk.where,do_mapred)
    end
    @pipe rem_results|> convert(Vector{RemoteRef},_) |> fetch_reduce(red_acc, _)
end

rchunk_data 将数据分成块(由 get_chunks 方法定义)并将这些块分别发送给不同的工作人员,并将它们存储在 RemoteRefs 中。 RemoteRefs 是对其他进程(可能还有计算机)内存的引用,

prechunked_map_reduce 对一种 map reduce 做了一个变体,让每个工人首先 运行 map_fun 在它的每个卡盘元素上,然后减少它的卡盘中的所有元素使用 red_acc(减少累加器函数)。最后,每个工人 returns 都有结果,然后通过使用 red_acc 将它们全部合并在一起,这次使用 fetch_reduce,这样我们就可以添加第一个完成的。

fetch_reduce 是一个非阻塞的提取和归约操作。我相信它没有竞争条件,尽管这可能是因为 @async@sync 中的实现细节。当 julia 0.4 出来时,很容易加锁使其明显没有竞争条件。

这段代码并没有真正经受住考验。我不相信 您可能还想看看使卡盘大小可调,这样您就可以看到更多数据给更快的工作人员(如果有些人有更好的网络或更快的 cpus)

您需要将您的代码重新表达为 map-reduce 问题,这看起来并不难。


测试:

data = [float([eye(100),eye(100)])[:] for _ in 1:3000] #480Mb
chunk_data(:data, data)
@time prechunked_mapreduce(:data, mean, (+))

分配给 8 个工作人员(其中 none 个与启动器在同一台机器上)时,耗时约 0.03 秒

vs 运行仅在本地:

@time reduce(+,map(mean,data))

用了 ~0.06 秒。