Python 多重处理
Python MultiProcessing
我正在为 RabbitMQ 消费者使用 Python Python 多处理。
在应用程序启动时,我创建了 4 个 WorkerProcesses。
def start_workers(num=4):
for i in xrange(num):
process = WorkerProcess()
process.start()
下面是我的 WorkerClass。
逻辑到目前为止有效,我创建了 4 个并行的消费者进程。
但问题是在进程被杀死之后。我想创建一个新流程。下面逻辑中的问题是新进程是作为旧进程的子进程创建的,一段时间后内存用完 space。
Python Multiprocessing 是否有可能启动一个新进程并正确终止旧进程?
class WorkerProcess(multiprocessing.Process):
def ___init__(self):
app.logger.info('%s: Starting new Thread!', self.name)
super(multiprocessing.Process, self).__init__()
def shutdown(self):
process = WorkerProcess()
process.start()
return True
def kill(self):
start_workers(1)
self.terminate()
def run(self):
try:
# Connect to RabbitMQ
credentials = pika.PlainCredentials(app.config.get('RABBIT_USER'), app.config.get('RABBIT_PASS'))
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=app.config.get('RABBITMQ_SERVER'), port=5672, credentials=credentials))
channel = connection.channel()
# Declare the Queue
channel.queue_declare(queue='screenshotlayer',
auto_delete=False,
durable=True)
app.logger.info('%s: Start to consume from RabbitMQ.', self.name)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='screenshotlayer')
channel.start_consuming()
app.logger.info('%s: Thread is going to sleep!', self.name)
# do what channel.start_consuming() does but with stoppping signal
#while self.stop_working.is_set():
# channel.transport.connection.process_data_events()
channel.stop_consuming()
connection.close()
except Exception as e:
self.shutdown()
return 0
谢谢
在主流程中,跟踪您的子流程(在 list
中)并使用 .join(timeout=50)
(https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.join) 循环它们。
然后检查他是否还活着 (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.is_alive)。
如果他不在,请换一个新的。
def start_workers(n):
wks = []
for _ in range(n):
wks.append(WorkerProcess())
wks[-1].start()
while True:
#Remove all terminated process
wks = [p for p in wks if p.is_alive()]
#Start new process
for i in range(n-len(wks)):
wks.append(WorkerProcess())
wks[-1].start()
我不会自己处理进程池管理。相反,我会使用 concurrent.future
模块中的 ProcessPoolExecutor
。
无需继承WorkerProcess
即可继承Process
class。只需在 class 中编写您的实际代码,然后将其提交给进程池执行程序。执行者将拥有一个随时准备执行您的任务的进程池。
这样您就可以让事情变得简单并且不会让您头疼。
您可以在我的博客 post 中阅读更多相关信息:http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html
示例代码:
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def return_after_5_secs(message):
sleep(5)
return message
pool = ProcessPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print("Result: " + future.result())
我正在为 RabbitMQ 消费者使用 Python Python 多处理。 在应用程序启动时,我创建了 4 个 WorkerProcesses。
def start_workers(num=4):
for i in xrange(num):
process = WorkerProcess()
process.start()
下面是我的 WorkerClass。 逻辑到目前为止有效,我创建了 4 个并行的消费者进程。 但问题是在进程被杀死之后。我想创建一个新流程。下面逻辑中的问题是新进程是作为旧进程的子进程创建的,一段时间后内存用完 space。 Python Multiprocessing 是否有可能启动一个新进程并正确终止旧进程?
class WorkerProcess(multiprocessing.Process):
def ___init__(self):
app.logger.info('%s: Starting new Thread!', self.name)
super(multiprocessing.Process, self).__init__()
def shutdown(self):
process = WorkerProcess()
process.start()
return True
def kill(self):
start_workers(1)
self.terminate()
def run(self):
try:
# Connect to RabbitMQ
credentials = pika.PlainCredentials(app.config.get('RABBIT_USER'), app.config.get('RABBIT_PASS'))
connection = pika.BlockingConnection(
pika.ConnectionParameters(host=app.config.get('RABBITMQ_SERVER'), port=5672, credentials=credentials))
channel = connection.channel()
# Declare the Queue
channel.queue_declare(queue='screenshotlayer',
auto_delete=False,
durable=True)
app.logger.info('%s: Start to consume from RabbitMQ.', self.name)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='screenshotlayer')
channel.start_consuming()
app.logger.info('%s: Thread is going to sleep!', self.name)
# do what channel.start_consuming() does but with stoppping signal
#while self.stop_working.is_set():
# channel.transport.connection.process_data_events()
channel.stop_consuming()
connection.close()
except Exception as e:
self.shutdown()
return 0
谢谢
在主流程中,跟踪您的子流程(在 list
中)并使用 .join(timeout=50)
(https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.join) 循环它们。
然后检查他是否还活着 (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Process.is_alive)。
如果他不在,请换一个新的。
def start_workers(n):
wks = []
for _ in range(n):
wks.append(WorkerProcess())
wks[-1].start()
while True:
#Remove all terminated process
wks = [p for p in wks if p.is_alive()]
#Start new process
for i in range(n-len(wks)):
wks.append(WorkerProcess())
wks[-1].start()
我不会自己处理进程池管理。相反,我会使用 concurrent.future
模块中的 ProcessPoolExecutor
。
无需继承WorkerProcess
即可继承Process
class。只需在 class 中编写您的实际代码,然后将其提交给进程池执行程序。执行者将拥有一个随时准备执行您的任务的进程池。
这样您就可以让事情变得简单并且不会让您头疼。
您可以在我的博客 post 中阅读更多相关信息:http://masnun.com/2016/03/29/python-a-quick-introduction-to-the-concurrent-futures-module.html
示例代码:
from concurrent.futures import ProcessPoolExecutor
from time import sleep
def return_after_5_secs(message):
sleep(5)
return message
pool = ProcessPoolExecutor(3)
future = pool.submit(return_after_5_secs, ("hello"))
print(future.done())
sleep(5)
print(future.done())
print("Result: " + future.result())