Python3 asyncio - add_done_callback 的回调不更新服务器 class 中的自身变量

Python3 asyncio - callback for add_done_callback do not updates self variable in server class

我有两个服务器,使用 asyncio.start_server 创建: asyncio.start_server(self.handle_connection, host = host, port = port) 和 运行 在一个循环中:

loop.run_until_complete(asyncio.gather(server1, server2))
loop.run_forever()

我正在使用 asyncio.Queue 在服务器之间进行通信。来自服务器 2 的消息,通过 queue.put(msg) 添加,由服务器 1 中的 queue.get() 成功接收。我是 运行 queue.get() by asyncio.ensure_future 并用作回调 add_done_callback 来自 Server1 的方法:

def callback(self, future):
    msg = future.result()
    self.msg = msg

但是这个 callback 没有按预期工作 - self.msg 不更新。我做错了什么?

已更新 使用附加代码显示最大完整示例:

class Queue(object):

    def __init__(self, loop, maxsize: int):
        self.instance = asyncio.Queue(loop = loop, maxsize = maxsize)

    async def put(self, data):
        await self.instance.put(data)

    async def get(self):
        data = await self.instance.get()
        self.instance.task_done()
        return data

    @staticmethod
    def get_instance():
        return Queue(loop = asyncio.get_event_loop(), maxsize = 10)

服务器class:

    class BaseServer(object):

        def __init__(self, host, port):
            self.instance = asyncio.start_server(self.handle_connection, host = host, port = port)

        async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
            pass

        def get_instance(self):
            return self.instance

        @staticmethod
        def create():
            return BaseServer(None, None)

接下来我是 运行 服务器:

loop.run_until_complete(asyncio.gather(server1.get_instance(), server2.get_instance()))
loop.run_forever()

我在 server2 的 handle_connection 调用 queue.put(msg),在 server1 的 handle_connection 我注册 queue.get() 为任务:

 task_queue = asyncio.ensure_future(queue.get())
 task_queue.add_done_callback(self.process_queue)

服务器1的process_queue方法:

    def process_queue(self, future):
        msg = future.result()
        self.msg = msg

server1的handle_connection方法:

 async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
     task_queue = asyncio.ensure_future(queue.get())
     task_queue.add_done_callback(self.process_queue)

     while self.msg != SPECIAL_VALUE:
        # doing something

尽管 task_queue 完成,self.process_queue 调用,self.msg 永远不会更新。

基本上你使用的是异步结构,我觉得你可以直接等待结果:

async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
    msg = await queue.get()
    process_queue(msg)  # change it to accept real value instead of a future.
    # do something