运行 主异步应用程序中不同进程中的多个异步循环

Run multiple async loops in separate processes within a main async app

好吧,这有点令人费解,但我有一个带有很多异步代码的异步 class。

我希望在 class 中并行化一个任务,我想为 运行 一个阻塞任务生成多个进程,并且在每个进程中我想创建一个 asyncio循环处理各种子任务。

所以我没能用 ThreadPollExecutor 做到这一点,但是当我尝试使用 ProcessPoolExecutor 时,我得到一个 Can't pickle local object error.

这是我的代码的简化版本,运行使用 ThreadPoolExecutor。这如何与 ProcessPoolExecutor 并行化?

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor


class MyClass:
    def __init__(self) -> None:
        self.event_loop = None
        # self.pool_executor = ProcessPoolExecutor(max_workers=8)
        self.pool_executor = ThreadPoolExecutor(max_workers=8)
        self.words = ["one", "two", "three", "four", "five"]
        self.multiplier = int(2)

async def subtask(self, letter: str):
    await asyncio.sleep(1)
    return letter * self.multiplier

async def task_gatherer(self, subtasks: list):
    return await asyncio.gather(*subtasks)

def blocking_task(self, word: str):
    time.sleep(1)
    subtasks = [self.subtask(letter) for letter in word]
    result = asyncio.run(self.task_gatherer(subtasks))
    return result

async def master_method(self):
    self.event_loop = asyncio.get_running_loop()
    master_tasks = [
        self.event_loop.run_in_executor(
            self.pool_executor,
            self.blocking_task,
            word,
        )
        for word in self.words
    ]

    results = await asyncio.gather(*master_tasks)
    print(results)


if __name__ == "__main__":
    my_class = MyClass()
    asyncio.run(my_class.master_method())

这是个很好的问题。问题和解决方案都很有趣。

问题

多线程和多处理之间的一个区别是内存的处理方式。线程 共享 内存 space。进程没有(一般情况下,见下文)。

对象仅通过引用传递给 ThreadPoolExecutor。无需创建新对象。

但是 ProcessPoolExecutor 存在于单独的内存中 space。为了将对象传递给它,实现对对象进行 pickle 并在另一侧再次对它们进行 unpickle。这个细节通常很重要。

仔细查看原问题中 blocking_task 的参数。我不是说 word - 我是说第一个参数:self。永远在那里的那个。我们已经看过一百万次了,却几乎没有想过它。要执行函数 blocking_task,名为“self”的参数需要一个值。要 运行 ProcessPoolExecutor 中的此函数,必须对“self”进行 pickled 和 unpickled。现在看看“self”的一些成员对象:有一个事件循环,还有执行者本身。两者都不可腌制。就是这个问题。

我们无法 运行 在另一个进程中按原样运行。

不可否认,回溯消息“无法腌制本地对象”还有很多不足之处。文档也是如此。但实际上,该程序使用 ThreadPool 而不是 ProcessPool 是完全有道理的。

注意:存在在进程之间共享 ctypes 对象的机制。但是,据我所知,没有办法直接共享 Python 对象。这就是使用 pickle/unpickle 机制的原因。

解决方案

重构 MyClass 以将数据与多处理框架分开。我创建了第二个 class,MyTask,它可以被 pickled 和 unpickled。我将一些函数从 MyClass 移到了其中。原始列表中没有任何重要内容被修改 - 只是重新排列。

脚本 运行 使用 ProcessPoolExecutor 和 ThreadPoolExecutor 成功。

import asyncio
import time
# from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import ProcessPoolExecutor

# Refactored MyClass to break out MyTask

class MyTask:
    def __init__(self):
        self.multiplier = 2

    async def subtask(self, letter: str):
        await asyncio.sleep(1)
        return letter * self.multiplier
    
    async def task_gatherer(self, subtasks: list):
        return await asyncio.gather(*subtasks)
    
    def blocking_task(self, word: str):
        time.sleep(1)
        subtasks = [self.subtask(letter) for letter in word]
        result = asyncio.run(self.task_gatherer(subtasks))
        return result
    
class MyClass:
    def __init__(self):
        self.task = MyTask()
        self.event_loop: asyncio.AbstractEventLoop = None
        self.pool_executor = ProcessPoolExecutor(max_workers=8)
        # self.pool_executor = ThreadPoolExecutor(max_workers=8)
        self.words = ["one", "two", "three", "four", "five"]

    async def master_method(self):
        self.event_loop = asyncio.get_running_loop()
        master_tasks = [
            self.event_loop.run_in_executor(
                self.pool_executor,
                self.task.blocking_task,
                word,
            )
            for word in self.words
        ]
    
        results = await asyncio.gather(*master_tasks)
        print(results)

if __name__ == "__main__":
    my_class = MyClass()
    asyncio.run(my_class.master_method())