Dask 分布式没有 运行 python 脚本
Dask distributed does not run python script
我正在构建一个简单的示例来了解 dask distributed 如何在 HPC 集群上分发 python 脚本。
分发方法是一个基本操作,即在磁盘上写入一个文件。当从命令行 运行 ($python simple-function.py)
时,此脚本工作正常
import os
import argparse
import time
def inc(x):
time.sleep(1)
return x + 1
def get_args():
"""
Get args
"""
parser = argparse.ArgumentParser()
parser.add_argument("x", help="Value")
args = parser.parse_args()
x = args.x
return int(x)
if __name__ == "__main__":
x = get_args()
print("{0} + 1 = {1}".format(x, inc(x)))
with open("results.txt", 'w') as file:
file.write(str(x) + '\n')
现在,我创建了另一个 python 脚本来分发此代码。
这个想法是使用 subprocess 和 client.map 或 client.submit 来启动上述脚本的多个实例。
我 运行 遇到的问题是,当使用以下任何方法(client.map、client.submit 然后收集或计算或 .result())时,不会写入输出 .txt 文件。
也许我没有使用正确的方法?
import os
import time
import subprocess
import yaml
import argparse
import dask
from dask.distributed import Client
from dask_jobqueue import PBSCluster
def create_cluster():
cluster = PBSCluster(
cores=4,
memory="20GB",
interface="ib0",
queue="qdev",
processes=4,
nanny=True,
walltime="12:00:00",
shebang="#!/bin/bash",
local_directory="$TMPDIR"
)
cluster.scale(4)
time.sleep(10) # Wait for workers
return cluster
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("script", help="Script to distribute")
parser.add_argument("nodes", type=int, help="Number of nodes")
args = parser.parse_args()
script = args.script
n_nodes = args.nodes
return script, n_nodes
def close_all(client, cluster):
client.close()
cluster.close()
def methode(script, x):
subprocess.run(["python",
script,
x])
return None
if __name__ == "__main__":
cluster = create_cluster()
client = Client(cluster)
time.sleep(1)
script, n_nodes = get_args() #Get arguments
#With client.submit
futures = []
for n, o in enumerate(range(10)):
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
[f.result() for f in futures]
#Or client.map
L = client.map(methode, *[script, str(range(10))])
client.compute(L)
client.gather(L)
time.sleep(20)
close_all(client, cluster)
如果我执行以下代码,请注意:
dask.compute(方法(参数))
然后,.txt 输出文件被写入。
好像只有不同的客户端方法不行
终于发现执行脚本的路径不对
如果以下命令是来自命令行的运行:
$pythondistribute.py简单-function.py6
不知何故,路径不对,找不到文件
我试过运行把它写进笔记本里。如果我执行:
methode(*[script, "8"]
文件写入当前目录。
但是,如果我执行此单元格:
futures = []
for n, o in enumerate(range(12)):
print(o)
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
wait(futures)
print("Done !")
结果文件没有写在当前目录,而是在我的主目录。
我不太确定为什么会发生这种情况,但似乎在使用客户端时,路径发生了一些变化...
所以尝试了一些小改动,对我来说效果很好。我去掉了 HPC 的东西,因为我认为问题是 dask 而不是 dask-jobqueue 并且专注于让 futures 工作。然后我注意到你的 simple-function.py 破坏了所有的结果,所以我只是改变了它。输出是我当前目录中的 results0.txt - results9.txt(不是脚本所在的主目录)。
distribute.py:
import os
import time
import subprocess
import yaml
import argparse
import dask
from dask.distributed import Client
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("script", help="Script to distribute")
parser.add_argument("nodes", type=int, help="Number of nodes")
args = parser.parse_args()
script = args.script
n_nodes = args.nodes
return script, n_nodes
def methode(script, x):
subprocess.run(["python",
script,
x])
return None
if __name__ == "__main__":
client = Client()
script, n_nodes = get_args() #Get arguments
futures = []
for n, o in enumerate(range(10)):
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
results = client.gather(futures)
client.close()
简单-function.py:
import os
import argparse
import time
def inc(x):
time.sleep(1)
return x + 1
def get_args():
"""
Get args
"""
parser = argparse.ArgumentParser()
parser.add_argument("x", help="Value")
args = parser.parse_args()
x = args.x
return int(x)
if __name__ == "__main__":
x = get_args()
print("{0} + 1 = {1}".format(x, inc(x)))
with open(f"results{x}.txt", 'w') as file:
file.write(str(x + 1) + '\n')
编辑:哦,我刚读了你的回答,如果你从 jupyter 单元格执行该代码,它是从笔记本的目录执行的,这可能是你的家。
我正在构建一个简单的示例来了解 dask distributed 如何在 HPC 集群上分发 python 脚本。 分发方法是一个基本操作,即在磁盘上写入一个文件。当从命令行 运行 ($python simple-function.py)
时,此脚本工作正常import os
import argparse
import time
def inc(x):
time.sleep(1)
return x + 1
def get_args():
"""
Get args
"""
parser = argparse.ArgumentParser()
parser.add_argument("x", help="Value")
args = parser.parse_args()
x = args.x
return int(x)
if __name__ == "__main__":
x = get_args()
print("{0} + 1 = {1}".format(x, inc(x)))
with open("results.txt", 'w') as file:
file.write(str(x) + '\n')
现在,我创建了另一个 python 脚本来分发此代码。 这个想法是使用 subprocess 和 client.map 或 client.submit 来启动上述脚本的多个实例。 我 运行 遇到的问题是,当使用以下任何方法(client.map、client.submit 然后收集或计算或 .result())时,不会写入输出 .txt 文件。 也许我没有使用正确的方法?
import os
import time
import subprocess
import yaml
import argparse
import dask
from dask.distributed import Client
from dask_jobqueue import PBSCluster
def create_cluster():
cluster = PBSCluster(
cores=4,
memory="20GB",
interface="ib0",
queue="qdev",
processes=4,
nanny=True,
walltime="12:00:00",
shebang="#!/bin/bash",
local_directory="$TMPDIR"
)
cluster.scale(4)
time.sleep(10) # Wait for workers
return cluster
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("script", help="Script to distribute")
parser.add_argument("nodes", type=int, help="Number of nodes")
args = parser.parse_args()
script = args.script
n_nodes = args.nodes
return script, n_nodes
def close_all(client, cluster):
client.close()
cluster.close()
def methode(script, x):
subprocess.run(["python",
script,
x])
return None
if __name__ == "__main__":
cluster = create_cluster()
client = Client(cluster)
time.sleep(1)
script, n_nodes = get_args() #Get arguments
#With client.submit
futures = []
for n, o in enumerate(range(10)):
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
[f.result() for f in futures]
#Or client.map
L = client.map(methode, *[script, str(range(10))])
client.compute(L)
client.gather(L)
time.sleep(20)
close_all(client, cluster)
如果我执行以下代码,请注意: dask.compute(方法(参数))
然后,.txt 输出文件被写入。
好像只有不同的客户端方法不行
终于发现执行脚本的路径不对
如果以下命令是来自命令行的运行:
$pythondistribute.py简单-function.py6
不知何故,路径不对,找不到文件
我试过运行把它写进笔记本里。如果我执行:
methode(*[script, "8"]
文件写入当前目录。 但是,如果我执行此单元格:
futures = []
for n, o in enumerate(range(12)):
print(o)
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
wait(futures)
print("Done !")
结果文件没有写在当前目录,而是在我的主目录。
我不太确定为什么会发生这种情况,但似乎在使用客户端时,路径发生了一些变化...
所以尝试了一些小改动,对我来说效果很好。我去掉了 HPC 的东西,因为我认为问题是 dask 而不是 dask-jobqueue 并且专注于让 futures 工作。然后我注意到你的 simple-function.py 破坏了所有的结果,所以我只是改变了它。输出是我当前目录中的 results0.txt - results9.txt(不是脚本所在的主目录)。
distribute.py:
import os
import time
import subprocess
import yaml
import argparse
import dask
from dask.distributed import Client
def get_args():
parser = argparse.ArgumentParser()
parser.add_argument("script", help="Script to distribute")
parser.add_argument("nodes", type=int, help="Number of nodes")
args = parser.parse_args()
script = args.script
n_nodes = args.nodes
return script, n_nodes
def methode(script, x):
subprocess.run(["python",
script,
x])
return None
if __name__ == "__main__":
client = Client()
script, n_nodes = get_args() #Get arguments
futures = []
for n, o in enumerate(range(10)):
futures.append(client.submit(methode, *[script, str(o)], priority=-n))
results = client.gather(futures)
client.close()
简单-function.py:
import os
import argparse
import time
def inc(x):
time.sleep(1)
return x + 1
def get_args():
"""
Get args
"""
parser = argparse.ArgumentParser()
parser.add_argument("x", help="Value")
args = parser.parse_args()
x = args.x
return int(x)
if __name__ == "__main__":
x = get_args()
print("{0} + 1 = {1}".format(x, inc(x)))
with open(f"results{x}.txt", 'w') as file:
file.write(str(x + 1) + '\n')
编辑:哦,我刚读了你的回答,如果你从 jupyter 单元格执行该代码,它是从笔记本的目录执行的,这可能是你的家。