如何并行化成对距离矩阵的计算?

How to parallelize computation of pairwise distance matrix?

我的问题大致如下。给定一个数值矩阵 X,其中每一行都是一个项目。我想在除自身以外的所有行中根据 L2 距离找到每一行的最近邻居。我尝试阅读官方文档,但仍然对如何实现这一目标感到困惑。有人可以给我一些提示吗?

我的代码如下

function l2_dist(v1, v2)
    return sqrt(sum((v1 - v2) .^ 2))
end

function main(Mat, dist_fun)
    n = size(Mat, 1)

    Dist = SharedArray{Float64}(n) #[Inf for i in 1:n]
    Id = SharedArray{Int64}(n) #[-1 for i in 1:n]
    @parallel for i = 1:n
        Dist[i] = Inf
        Id[i] = 0
    end
    Threads.@threads for i in 1:n
        for j in 1:n
            if i != j
                println(i, j)
                dist_temp = dist_fun(Mat[i, :], Mat[j, :])
                if dist_temp < Dist[i]
                    println("Dist updated!")
                    Dist[i] = dist_temp
                    Id[i] = j
                end
            end
        end
    end
    return Dict("Dist" => Dist, "Id" => Id)
end

n = 4000
p = 30

X = [rand() for i in 1:n, j in 1:p];


main(X[1:30, :], l2_dist)
@time N = main(X, l2_dist)

我正在尝试将所有 i(即计算每行最小值)分布在不同的核心上。但是上面的版本显然不能正常工作。它甚至比顺序版本慢。有人能指出我正确的方向吗?谢谢

也许你在做一些除了你写下的事情,但是,据我所知,此时你实际上并没有并行进行任何计算。 Julia 要求您告诉它您希望它访问多少个处理器(或线程)。您可以通过

  • 使用多个处理器启动 Julia julia -p #(其中 # 是您希望 Julia 可以访问的处理器数量)
  • 启动 Julia "session" 后,您可以调用 addprocs 函数来添加额外的处理器。
  • 要拥有超过 1 个线程,您需要 运行 命令 export JULIA_NUM_THREADS = #。我不太了解线程,所以我会坚持使用 @parallel 宏。我建议阅读 documentation 以获取有关线程的更多详细信息——也许@Chris Rackauckas 可以进一步扩展差异。

下面关于我的代码和你的代码的一些评论:

  • 我的版本是 0.6.1-pre.0。我不认为我正在做任何特定于 0.6 的事情,但这是为了以防万一。
  • 我将在计算向量之间的距离时使用 Distances.jl 包。我认为将尽可能多的计算外包给编写良好且维护良好的程序包是一个好习惯。
  • 我不计算行之间的距离,而是计算列之间的距离。这是因为 Julia 是一种列优先语言,所以这会增加缓存命中次数并提供一点额外的速度。显然,您只需转置输入即可获得所需的逐行结果。
  • 除非您期望有那么多内存分配,否则那么多分配是代码中某些部分效率低下的标志。这通常是类型稳定性问题。我不知道你之前的代码中是否存在这种情况,但这在当前版本中似乎不是问题(我不是很清楚为什么你有这么多分配)。

代码如下

# Make sure all processors have access to Distances package
@everywhere using Distances

# Create a random matrix
nrow = 30
ncol = 4000

# Seed creation of random matrix so it is always same matrix
srand(42)
X = rand(nrow, ncol)

function main(X::AbstractMatrix{Float64}, M::Distances.Metric)
    # Get size of the matrix
    nrow, ncol = size(X)

    # Create `SharedArray` to store output
    ind_vec = SharedArray{Int}(ncol)
    dist_vec = SharedArray{Float64}(ncol)

    # Compute the distance between columns
    @sync @parallel for i in 1:ncol
        # Initialize various temporary variables
        min_dist_i = Inf
        min_ind_i = -1
        X_i = view(X, :, i)

        # Check distance against all other columns
        for j in 1:ncol
            # Skip comparison with itself
            if i==j
                continue
            end

            # Tell us who is doing the work 
            # (can uncomment if you want to verify stuff)
            # println("Column $i compared with Column $j by worker $(myid())")

            # Evaluate the new distance... 
            # If it is less then replace it, otherwise proceed
            dist_temp = evaluate(M, X_i, view(X, :, j))
            if dist_temp < min_dist_i
                min_dist_i = dist_temp
                min_ind_i = j
            end
        end

        # Which column is minimum distance from column i
        dist_vec[i] = min_dist_i
        ind_vec[i] = min_ind_i
    end

    return dist_vec, ind_vec
end

# Using Euclidean metric
metric = Euclidean()
inds, dist = main(X, metric)

@time main(X, metric);
@show dist[[1, 5, 25]], inds[[1, 5, 25]]

您可以 运行 代码

  • 1 个处理器julia testfile.jl

    % julia testfile.jl
      0.640365 seconds (16.00 M allocations: 732.495 MiB, 3.70% gc time)
      (dist[[1, 5, 25]], inds[[1, 5, 25]]) = ([2541, 2459, 1602], [1.40892, 1.38206, 1.32184])
    
  • n 个处理器(在本例中为 4 个)julia -p n testfile.jl

    % julia -p 4 testfile.jl
      0.201523 seconds (2.10 k allocations: 99.107 KiB)
      (dist[[1, 5, 25]], inds[[1, 5, 25]]) = ([2541, 2459, 1602], [1.40892, 1.38206, 1.32184])