Julia - 文件中有函数的 worker 上的 LoadError(主要)

Julia - LoadError on worker with function in the file (main)

我开始尝试使用 Julia 进行并行处理。

我在这个例子中使用了 @spawn 宏,但是在使用 remotecall_fetch 函数时出现了同样的错误。

代码如下:

function count_proteins(fpath::String)
    cnt::Int = 0
    if !isfile(fpath)
        write(Base.stderr, "FASTA not found!")
    else
        reader = open(FASTA.Reader, fpath)
        for record in reader
            cnt += 1
        end
    end
    # return the count
    cnt
end


"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1}, threads::Int16=4)    
    # initialize workers
    addprocs(threads)

    fut = Dict{Int, Future}()

    # launch the jobs
    for (i, fastaPath) in enumerate(fPaths)
        r = @spawn count_proteins(fastaPath)
        fut[i] = r
    end

    for (i, res) in fut
        s = fetch(res)
    end
end

### MAIN ###
flist = ["f1", "f2", "f3", "f4"]
threads = Int16(2)
parallel_count_proteins(flist, threads)

当我尝试使用 fetch():

获取结果时发生错误

ERROR: LoadError: On worker 3

...这是堆栈跟踪:

Stacktrace:
 [1] #remotecall_fetch#149(::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}, ::Function, ::Function, ::Distributed.Worker, ::Distributed.RRID) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:379
 [2] remotecall_fetch(::Function, ::Distributed.Worker, ::Distributed.RRID, ::Vararg{Any,N} where N) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:371
 [3] #remotecall_fetch#152 at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
 [4] remotecall_fetch at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:406 [inlined]
 [5] call_on_owner at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:479 [inlined]
 [6] fetch(::Future) at /Users/osx/buildbot/slave/package_osx64/build/usr/share/julia/stdlib/v1.1/Distributed/src/remotecall.jl:511
 [7] parallel_count_proteins(::Array{String,1}, ::Int16) at /Users/salvocos/Google_Drive/julia_programming/mcl_graph_to_label.jl:150
 [8] top-level scope at none:0
 [9] include at ./boot.jl:326 [inlined]
 [10] include_relative(::Module, ::String) at ./loading.jl:1038
 [11] include(::Module, ::String) at ./sysimg.jl:29
 [12] exec_options(::Base.JLOptions) at ./client.jl:267
 [13] _start() at ./client.jl:436

我知道需要让所有工作人员都知道函数的存在 count_proteins 但我不太确定该怎么做。

如您所说,您需要使 count_proteins 可用于所有工作进程。

您可以在函数定义之前使用 @everywhere 宏,使它们对所有工作人员都可用。 @everywhere 对所有 worker 执行给定的表达式。

另一种方法是将 应该 可供工人使用的函数放在另一个 .jl 文件和 @everywhere include("my_helper_functions.jl") 中,或者将您的函数定义放在里面一个 begin...end 块并在 begin 和 运行 块之前放置一个 @everywhere。您需要在创建工作进程后执行此操作。将这些函数放在 module/package 和 运行ning @everywhere using MyModule 中也应该有效。

对于您的代码,解决方案是

# addprocs here before @everywhere definitions
addprocs(2)

@everywhere function count_proteins(fpath::String)
    cnt::Int = 0
    if !isfile(fpath)
        write(Base.stderr, "FASTA not found!")
    else
        reader = open(FASTA.Reader, fpath)
        for record in reader
            cnt += 1
        end
    end
    # return the count
    cnt
end


"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1})
    fut = Dict{Int, Future}()

    # launch the jobs
    for (i, fastaPath) in enumerate(fPaths)
        r = @spawn count_proteins(fastaPath)
        fut[i] = r
    end

    for (i, res) in fut
        s = fetch(res)
    end
end

### MAIN ###
flist = ["f1", "f2", "f3", "f4"]
parallel_count_proteins(flist)

附带说明一下,如果我理解您正在尝试正确执行的操作,您可以在此处简单地使用 pmap,这会将任务一个接一个地发送到进程,从而有效地平衡负载。

您可能会发现阅读 the manual entry regarding code and data availability in parallel computing and also the Parallel Computing section overall. For data availability part there is also a package called ParallelDataTransfer.jl 很有用,如果您需要的话,它可以使在进程之间移动数据变得更加容易。

正如@hckr 上面很好地解释的那样,在使用 @everywhere 宏之前应该部署工作人员(使用 addprocs(threads))。

@everywhere 可以在程序的不同部分以不同的方式调用和使用。 在我的例子中,我正在从模块中并行加载我想要 运行 的函数。

要与我使用的主要函数并行使用此函数 @everywhere include("myModule.jl")

以下是 MyModule 的代码:

module MyModule    
using Distributed
using Printf: @printf
using Base

"""Count sequences in the input FASTA"""
function count_proteins(fpath::String)::Int
    cnt::Int = 0
    #@show fpath
    if !isfile(fpath)
        write(Base.stderr, "\nInput FASTA not found!")
    else
        open(fpath, "r") do ifd
            for ln in eachline(ifd)
                if ln[1] == '>'
                    #println(ln)
                    cnt += 1
                end
            end
        end
    end
    # return the count
    @printf("%s\t%d\n", fpath, cnt)
    cnt
end

"""Count sequences in parallel."""
function parallel_count_proteins(fPaths::Array{String, 1})

    # spawn the jobs
    for (i, fastaPath) in enumerate(fPaths)
        r = @spawn count_proteins(fastaPath)
        # @show r
        s = fetch(r)
    end    
end

下面是 main.jl 使用 MyModule 中的函数 parallel_count_proteins

### main.jl ###
using Base
using Distributed
using Printf: @printf

# add path to the modules directory
push!(LOAD_PATH, dirname(@__FILE__)) # MyModule is in the same directory as main.jl

#### MAIN START ####
# deploy the workers
addprocs(4)
# load modules with multi-core functions
@everywhere include(joinpath(dirname(@__FILE__), "MyModule.jl"))

# paths with 4 input files (all in same dir as main.jl)
flist = ["tv1", "tv2", "tv3", "tv4"]

# count proteins
MyModule.parallel_count_proteins(flist)