Python 多重处理

Python MultiProcessing

我正在为 RabbitMQ 消费者使用 Python Python 多处理。 在应用程序启动时,我创建了 4 个 WorkerProcesses。

def start_workers(num=4):
    for i in xrange(num):
        process = WorkerProcess()
        process.start()

下面是我的 WorkerClass。 逻辑到目前为止有效,我创建了 4 个并行的消费者进程。 但问题是在进程被杀死之后。我想创建一个新流程。下面逻辑中的问题是新进程是作为旧进程的子进程创建的,一段时间后内存用完 space。 Python Multiprocessing 是否有可能启动一个新进程并正确终止旧进程?

class WorkerProcess(multiprocessing.Process):

def ___init__(self):
    app.logger.info('%s: Starting new Thread!', self.name)
    super(multiprocessing.Process, self).__init__()

def shutdown(self):
    process = WorkerProcess()
    process.start()
    return True

def kill(self):
    start_workers(1)
    self.terminate()

def run(self):
    try:
        # Connect to RabbitMQ
        credentials = pika.PlainCredentials(app.config.get('RABBIT_USER'), app.config.get('RABBIT_PASS'))
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=app.config.get('RABBITMQ_SERVER'), port=5672, credentials=credentials))
        channel = connection.channel()

        # Declare the Queue
        channel.queue_declare(queue='screenshotlayer',
                              auto_delete=False,
                              durable=True)

        app.logger.info('%s: Start to consume from RabbitMQ.', self.name)
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(callback, queue='screenshotlayer')
        channel.start_consuming()
        app.logger.info('%s: Thread is going to sleep!', self.name)

        # do what channel.start_consuming() does but with stoppping signal
        #while self.stop_working.is_set():
        #    channel.transport.connection.process_data_events()

        channel.stop_consuming()
        connection.close()
    except Exception as e:
               self.shutdown()
    return 0

谢谢

在主流程中,跟踪您的子流程(在 list 中)并使用 .join(timeout=50) (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.join) 循环它们。

然后检查他是否还活着 (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.is_alive)。

如果他不在,请换一个新的。

def start_workers(n):
    wks = []
    for _ in range(n):
        wks.append(WorkerProcess())
        wks[-1].start()
    while True:
        #Remove all terminated process
        wks = [p for p in wks if p.is_alive()]

        #Start new process
        for i in range(n-len(wks)):
            wks.append(WorkerProcess())
            wks[-1].start()

我不会自己处理进程池管理。相反,我会使用 concurrent.future 模块中的 ProcessPoolExecutor

无需继承WorkerProcess即可继承Processclass。只需在 class 中编写您的实际代码,然后将其提交给进程池执行程序。执行者将拥有一个随时准备执行您的任务的进程池。

这样您就可以让事情变得简单并且不会让您头疼。

您可以在我的博客 post 中阅读更多相关信息:http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html

示例代码:

from concurrent.futures import ProcessPoolExecutor
from time import sleep

def return_after_5_secs(message):
    sleep(5)
    return message

pool = ProcessPoolExecutor(3)

future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print("Result: " + future.result())