How can I terminate running jobs without closing the connection to the core? (currently using execnet)
I have a computer cluster in which a master node communicates with the slave nodes in the cluster.

The main problem I am facing is that, with execnet, I need to be able to kill certain running jobs and then have new jobs re-queued on the same core where a job was just terminated (since I want to utilize all cores of the slave nodes at any given time).

As of now there is no way to kill a running job using execnet, so I figured I could kill jobs manually through a bash script, say sudo kill 12345, where 12345 is the PID of the job (getting the PID of each job is another thing execnet does not support, but that is another topic). That does kill the job correctly, however it also closes the connection to that channel (the core; the master node communicates with each core individually), and that core is then no longer used until all the remaining jobs are done. Is there a way to kill a running job without killing the connection to the core?
Here is the script that submits the jobs:
import execnet, os, sys
import re
import socket
import numpy as np
import pickle, cPickle
from copy import deepcopy
import time
import job


def main():
    print 'execnet source files are located at:\n {}/\n'.format(
        os.path.join(os.path.dirname(execnet.__file__))
    )

    # Generate a group of gateways.
    work_dir = '/home/mpiuser/pn2/'
    f = 'cluster_core_info.txt'
    n_start, n_end = 250000, 250008

    ci = get_cluster_info(f)
    group, g_labels = make_gateway_group(ci, work_dir)

    mch = group.remote_exec(job)
    # Collect the replies of all channels in a single queue; the endmarker
    # is put into the queue once per channel when that channel closes.
    queue = mch.make_receive_queue(endmarker='terminate_channel')

    args = range(n_start, n_end+1)  # List of parameters to compute factorial.
    manage_jobs(group, mch, queue, g_labels, args)

    # Close the group of gateways.
    group.terminate()


def get_cluster_info(f):
    nodes, ncores = [], []
    with open(f, 'r') as fid:
        while True:
            line = fid.readline()
            if not line:
                break
            line = line.strip('\n').split()
            nodes.append(line[0])
            ncores.append(int(line[1]))
    return dict(zip(nodes, ncores))


def make_gateway_group(cluster_info, work_dir):
    ''' Generate gateways on all cores of the remote nodes. '''
    print 'Gateways generated:\n'
    group = execnet.Group()
    g_labels = []
    nodes = list(cluster_info.keys())
    for node in nodes:
        for i in range(cluster_info[node]):
            group.makegateway(
                "ssh={0}//id={0}_{1}//chdir={2}".format(
                    node, i, work_dir
                ))
            sys.stdout.write(' ')
            sys.stdout.flush()
            print list(group)[-1]
            # Generate a string 'node-id_core-id'.
            g_labels.append('{}_{}'.format(re.findall(r'\d+', node)[0], i))
    print ''
    return group, g_labels


def get_mch_id(g_labels, string):
    ids = re.findall(r'\d+', string)
    ids = '{}_{}'.format(*ids)
    return g_labels.index(ids)


def manage_jobs(group, mch, queue, g_labels, args):
    args_ref = deepcopy(args)
    terminated_channels = 0
    active_jobs, active_args = [], []
    while True:
        channel, item = queue.get()
        if item == 'terminate_channel':
            terminated_channels += 1
            print " Gateway closed: {}".format(channel.gateway.id)
            if terminated_channels == len(mch):
                print "\nAll jobs done.\n"
                break
            continue
        if item != "ready":
            mch_id_completed = get_mch_id(g_labels, channel.gateway.id)
            depopulate_list(active_jobs, mch_id_completed, active_args)
            print " Gateway {} channel id {} returned:".format(
                channel.gateway.id, mch_id_completed)
            print " {}".format(item)
        if not args:
            print "\nNo more jobs to submit, sending termination request...\n"
            mch.send_each(None)
            args = 'terminate_channel'
        if args and args != 'terminate_channel':
            arg = args.pop(0)
            idx = args_ref.index(arg)
            # arg is copied by value to the remote side of the channel to be
            # executed there. send() may block if the sender queue is full.
            channel.send(arg)
            # Get the id of the channel used to submit the job; this id can
            # be used later to refer to mch[id] to terminate the job.
            mch_id_active = get_mch_id(g_labels, channel.gateway.id)
            print "Job {}: {}! submitted to gateway {}, channel id {}".format(
                idx, arg, channel.gateway.id, mch_id_active)
            populate_list(active_jobs, mch_id_active,
                          active_args, arg)


def populate_list(jobs, job_active, args, arg_active):
    jobs.append(job_active)
    args.append(arg_active)


def depopulate_list(jobs, job_completed, args):
    i = jobs.index(job_completed)
    jobs.pop(i)
    args.pop(i)


if __name__ == '__main__':
    main()
And here is my job.py script:
#!/usr/bin/env python
import os, sys
import socket
import time
import numpy as np
import pickle, cPickle
import random
import job


def hostname():
    return socket.gethostname()


def working_dir():
    return os.getcwd()


def listdir(path):
    return os.listdir(path)


def fac(arg):
    return np.math.factorial(arg)


def dump(arg):
    path = working_dir() + '/out'
    if not os.path.exists(path):
        os.mkdir(path)
    f_path = path + '/fac_{}.txt'.format(arg)

    t_0 = time.time()
    num = fac(arg)                                    # Main operation
    t_1 = time.time()
    cPickle.dump(num, open(f_path, "w"), protocol=2)  # Main operation
    t_2 = time.time()

    duration_0 = "{:.4f}".format(t_1 - t_0)
    duration_1 = "{:.4f}".format(t_2 - t_1)
    #num2 = cPickle.load(open(f_path, "rb"))
    return '--Calculation: {} s, dumping: {} s'.format(
        duration_0, duration_1)


if __name__ == '__channelexec__':
    channel.send("ready")
    for arg in channel:
        if arg is None:
            break
        elif str(arg).isdigit():
            channel.send((
                str(arg) + '!',
                job.hostname(),
                job.dump(arg)
            ))
        else:
            print 'Warning! arg sent should be a number or None'
Yes, you are on the right track. Use the psutil library to manage the processes: find their PIDs and so on, and kill them. There is no need to involve bash anywhere; Python covers all of it.
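For instance, a minimal sketch with psutil (the helper kill_job_tree and matching on 'job.py' in the command line are my own illustrations of the idea, not anything execnet provides; a reasonably recent psutil is assumed):

import psutil

def kill_job_tree(pid):
    # Kill the job process and any children it spawned, without
    # touching the SSH processes that execnet itself holds open.
    parent = psutil.Process(pid)
    for child in parent.children(recursive=True):
        child.kill()
    parent.kill()

# Example: locate job processes by matching their command line.
for proc in psutil.process_iter(['pid', 'cmdline']):
    cmdline = proc.info['cmdline'] or []
    if any('job.py' in part for part in cmdline):
        kill_job_tree(proc.info['pid'])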
Or, better yet, program your script to terminate when the master tells it to. That is how it is usually done. If you want/need to, you can even have it start another script before terminating itself. Or, if the new job is the same kind of thing you were doing in another process, simply stop the current work and start the new one within the script, without terminating it at all; a sketch of that follows below.
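Here is a rough sketch of that cooperative approach for this job.py, assuming the master sends the string 'cancel' over the job's own channel when it wants the current computation abandoned (the 'cancel' convention, fac_interruptible and the chunk size are my inventions; channel.receive() accepting a timeout and raising channel.TimeoutError is documented execnet behavior):

def fac_interruptible(arg, channel, chunk=10000):
    # Hypothetical chunked factorial: multiplying in slices creates
    # regular points at which a control message can be noticed.
    num = 1
    for start in range(1, arg + 1, chunk):
        for k in range(start, min(start + chunk, arg + 1)):
            num *= k
        try:
            # Poll without blocking; TimeoutError means nothing arrived.
            msg = channel.receive(timeout=0)
        except channel.TimeoutError:
            continue
        if msg == 'cancel':  # In this design the master sends nothing
            return None      # else mid-job, so only 'cancel' lands here.
    return num

if __name__ == '__channelexec__':
    channel.send("ready")
    for arg in channel:
        if arg is None:
            break
        if arg == 'cancel':
            continue  # Nothing is running, ignore a stray cancel.
        num = fac_interruptible(arg, channel)
        if num is None:
            channel.send('job {} cancelled'.format(arg))
        else:
            channel.send('{}! done'.format(arg))

The channel stays open the whole time, so the master can immediately send the next argument to the same core instead of losing it.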
And, if I may make a suggestion: do not read files line by line; read in the whole file and then use *.splitlines(). For small files, reading them in chunks just tortures the IO. You also do not need *.strip() there. And you should remove the unused imports as well.
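Applied to the get_cluster_info above, that advice would look roughly like this:

def get_cluster_info(f):
    nodes, ncores = [], []
    # Read the whole file at once and split it into lines;
    # split() without arguments already discards the trailing '\n'.
    with open(f, 'r') as fid:
        lines = fid.read().splitlines()
    for line in lines:
        name, cores = line.split()
        nodes.append(name)
        ncores.append(int(cores))
    return dict(zip(nodes, ncores))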