Dask 分布式没有 运行 python 脚本

Dask distributed does not run python script

我正在构建一个简单的示例来了解 dask distributed 如何在 HPC 集群上分发 python 脚本。 分发方法是一个基本操作,即在磁盘上写入一个文件。当从命令行 运行 ($python simple-function.py)

时,此脚本工作正常
import os
import argparse
import time

def inc(x):
    time.sleep(1)
    return x + 1

def get_args():
    """
    Get args
    """ 
    parser = argparse.ArgumentParser()
    parser.add_argument("x", help="Value")
    args = parser.parse_args()
    x = args.x
    return int(x)

if __name__ == "__main__":
    x = get_args()       
    print("{0} + 1 = {1}".format(x, inc(x)))
    with open("results.txt", 'w') as file:
        file.write(str(x) + '\n')

现在,我创建了另一个 python 脚本来分发此代码。 这个想法是使用 subprocess 和 client.map 或 client.submit 来启动上述脚本的多个实例。 我 运行 遇到的问题是,当使用以下任何方法(client.map、client.submit 然后收集或计算或 .result())时,不会写入输出 .txt 文件。 也许我没有使用正确的方法?

import os
import time
import subprocess
import yaml
import argparse

import dask
from dask.distributed import Client
from dask_jobqueue import PBSCluster
  
def create_cluster():
    cluster = PBSCluster(
        cores=4,
        memory="20GB",
        interface="ib0",
        queue="qdev",
        processes=4,
        nanny=True,
        walltime="12:00:00",
        shebang="#!/bin/bash",
        local_directory="$TMPDIR"
        )
    cluster.scale(4)
    time.sleep(10) # Wait for workers
    return cluster

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("script", help="Script to distribute")
    parser.add_argument("nodes", type=int, help="Number of nodes")
    args = parser.parse_args()    
    script = args.script
    n_nodes = args.nodes
    return script, n_nodes

def close_all(client, cluster):
    client.close()
    cluster.close()

def methode(script, x):
    subprocess.run(["python", 
                script,
                x])
    return None

if __name__ == "__main__":
    
    cluster = create_cluster()
    client = Client(cluster)
    time.sleep(1)
    script, n_nodes = get_args() #Get arguments
    #With client.submit
    futures = []    
    for n, o in enumerate(range(10)):
        futures.append(client.submit(methode, *[script, str(o)], priority=-n))

    [f.result() for f in futures]
    #Or client.map
    L = client.map(methode, *[script, str(range(10))])  
    client.compute(L)
    client.gather(L)
    time.sleep(20)  
    close_all(client, cluster)
    

如果我执行以下代码,请注意: dask.compute(方法(参数))

然后,.txt 输出文件被写入。

好像只有不同的客户端方法不行

终于发现执行脚本的路径不对

如果以下命令是来自命令行的运行:

$pythondistribute.py简单-function.py6

不知何故,路径不对,找不到文件

我试过运行把它写进笔记本里。如果我执行:

methode(*[script, "8"]

文件写入当前目录。 但是,如果我执行此单元格:

futures = []
for n, o in enumerate(range(12)):
    print(o)
    futures.append(client.submit(methode, *[script, str(o)], priority=-n))
wait(futures)
print("Done !")

结果文件没有写在当前目录,而是在我的主目录。

我不太确定为什么会发生这种情况,但似乎在使用客户端时,路径发生了一些变化...

所以尝试了一些小改动,对我来说效果很好。我去掉了 HPC 的东西,因为我认为问题是 dask 而不是 dask-jobqueue 并且专注于让 futures 工作。然后我注意到你的 simple-function.py 破坏了所有的结果,所以我只是改变了它。输出是我当前目录中的 results0.txt - results9.txt(不是脚本所在的主目录)。

distribute.py:

import os
import time
import subprocess
import yaml
import argparse

import dask
from dask.distributed import Client

def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument("script", help="Script to distribute")
    parser.add_argument("nodes", type=int, help="Number of nodes")
    args = parser.parse_args()    
    script = args.script
    n_nodes = args.nodes
    return script, n_nodes

def methode(script, x):
    subprocess.run(["python", 
                script,
                x])
    return None

if __name__ == "__main__":
    client = Client()
    script, n_nodes = get_args() #Get arguments
    futures = []    
    for n, o in enumerate(range(10)):
        futures.append(client.submit(methode, *[script, str(o)], priority=-n))

    results = client.gather(futures)
    client.close()

简单-function.py:

import os
import argparse
import time

def inc(x):
    time.sleep(1)
    return x + 1

def get_args():
    """
    Get args
    """ 
    parser = argparse.ArgumentParser()
    parser.add_argument("x", help="Value")
    args = parser.parse_args()
    x = args.x
    return int(x)

if __name__ == "__main__":
    x = get_args()       
    print("{0} + 1 = {1}".format(x, inc(x)))
    with open(f"results{x}.txt", 'w') as file:
        file.write(str(x + 1) + '\n')

编辑:哦,我刚读了你的回答,如果你从 jupyter 单元格执行该代码,它是从笔记本的目录执行的,这可能是你的家。