Python - Async Multiprocessing from RabbitMQ Consumer
I have a Python program that acts as a RabbitMQ consumer. Once it receives a job from the queue, I want the program to split the job up using multiprocessing, but I'm running into logistical issues with multiprocessing.
I've simplified the code for readability.
My RabbitMQ consumer code:
import json
import logging
import time

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue="JobReader", durable=True)
logging.info('Waiting for messages..')

def callback(ch, method, properties, body):
    job_info = json.loads(body)
    logging.info('Start Time: ' + time.strftime("%H:%M:%S"))
    split_jobs = split_job(job_info)
    process_manager.runProcesses(split_jobs)
    ch.basic_ack(delivery_tag=method.delivery_tag)
My multiprocessing code:
#!/usr/bin/python
import multiprocessing

import other_package

def worker_process(sub_job):
    other_package.run_job(sub_job)

def runProcesses(jobs):
    processes = []
    for sub_job in jobs:
        p = multiprocessing.Process(target=worker_process, args=(sub_job,))
        processes.append(p)
        p.start()
Of course, I can't use if __name__ == '__main__': since this is inside a function.
I'm not sure whether multiprocessing has a way around this, or whether I'm just approaching the problem the wrong way. Any help would be greatly appreciated.
You can restructure the multiprocessing part so that its state is initialized from the main script:
import process_manager
...
def callback(ch, method, properties, body):
    job_info = json.loads(body)
    logging.info('Start Time: ' + time.strftime("%H:%M:%S"))
    split_jobs = split_job(job_info)
    manager.runProcesses(split_jobs)
    ch.basic_ack(delivery_tag=method.delivery_tag)

if __name__ == "__main__":
    manager = process_manager.get_manager()
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    channel.queue_declare(queue="JobReader", durable=True)
    logging.info('Waiting for messages..')
Then process_manager would look something like this:
import multiprocessing

import other_package

def worker_process(sub_job):
    other_package.run_job(sub_job)

_manager = None

def get_manager():  # Note that you don't have to use a singleton here
    global _manager
    if not _manager:
        _manager = Manager()
    return _manager

class Manager(object):
    def __init__(self):
        self._pool = multiprocessing.Pool()

    def runProcesses(self, jobs):
        self._pool.map_async(worker_process, jobs)
Note that I used a Pool instead of spawning a Process per job, since the per-job approach may not scale well.
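To see the pattern end to end, here is a minimal, runnable sketch of the Pool-based process_manager above. The square function is a stand-in for other_package.run_job (which isn't shown in the question), and the AsyncResult returned by map_async is collected with .get() only to demonstrate that the pool actually ran the jobs:

```python
import multiprocessing

def square(sub_job):
    # Stand-in for other_package.run_job; returns a value so the
    # pool's output can be inspected.
    return sub_job * sub_job

_manager = None

def get_manager():
    # Lazily create a single Manager, as in the answer above.
    global _manager
    if _manager is None:
        _manager = Manager()
    return _manager

class Manager(object):
    def __init__(self):
        # The pool is created once, from the main process, so worker
        # startup happens after the script is fully initialized.
        self._pool = multiprocessing.Pool()

    def runProcesses(self, jobs):
        # map_async returns an AsyncResult immediately; the callback
        # in the consumer can ignore it, or block on .get() for results.
        return self._pool.map_async(square, jobs)

if __name__ == '__main__':
    async_result = get_manager().runProcesses([1, 2, 3])
    print(async_result.get(timeout=30))  # [1, 4, 9]
```

Because map_async does not block, the RabbitMQ callback can ack the message and return to the pika I/O loop while the sub-jobs run in the pool's worker processes.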