如何在异步函数中使用多进程? returns 我认为 freeze_support() 被省略了

How to use multiprocess inside an asynchronous function? It returns me that freeze_support() has been omitted

我有一个程序使用了很多对其一般操作很重要的异步函数,但我需要其中一个函数同时执行 2 个进程。 为此,我决定使用多进程,创建并启动进程 p1 和进程 p2。 p1 将是一种计时 8 秒的计时器,这将是进程 p2 必须完成您的任务的时间限制,但是如果 p2 在 8 秒之前未完成其任务(即 p1 在 p2 之前完成)然后两个进程都关闭,程序继续。

这里p2是一个对字符串进行运算的进程,对名为text的变量的内容进行编码,完成后返回给主程序。另一方面,p1是一个进程,其目标是限制进程p2必须执行字符串操作并将结果加载到文本变量中的时间。

这个算法的objective是为了避免可能的情况,即如果用户发送了超长的操作(或者在可能的情况下系统已经饱和以在合理的时间内处理该信息)超过 8 秒,然后 p2 将停止执行它们并且变量文本的内容不会改变。

这是我的简化代码,我尝试创建 2 个线程(称为 p1 和 p2),每个线程执行不同的函数,但两个函数都在同一个 class 中,我称之为 class 来自异步函数:

import asyncio, multiprocessing, time

#class BotBrain(BotModule):
class BotBrain:
    # real  __init__()
    '''
    def __init__(self, chatbot: object) -> None:
        self.chatbot = chatbot
        self.max_learn_range = 4
        corpus_name = self.chatbot.config["CorpusName"]   
        self.train(corpus_name)
    '''
    # test  __init__()
    def __init__(self):
        self.finish_state = multiprocessing.Event()


    def operation_process(self, input_text, text):
        #text = name_identificator(input_text, text)
        text = "aaa" #for this example I change the info directly without detailing the nlp process
        return input_text, text

    def finish_process(self, finish_state):
        time.sleep(8)
        print("the process has been interrupted!")
        self.finish_state.set()

    def process_control(self, finish_state, input_text, text):
        p1 = multiprocessing.Process(target=self.finish_process, args=(finish_state,))
        p1.start()
        p2 = multiprocessing.Process(target=self.operation_process, args=(input_text, text,))
        p2.start()

        #close all unfinished processes, p1 or p2
        finish_state.wait()
        for process in [p1, p2]:
            process.terminate()

    def process(self, **kwargs) -> str:
        input_text, text = "Hello, how are you?", ""
        text = self.process_control(self.finish_state, input_text, text) #for multiprocessing code



async def main_call():
    testInstance = BotBrain() #here execute the instructions of the __init__() function
    testInstance.process()

asyncio.run(main_call())

但它给我这些错误,我不知道如何解决它们:

  File "J:\async_and_multiprocess.py", line 46, in process_control
    p1.start()
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\context.py", line 327, in _Popen
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\context.py", line 327, in _Popen
    return Popen(process_obj)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 45, in __init__
    return Popen(process_obj)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\popen_spawn_win32.py", line 45, in __init__
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\spawn.py", line 154, in get_preparation_data
    prep_data = spawn.get_preparation_data(process_obj._name)
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\spawn.py", line 154, in get_preparation_data
    _check_not_importing_main()
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.
    _check_not_importing_main()
  File "C:\Users\MIPC\anaconda3\lib\multiprocessing\spawn.py", line 134, in _check_not_importing_main
    raise RuntimeError('''
RuntimeError:
        An attempt has been made to start a new process before the
        current process has finished its bootstrapping phase.

        This probably means that you are not using fork to start your
        child processes and you have forgotten to use the proper idiom
        in the main module:

            if __name__ == '__main__':
                freeze_support()
                ...

        The "freeze_support()" line can be omitted if the program
        is not going to be frozen to produce an executable.

freeze_support()是什么意思?我应该把它放在 class 的什么地方?

这是我的程序应该如何工作的流程图,虽然它没有考虑 freeze_support() 因为我不知道你的意思。

多处理可能很棘手,部分原因在于 Python 的导入系统。来自多处理模块的programming guidelines

Make sure that the main module can be safely imported by a new Python interpreter without causing unintended side effects (such a starting a new process).

这就是这里发生的事情。导入主模块会执行 asyncio.run(main_call()),这又会产生另一个子进程,依此类推。

您需要防止您的 asyncio.run 在导入时被执行:

if __name__ == "__main__":
    asyncio.run(main_call())

如果您不想生成冻结的二进制文件(例如使用 pyinstaller),您可以忽略带有 freeze_support().

的部分