如何在 django 命令中使用多进程?

How to use multiprocess in django command?

我想在 django 命令中使用 ProcessPoolExecutor 来并发地获取一些结果。我尝试使用以下代码来实现:

# main codes


import json
import time
import datetime
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


from redis import Redis
from django.conf import settings
from django.core.management.base import BaseCommand
from thrift.transport import TSocket, TTransport
from thrift.protocol import TBinaryProtocol

from utils.cache import pool
from services.analysis.thrift.Analysis import Client, Dashparam
from api.analysis.models import MDashBoard


redis_con = Redis(connection_pool=pool)

class AnalysisThriftService:
    """Context manager for the analysis Thrift service.

    Entering the ``with`` block yields the service object itself; leaving it
    closes ``self.transport``.
    """

    def __init__(self):
        # Connection setup is elided in the original snippet.
        ...

    def __enter__(self):
        """Return this service so it can be bound with ``with ... as``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the transport; exceptions from the ``with`` body propagate."""
        transport = self.transport
        transport.close()



class Command(BaseCommand):
    """Management command that runs dashboards through the Thrift service.

    Without ``--dashboard_id`` the id of every ``MDashBoard`` row is handed to
    a pool of worker processes; the single-id branch is elided in this snippet.
    """

    help = 'python manage.py --settings "xxx"'

    def add_arguments(self, parser):
        """Register the optional integer ``--dashboard_id`` argument."""
        parser.add_argument('--dashboard_id', type=int, help="ID")

    @staticmethod
    def _handle_with_thrift(dashboard_id):
        """Process a single dashboard id; runs inside a worker process.

        Errors are printed as a traceback instead of propagating to the
        caller, so the function always returns ``None``.
        """
        try:
            print(dashboard_id)
            with AnalysisThriftService() as thrift_server:
                dashboard_result = ...
        except Exception:
            # ``Exception`` rather than a bare ``except`` so that
            # KeyboardInterrupt / SystemExit can still stop the worker.
            import traceback
            traceback.print_exc()

    def handle(self, *args, **options):
        """Dispatch all dashboards to the pool when no id was given."""
        dashboard_id = options["dashboard_id"]
        if dashboard_id is None:
            dashboard_tables = [dashboard.id for dashboard in MDashBoard.objects.all()]
            # NOTE: where workers are started with the "spawn" method (e.g.
            # Windows) each worker re-imports this module, and the model
            # import at the top of the file then runs before Django's app
            # registry is loaded, raising AppRegistryNotReady in the worker.
            with ProcessPoolExecutor(max_workers=5) as executor:
                executor.map(Command._handle_with_thrift, dashboard_tables)

        else:
            ...

但我总是会遇到类似下面这样的错误:
Process Process-5:
Process Process-2:
Traceback (most recent call last):
  File "D:\python3\lib\multiprocessing\process.py", line 258, in _bootstrap
    self.run()
  File "D:\python3\lib\multiprocessing\process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "D:\python3\lib\concurrent\futures\process.py", line 169, in _process_worker
    call_item = call_queue.get(block=True)
  File "D:\python3\lib\multiprocessing\queues.py", line 113, in get
    return _ForkingPickler.loads(res)
  File "C:\Users\Domob\Desktop\dev\myapi\analysis\management\commands\dashboard_schedule_task.py", line 15, in <modu
le>
    from api.analysis.models import MDashBoard
  File "C:\Users\Domob\Desktop\dev\myapi\analysis\models.py", line 4, in <module>
    from utils.models import BasicModel, StaticCharField
  File "C:\Users\Domob\Desktop\dev\myapi\utils\models.py", line 9, in <module>
    class BasicModel(models.Model):
  File "C:\Users\Domob\Desktop\dev\venv_myapi\lib\site-packages\django\db\models\base.py", line 103, in __new__
    app_config = apps.get_containing_app_config(module)
  File "C:\Users\Domob\Desktop\dev\venv_myapi\lib\site-packages\django\apps\registry.py", line 252, in get_containing_ap
p_config
    self.check_apps_ready()
  File "C:\Users\Domob\Desktop\dev\venv_myapi\lib\site-packages\django\apps\registry.py", line 135, in check_apps_ready
    raise AppRegistryNotReady("Apps aren't loaded yet.")
django.core.exceptions.AppRegistryNotReady: Apps aren't loaded yet.

怎样才能得到预期的结果?

非常感谢。

您需要在子进程初始化函数中为子进程设置 Django。

def subprocess_setup():
    """Initialise Django inside a freshly started worker process.

    Meant to be passed as ``initializer=`` to ``ProcessPoolExecutor`` so that
    the app registry is loaded before the worker handles any task.
    """
    # Imported locally so the name ``django`` is bound even when the
    # surrounding module only imported django submodules.
    import django

    # NOTE(review): with the "spawn" start method the initializer is pickled
    # by reference, so the module defining this function is imported in the
    # worker before it runs -- confirm that module does not import models at
    # top level, or pass ``django.setup`` directly as the initializer.
    django.setup()
    # Could do other things here

# ...

with ProcessPoolExecutor(max_workers=5, initializer=subprocess_setup) as executor:
   

我遇到了同样的问题,但按照别处(原链接已丢失)的建议以不同的方式解决了它。

我必须显式地将多进程上下文(mp_context)传递给 ProcessPoolExecutor 才能正常工作。代码如下:

import multiprocessing

# Forked workers start as a copy of the parent process, so they inherit the
# Django state the parent has already loaded.
# 'fork' is only available on POSIX platforms; get_context() raises
# ValueError where the start method is unsupported (e.g. Windows).
fork_context = multiprocessing.get_context('fork')
with ProcessPoolExecutor(max_workers=5, mp_context=fork_context) as executor:
    ...