Easy way to tell apart python multiprocessing's OS processes

Summary

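I want to use the Python multiprocessing module to run several jobs in parallel on a Linux server. I also want to be able to look at the running processes with top or ps and kill one of them while letting the others keep running.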

However, every process started by the multiprocessing module looks identical in the output of ps -f.

This is all I see:

fermion:workspace ross$ ps -f
  UID   PID  PPID   C STIME   TTY           TIME CMD
  501 32257 32256   0  8:52PM ttys000    0:00.04 -bash
  501 32333 32257   0  9:05PM ttys000    0:00.04 python ./parallel_jobs.py
  501 32334 32333   0  9:05PM ttys000    0:00.00 python ./parallel_jobs.py
  501 32335 32333   0  9:05PM ttys000    0:00.00 python ./parallel_jobs.py
  501 32336 32333   0  9:05PM ttys000    0:00.00 python ./parallel_jobs.py
  501 32272 32271   0  8:53PM ttys001    0:00.05 -bash

Is there a way to get something more descriptive in the CMD column? Do I just have to track the PIDs in log files, or is there another option?

Background

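I am doing some batch processing where some of the jobs can run for hours. I need to run some of them in parallel to save time, and all of those parallel jobs must finish successfully before I can run another job that depends on them. However, if one job misbehaves, I want to be able to kill it while letting the others finish. The overall flow is: one job, then some jobs in parallel, then more jobs in sequence, then more parallel jobs, and so on.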

Example code

Here is some dummy code that outlines what I am trying to do.

#!/usr/bin/env python
import time
import multiprocessing
import sys  # needed for sys.exit() in main()

def open_zoo_cages():
    print('Opening zoo cages...')

def crossing_road(animal, sleep_time):
    print('An ' + animal + ' is crossing the road')
    for i in range(5):
        print("It's a wide road for " + animal + " to cross...")
        time.sleep(sleep_time)

    print('The ' + animal + ' is across.')

def aardvark():
    crossing_road('aardvark', 2)

def badger():
    crossing_road('badger', 4)

def cougar():
    crossing_road('cougar', 3)

def clean_the_road():
    print('Cleaning off the road of animal droppings...')

def print_exit_code(process):
    print(process.name + " exit code: " + str(process.exitcode))

def main():
    # Run a single job that must finish before running some jobs in parallel
    open_zoo_cages()

    # Run some jobs in parallel
    amos = multiprocessing.Process(name='aardvark Amos', target=aardvark)
    betty = multiprocessing.Process(name='badger Betty', target=badger)
    carl = multiprocessing.Process(name='cougar Carl', target=cougar)

    amos.start()
    betty.start()
    carl.start()

    amos.join()
    betty.join()
    carl.join()

    print_exit_code(amos)
    print_exit_code(betty)
    print_exit_code(carl)

    # Run another job (clean_the_road) if all the parallel jobs finished in 
    # success. Otherwise end in error.
    if amos.exitcode == 0 and betty.exitcode == 0 and carl.exitcode == 0:
        clean_the_road()
    else:
        sys.exit('Not all animals finished crossing')

if __name__ == '__main__':
    main()

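I also noticed that moving one of these functions into a different Python module does not change what the ps CMD column shows for the corresponding process.

Output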

fermion:workspace ross$ ./parallel_jobs.py 
Opening zoo cages...
An aardvark is crossing the road
It's a wide road for aardvark to cross...
An badger is crossing the road
It's a wide road for badger to cross...
An cougar is crossing the road
It's a wide road for cougar to cross...
It's a wide road for aardvark to cross...
It's a wide road for cougar to cross...
It's a wide road for aardvark to cross...
It's a wide road for badger to cross...
It's a wide road for cougar to cross...
It's a wide road for aardvark to cross...
It's a wide road for badger to cross...
It's a wide road for aardvark to cross...
It's a wide road for cougar to cross...
The aardvark is across.
It's a wide road for badger to cross...
It's a wide road for cougar to cross...
The cougar is across.
It's a wide road for badger to cross...
The badger is across.
aardvark Amos exit code: 0
badger Betty exit code: 0
cougar Carl exit code: 0
Cleaning off the road of animal droppings...

A nice, simple answer: have each process open a file with a descriptive name, and then use lsof.

f = open('/tmp/hippo.txt','w')

Then the following shows you the PID of the process holding that file open:

lsof | grep "hippo"

It's not the most Pythonic answer, but so what :)
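A slightly fuller sketch of the lsof approach, assuming each job opens a marker file named after its multiprocessing process name. The names "hippo" and "zebra" and the `job`/`main` helpers are hypothetical, and writing the PID into the file is an extra convenience rather than part of the suggestion above:

```python
import multiprocessing
import os
import tempfile
import time


def job(sleep_time):
    # Name the marker file after this process (e.g. /tmp/hippo.txt on Linux)
    # so that `lsof | grep hippo` shows which PID is holding it open.
    name = multiprocessing.current_process().name
    path = os.path.join(tempfile.gettempdir(), name + ".txt")
    with open(path, "w") as marker:
        # Assumption: also record the PID so you can simply `cat` the file.
        marker.write(str(os.getpid()))
        marker.flush()
        time.sleep(sleep_time)  # the job's real work would go here


def main():
    # Hypothetical job names; each one becomes a file name in the temp dir.
    procs = [multiprocessing.Process(name=n, target=job, args=(1,))
             for n in ("hippo", "zebra")]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return {p.name: p.pid for p in procs}, [p.exitcode for p in procs]


if __name__ == "__main__":
    print(main())
```

While the jobs are sleeping, running `lsof | grep hippo` in another shell should list the PID holding the file open. The marker files are left behind in the temp directory afterwards.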

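My original answer was the simple approach. Here is a small, incomplete example of a bigger idea: add a signal handler to the class that runs in the subprocess, so that you can send it something like kill -USR1 <pid> and have it dump information. You could even use this to dump, on demand, how much processing is left in a given subprocess.

import logging
import os
import queue
import signal

class Foo():
    def __init__(self, name):
        self.myname = name
        self.myqueue = queue.Queue()  # remaining work items
        # Must run in the child process; SIGUSR1 keeps a plain kill able to terminate it
        signal.signal(signal.SIGUSR1, self.my_callback)

    def my_callback(self, signum, frame):
        # Signal handlers receive the signal number and the current stack frame
        logging.error("%s %s %s", self.myname, os.getpid(), self.myqueue.qsize())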

Or you could do something like this, which I think may be what you actually want:

import multiprocessing
import time
def foo():
    time.sleep(60)
if __name__ == "__main__":
    process = [
        multiprocessing.Process(name="a",target=foo),
        multiprocessing.Process(name="b",target=foo),
        multiprocessing.Process(name="c",target=foo),
    ]
    for p in process:
        p.start()
    for p in process:
        print(p.name, p.pid)
    for p in process:
        p.join()
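With the name-to-PID mapping printed, you can kill a single misbehaving job from another shell with kill <pid> and let the others finish. The same can be done from the parent with Process.terminate(), which sends SIGTERM on POSIX systems. A small sketch, where job "b" is arbitrarily treated as the misbehaving one and the sleep is shortened for illustration:

```python
import multiprocessing
import time


def foo():
    time.sleep(2)  # stand-in for a long-running job


def main():
    process = [multiprocessing.Process(name=n, target=foo) for n in ("a", "b", "c")]
    for p in process:
        p.start()
    for p in process:
        print(p.name, p.pid)
    # Assume "b" misbehaves: stop only that one and let "a" and "c" finish.
    process[1].terminate()
    for p in process:
        p.join()
    # A negative exit code -N means the process was killed by signal N.
    return {p.name: p.exitcode for p in process}


if __name__ == "__main__":
    print(main())
```

On Linux and macOS, "a" and "c" should report exit code 0 and "b" should report -15 (killed by SIGTERM), which is the same check the question's main() already does before running the dependent job.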

The psutil library does what you need and is widely used. You can look at how the psutil developers do it, or simply use the library in your own project.

https://pypi.python.org/pypi/psutil