How can I terminate running jobs without closing the connection to the core? (currently using execnet)

I have a cluster of computers that uses a master node to communicate with the slave nodes in the cluster.

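The main problem I'm facing is using execnet to kill certain running jobs and then requeue a new job on the same core where the other job was just terminated (I want to use all cores of the slave nodes at any given time).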

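As of now there is no way to terminate a running job with execnet, so I figured I could kill the job manually through a bash script, say sudo kill 12345, where 12345 is the PID of the job (getting the PID of each job is another thing execnet does not support, but that is another topic). That would terminate the job, and another job would then be requeued on the core that was just freed. It does kill the job correctly, but it also closes the connection to that channel (the core; the master node communicates with each core individually), and that core is then not used again until all jobs are done. Is there a way to terminate a running job without killing the connection to the core?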

Here is the script that submits the jobs:

import execnet, os, sys
import re
import socket
import numpy as np
import pickle, cPickle
from copy import deepcopy
import time
import job


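def main():
    print 'execnet source files are located at:\n  {}/\n'.format(
          os.path.join(os.path.dirname(execnet.__file__))
          )

    # Generate a group of gateways.
    work_dir = '/home/mpiuser/pn2/'
    f = 'cluster_core_info.txt'
    n_start, n_end = 250000, 250008

    ci = get_cluster_info(f)
    group, g_labels = make_gateway_group(ci, work_dir)

    # Run job.py on every gateway; mch holds one channel per gateway.
    mch = group.remote_exec(job)
    # `queue` was undefined in the listing as posted. Presumably it is the
    # shared receive queue, with the endmarker that manage_jobs() checks for.
    queue = mch.make_receive_queue(endmarker='terminate_channel')

    args = range(n_start, n_end+1)  # List of parameters to compute factorial.
    manage_jobs(group, mch, queue, g_labels, args)

    # Close the group of gateways.
    group.terminate()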

def get_cluster_info(f):
    nodes, ncores = [], []
    with open(f, 'r') as fid:
        while True:
            line = fid.readline()
            if not line:
                fid.close()
                break
            line = line.strip('\n').split()
            nodes.append(line[0])
            ncores.append(int(line[1]))
    return dict( zip(nodes, ncores) )

def make_gateway_group(cluster_info, work_dir):
    ''' Generate gateways on all cores in remote nodes. '''
    print 'Gateways generated:\n'
    group = execnet.Group()
    g_labels = []
    nodes = list(cluster_info.keys())
    for node in nodes:
        for i in range(cluster_info[node]):
            group.makegateway(
                "ssh={0}//id={0}_{1}//chdir={2}".format(
                node, i, work_dir
                ))
            sys.stdout.write('  ')
            sys.stdout.flush()
            print list(group)[-1]
            # Generate a string 'node-id_core-id'.
            g_labels.append('{}_{}'.format(re.findall(r'\d+',node)[0], i))
    print ''
    return group, g_labels

def get_mch_id(g_labels, string):
    ids = [x for x in re.findall(r'\d+', string)]
    ids =  '{}_{}'.format(*ids)
    return g_labels.index(ids)

def manage_jobs(group, mch, queue, g_labels, args):
    args_ref = deepcopy(args)
    terminated_channels = 0
    active_jobs, active_args = [], []
    while True:
        channel, item = queue.get()

        if item == 'terminate_channel':
            terminated_channels += 1
            print "  Gateway closed: {}".format(channel.gateway.id)
            if terminated_channels == len(mch):
                print "\nAll jobs done.\n"
                break
            continue

        if item != "ready":
            mch_id_completed = get_mch_id(g_labels, channel.gateway.id)
            depopulate_list(active_jobs, mch_id_completed, active_args)
            print "  Gateway {} channel id {} returned:".format(
                  channel.gateway.id, mch_id_completed)
            print "  {}".format(item)

        if not args:
            print "\nNo more jobs to submit, sending termination request...\n"
            mch.send_each(None)
            args = 'terminate_channel'

        if args and \
           args != 'terminate_channel':
            arg = args.pop(0)
            idx = args_ref.index(arg)
            channel.send(arg)  # arg is copied by value to the remote side of
                               # channel to be executed. Maybe blocked if the
                               # sender queue is full.

            # Get the id of current channel used to submit a job,
            # this id can be used to refer mch[id] to terminate a job later.
            mch_id_active = get_mch_id(g_labels, channel.gateway.id)
            print "Job {}:  {}!  submitted to gateway {}, channel id {}".format(
                  idx, arg, channel.gateway.id, mch_id_active)
            populate_list(active_jobs, mch_id_active,
                          active_args, arg)


def populate_list(jobs, job_active, args, arg_active):
    jobs.append(job_active)
    args.append(arg_active)

def depopulate_list(jobs, job_completed, args):
    i = jobs.index(job_completed)
    jobs.pop(i)
    args.pop(i)


if __name__ == '__main__':
    main()

Here is my job.py script:

#!/usr/bin/env python
import os, sys
import socket
import time
import numpy as np
import pickle, cPickle
import random
import job


def hostname():
    return socket.gethostname()

def working_dir():
    return os.getcwd()

def listdir(path):
    return os.listdir(path)

def fac(arg):
    return np.math.factorial(arg)

def dump(arg):
    path = working_dir() + '/out'
    if not os.path.exists(path):
        os.mkdir(path)
    f_path = path + '/fac_{}.txt'.format(arg)
    t_0 = time.time()
    num = fac(arg)                                   # Main operation
    t_1 = time.time()
    cPickle.dump(num, open(f_path, "wb"), protocol=2) # Main operation (binary mode for protocol 2)
    t_2 = time.time()
    duration_0 = "{:.4f}".format(t_1 - t_0)
    duration_1 = "{:.4f}".format(t_2 - t_1)
    #num2 = cPickle.load(open(f_path, "rb"))
    return '--Calculation: {} s, dumping: {} s'.format(
           duration_0, duration_1)


if __name__ == '__channelexec__':
    channel.send("ready")

    for arg in channel:
        if arg is None:
            break
        elif str(arg).isdigit():
            channel.send((
                  str(arg)+'!',
                  job.hostname(),
                  job.dump(arg)
                ))
        else:
            print 'Warning! arg sent should be a number or None'

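Yes, you are on the right track. Use the psutil library to manage processes, find their PIDs, and kill them. There is no need to involve bash anywhere; Python covers all of it.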
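As a rough illustration of doing this from Python instead of bash, the sketch below sends SIGTERM to a process by PID using only the standard library (psutil's `Process(pid).terminate()` plays a similar role). It is written for Python 3 on a POSIX system; `terminate_pid` and `demo` are hypothetical names, and the sleeping child process merely stands in for a real job.

```python
import os
import signal
import subprocess
import sys


def terminate_pid(pid, sig=signal.SIGTERM):
    """Send `sig` to `pid`; return False if no such process exists."""
    try:
        os.kill(pid, sig)
    except ProcessLookupError:
        return False
    return True


def demo():
    # Stand-in for a long-running job: a child Python process that sleeps.
    child = subprocess.Popen(
        [sys.executable, "-c", "import time; time.sleep(60)"]
    )
    sent = terminate_pid(child.pid)
    child.wait(timeout=10)  # reap the child so it does not linger as a zombie
    # On POSIX, a negative return code means "killed by that signal number".
    return sent, child.returncode


if __name__ == "__main__":
    print(demo())
```

Since the question reports that killing the job's PID also closed the channel, the PID to target would presumably have to belong to a separate child process doing the work, not to the process that serves the channel.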

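Or, even better, program your script to terminate when the master says so. That is how it is usually done. If you want or need to, you can even have it start another script before terminating itself. Or, if the new work is the same kind of thing the process is already doing, just stop the current work and start new work inside the script without terminating it at all.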
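A minimal sketch of this cooperative approach, assuming Python 3 and using `queue.Queue` objects as stand-ins for the two directions of an execnet channel. The `CANCEL` message, `factorial_with_cancel`, and `worker` are hypothetical names, not part of execnet or of the job.py above. The factorial is computed in an explicit loop so the job can poll for a cancel request every few iterations; a cancelled job then ends without ending the loop that serves the master.

```python
import queue

CANCEL = "cancel"  # hypothetical control message sent by the master


def factorial_with_cancel(n, inbox, check_every=1000):
    """Return n!, or None if a CANCEL message arrives while computing.

    Assumes the master sends nothing but CANCEL while a job is running;
    any other message polled mid-job would be dropped by this sketch.
    """
    result = 1
    for k in range(2, n + 1):
        if k % check_every == 0:
            try:
                if inbox.get_nowait() == CANCEL:
                    return None
            except queue.Empty:
                pass  # no request pending, keep working
        result *= k
    return result


def worker(inbox, outbox):
    """Serve jobs until None arrives; cancelling a job keeps the loop alive."""
    outbox.put("ready")
    while True:
        arg = inbox.get()
        if arg is None:
            break
        if arg == CANCEL:
            continue  # stale cancel request: no job is running
        value = factorial_with_cancel(arg, inbox)
        outbox.put((arg, "cancelled" if value is None else value))


if __name__ == "__main__":
    inbox, outbox = queue.Queue(), queue.Queue()
    for message in (2000, CANCEL, 3, None):
        inbox.put(message)
    worker(inbox, outbox)
    while not outbox.empty():
        print(outbox.get())
```

In the demo, the job for 2000 is abandoned at its first poll, and the same worker then goes on to compute 3! and exits only when it receives None.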

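And, if I may make a suggestion: don't read the file line by line. Read the whole file and then use *.splitlines(). For small files, reading them in chunks just tortures the IO. You don't need *.strip() either. You should also remove the unused imports.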
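One way the file-reading suggestion might look, as a sketch in Python 3. `parse_cluster_info` is a hypothetical helper split out for clarity, and the file is assumed to hold one "node core-count" pair per line, as the original get_cluster_info implies. Skipping blank or short lines is an addition not present in the original.

```python
def parse_cluster_info(text):
    """Map node name -> core count from lines such as 'node01 8'."""
    info = {}
    for line in text.splitlines():
        fields = line.split()  # split() with no argument also drops '\n'
        if len(fields) < 2:
            continue  # skip blank or malformed lines (an added safeguard)
        info[fields[0]] = int(fields[1])
    return info


def get_cluster_info(path):
    # Read the whole file at once; the with-block closes it automatically.
    with open(path) as fid:
        return parse_cluster_info(fid.read())
```

Because `str.split()` without arguments already discards surrounding whitespace, no separate strip call is needed.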