Python3 asyncio - add_done_callback 的回调不更新服务器 class 中的自身变量
Python3 asyncio - callback for add_done_callback do not updates self variable in server class
我有两个服务器,使用 asyncio.start_server 创建:
asyncio.start_server(self.handle_connection, host = host, port = port)
和 运行 在一个循环中:
loop.run_until_complete(asyncio.gather(server1, server2))
loop.run_forever()
我正在使用 asyncio.Queue 在服务器之间进行通信。来自服务器 2 的消息,通过 queue.put(msg)
添加,由服务器 1 中的 queue.get()
成功接收。我是 运行 queue.get()
by asyncio.ensure_future
并用作回调
add_done_callback
来自 Server1 的方法:
def callback(self, future):
msg = future.result()
self.msg = msg
但是这个 callback
没有按预期工作 - self.msg 不更新。我做错了什么?
已更新
使用附加代码显示最大完整示例:
class Queue(object):
def __init__(self, loop, maxsize: int):
self.instance = asyncio.Queue(loop = loop, maxsize = maxsize)
async def put(self, data):
await self.instance.put(data)
async def get(self):
data = await self.instance.get()
self.instance.task_done()
return data
@staticmethod
def get_instance():
return Queue(loop = asyncio.get_event_loop(), maxsize = 10)
服务器class:
class BaseServer(object):
def __init__(self, host, port):
self.instance = asyncio.start_server(self.handle_connection, host = host, port = port)
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
pass
def get_instance(self):
return self.instance
@staticmethod
def create():
return BaseServer(None, None)
接下来我是 运行 服务器:
loop.run_until_complete(asyncio.gather(server1.get_instance(), server2.get_instance()))
loop.run_forever()
我在 server2 的 handle_connection
调用 queue.put(msg)
,在 server1 的 handle_connection
我注册 queue.get()
为任务:
task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)
服务器1的process_queue
方法:
def process_queue(self, future):
msg = future.result()
self.msg = msg
server1的handle_connection
方法:
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)
while self.msg != SPECIAL_VALUE:
# doing something
尽管 task_queue
完成,self.process_queue
调用,self.msg
永远不会更新。
基本上你使用的是异步结构,我觉得你可以直接等待结果:
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
msg = await queue.get()
process_queue(msg) # change it to accept real value instead of a future.
# do something
我有两个服务器,使用 asyncio.start_server 创建:
asyncio.start_server(self.handle_connection, host = host, port = port)
和 运行 在一个循环中:
loop.run_until_complete(asyncio.gather(server1, server2))
loop.run_forever()
我正在使用 asyncio.Queue 在服务器之间进行通信。来自服务器 2 的消息,通过 queue.put(msg)
添加,由服务器 1 中的 queue.get()
成功接收。我是 运行 queue.get()
by asyncio.ensure_future
并用作回调
add_done_callback
来自 Server1 的方法:
def callback(self, future):
msg = future.result()
self.msg = msg
但是这个 callback
没有按预期工作 - self.msg 不更新。我做错了什么?
已更新 使用附加代码显示最大完整示例:
class Queue(object):
def __init__(self, loop, maxsize: int):
self.instance = asyncio.Queue(loop = loop, maxsize = maxsize)
async def put(self, data):
await self.instance.put(data)
async def get(self):
data = await self.instance.get()
self.instance.task_done()
return data
@staticmethod
def get_instance():
return Queue(loop = asyncio.get_event_loop(), maxsize = 10)
服务器class:
class BaseServer(object):
def __init__(self, host, port):
self.instance = asyncio.start_server(self.handle_connection, host = host, port = port)
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
pass
def get_instance(self):
return self.instance
@staticmethod
def create():
return BaseServer(None, None)
接下来我是 运行 服务器:
loop.run_until_complete(asyncio.gather(server1.get_instance(), server2.get_instance()))
loop.run_forever()
我在 server2 的 handle_connection
调用 queue.put(msg)
,在 server1 的 handle_connection
我注册 queue.get()
为任务:
task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)
服务器1的process_queue
方法:
def process_queue(self, future):
msg = future.result()
self.msg = msg
server1的handle_connection
方法:
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
task_queue = asyncio.ensure_future(queue.get())
task_queue.add_done_callback(self.process_queue)
while self.msg != SPECIAL_VALUE:
# doing something
尽管 task_queue
完成,self.process_queue
调用,self.msg
永远不会更新。
基本上你使用的是异步结构,我觉得你可以直接等待结果:
async def handle_connection(self, reader: StreamReader, writer: StreamWriter):
msg = await queue.get()
process_queue(msg) # change it to accept real value instead of a future.
# do something