python3 异步等待回调
python3 asyncio await on callback
我正在尝试弄清楚如何结合使用 rospy actionlib 和 asyncio,
以异步方式等待动作(action)的结果。为此,我尝试
编写一个动作执行器的生成函数,但目前还没有成功。
我的想法是在动作的 done_callback
中为一个 asyncio future 设置结果,
但这个 future 似乎永远不会完成。
代码在这里:
def _generate_action_executor(self, action):
    """Build a coroutine function that sends a goal through ``action``.

    The returned ``run_action(goal)`` coroutine creates an asyncio future,
    hands ``action.send_goal`` a done-callback that stores the result on
    that future, and then awaits the future. If the coroutine is cancelled
    while waiting, ``action.cancel()`` is called and the cancellation is
    re-raised.

    NOTE(review): this is the variant from the question; the await below is
    reported to never complete. Presumably the done-callback is invoked from
    a thread other than the event loop's -- confirm.
    """
    async def run_action(goal):
        pending = asyncio.Future()

        def on_done(goal_status, result):
            print('Action Done: {}'.format(ActionLibGoalStatus(goal_status)))
            pending.set_result(result)

        action.send_goal(goal, on_done)
        try:
            # problem future never done
            return await pending
        except asyncio.CancelledError:
            # Forward the cancellation to the action before propagating it.
            action.cancel()
            raise

    return run_action
async def do_some_other_stuff(action):
    """Send a sample goal through the action executor and return its response."""
    # do stuff
    my_goal = MyActionRequest('just do it')
    # NOTE(review): `self` is not defined in this function as shown;
    # presumably the snippet was lifted from a method -- confirm.
    run_action = self._generate_action_executor(action)
    response = await run_action(my_goal)
    return response
if __name__ == "__main__":
    action = actionlib.SimpleActionClient('my_action',
                                          MyAction)
    # Obtain the loop before entering ``try`` so that ``finally`` never
    # refers to an unbound name if obtaining the loop itself fails.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(do_some_other_stuff(action))
    finally:
        loop.close()
请记住,asyncio 被设计为在单个线程中运行。
如果程序需要与其他线程交互,就必须使用以下专用函数之一:loop.call_soon_threadsafe 或 asyncio.run_coroutine_threadsafe。
下面是一个简化的例子:
async def run_with_non_asyncio_lib(action, arg):
    """Await a callback-based call into ``non_asyncio_lib``.

    Registers ``callback`` through ``non_asyncio_lib.register_callback`` and
    suspends until it is invoked. The positional arguments the callback
    received are then passed to ``process`` and its return value is returned.
    """
    future = asyncio.Future()
    loop = asyncio.get_event_loop()

    def callback(*args):
        # The callback may fire outside the loop's thread, so the result is
        # handed over via call_soon_threadsafe rather than set directly.
        loop.call_soon_threadsafe(future.set_result, args)

    non_asyncio_lib.register_callback(action, arg, callback)
    callback_args = await future
    return process(*callback_args)
或者,loop.run_in_executor 提供了另一种与非异步库交互的方式:它会在单独的线程中运行给定的函数:
async def run_with_non_asyncio_lib(action, arg):
    """Run ``non_asyncio_lib.run(action, arg)`` via ``loop.run_in_executor``.

    ``None`` is passed as the executor argument. The coroutine awaits the
    future returned by ``run_in_executor`` and returns its result.
    """
    event_loop = asyncio.get_event_loop()
    return await event_loop.run_in_executor(
        None, non_asyncio_lib.run, action, arg
    )
有了文森特的想法,
我确实找到了解决问题的方法:
def _generate_action_executor(action):
    """Return a coroutine function that runs a goal on ``action``.

    The returned ``run_action(goal)`` coroutine sends ``goal`` with a
    done-callback and awaits a loop future that the callback resolves with
    the action result. If the coroutine is cancelled while waiting,
    ``action.cancel()`` is called and the cancellation is re-raised.
    """
    async def run_action(goal):
        loop = asyncio.get_event_loop()
        action_done = loop.create_future()

        def _resolve(future, result):
            # Runs inside the event loop. Skip futures that were already
            # cancelled or resolved so set_result cannot raise.
            if not future.done():
                future.set_result(result)

        def done_callback(goal_status, result, future, loop):
            status = ActionLibGoalStatus(goal_status)
            print('Action Done: {}'.format(status))
            # Pass the callable and its arguments separately. Writing
            # future.set_result(result) here would execute it immediately in
            # the callback's thread and schedule its return value (None).
            loop.call_soon_threadsafe(_resolve, future, result)

        action.send_goal(
            goal, partial(done_callback, future=action_done, loop=loop)
        )
        try:
            await action_done
        except asyncio.CancelledError as exc:
            # Forward the cancellation to the action before propagating it.
            action.cancel()
            raise exc
        return action_done.result()

    return run_action
如果有人知道更巧妙的实现方式,请
与大家分享这些知识。
祝好,曼努埃尔
一个完整的测试代码(供需要的用户)
# Based on @user3851038's answer and
# the asyncio.wait_for() source implementation
import asyncio
# from asyncio import exceptions
from datetime import datetime
# wait a callback called
async def test_wait_callback():
    """Demonstrate waiting for a plain callback by awaiting a loop future.

    A future is created on the current loop and ``callback`` is handed to
    ``do_stuff_with_callback``. When the callback is invoked it records a
    return code and resolves the future; this coroutine awaits the future
    and then prints the recorded code.
    """
    loop = asyncio.get_event_loop()
    waiter = loop.create_future()
    ret_code = None

    def callback():
        nonlocal ret_code
        ret_code = 123
        print(datetime.now(), 'callback called:', ret_code)
        # Only resolve once; set_result on a finished future would raise.
        if not waiter.done():
            waiter.set_result(None)  # <--- tell done

    print(datetime.now(), 'do_stuff_with_callback call')
    await do_stuff_with_callback(callback)
    print(datetime.now(), 'do_stuff_with_callback return')
    try:
        await waiter  # <--- wait done
        print(datetime.now(), 'callback done :', ret_code)
    except asyncio.CancelledError:
        # Same class as asyncio.exceptions.CancelledError, but this spelling
        # also resolves on interpreters without the asyncio.exceptions module.
        pass
# run something in main loop (not current task), with callback
async def do_stuff_with_callback(cb):
    """Schedule a task on the event loop that calls ``cb`` after a sleep.

    The scheduled task sleeps for two seconds and then invokes ``cb`` if it
    is callable. This coroutine does not wait for that task; it returns
    ``None`` right after scheduling it.
    """
    async def _delayed_call():
        await asyncio.sleep(2)  # 2 sec
        if not callable(cb):
            return
        cb()

    asyncio.get_event_loop().create_task(_delayed_call())
if __name__ == '__main__':
    # Script entry point: drive the demo coroutine to completion.
    asyncio.run(test_wait_callback())
    # Explicit-loop alternative, kept for reference:
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(test_wait_callback())
输出:
2021-12-04 18:09:36.400958 do_stuff_with_callback call
2021-12-04 18:09:36.400996 do_stuff_with_callback return
2021-12-04 18:09:38.401790 callback called: 123
2021-12-04 18:09:38.401974 callback done : 123
我正在尝试弄清楚如何结合使用 rospy actionlib 和 asyncio,
以异步方式等待动作(action)的结果。为此,我尝试
编写一个动作执行器的生成函数,但目前还没有成功。
我的想法是在动作的 done_callback
中为一个 asyncio future 设置结果,
但这个 future 似乎永远不会完成。
代码在这里:
def _generate_action_executor(self, action):
    """Build a coroutine function that sends a goal through ``action``.

    The returned ``run_action(goal)`` coroutine creates an asyncio future,
    hands ``action.send_goal`` a done-callback that stores the result on
    that future, and then awaits the future. If the coroutine is cancelled
    while waiting, ``action.cancel()`` is called and the cancellation is
    re-raised.

    NOTE(review): this is the variant from the question; the await below is
    reported to never complete. Presumably the done-callback is invoked from
    a thread other than the event loop's -- confirm.
    """
    async def run_action(goal):
        pending = asyncio.Future()

        def on_done(goal_status, result):
            print('Action Done: {}'.format(ActionLibGoalStatus(goal_status)))
            pending.set_result(result)

        action.send_goal(goal, on_done)
        try:
            # problem future never done
            return await pending
        except asyncio.CancelledError:
            # Forward the cancellation to the action before propagating it.
            action.cancel()
            raise

    return run_action
async def do_some_other_stuff(action):
    """Send a sample goal through the action executor and return its response."""
    # do stuff
    my_goal = MyActionRequest('just do it')
    # NOTE(review): `self` is not defined in this function as shown;
    # presumably the snippet was lifted from a method -- confirm.
    run_action = self._generate_action_executor(action)
    response = await run_action(my_goal)
    return response
if __name__ == "__main__":
    action = actionlib.SimpleActionClient('my_action',
                                          MyAction)
    # Obtain the loop before entering ``try`` so that ``finally`` never
    # refers to an unbound name if obtaining the loop itself fails.
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(do_some_other_stuff(action))
    finally:
        loop.close()
请记住,asyncio 被设计为在单个线程中运行。
如果程序需要与其他线程交互,就必须使用以下专用函数之一:loop.call_soon_threadsafe 或 asyncio.run_coroutine_threadsafe。
下面是一个简化的例子:
async def run_with_non_asyncio_lib(action, arg):
    """Await a callback-based call into ``non_asyncio_lib``.

    Registers ``callback`` through ``non_asyncio_lib.register_callback`` and
    suspends until it is invoked. The positional arguments the callback
    received are then passed to ``process`` and its return value is returned.
    """
    future = asyncio.Future()
    loop = asyncio.get_event_loop()

    def callback(*args):
        # The callback may fire outside the loop's thread, so the result is
        # handed over via call_soon_threadsafe rather than set directly.
        loop.call_soon_threadsafe(future.set_result, args)

    non_asyncio_lib.register_callback(action, arg, callback)
    callback_args = await future
    return process(*callback_args)
或者,loop.run_in_executor 提供了另一种与非异步库交互的方式:它会在单独的线程中运行给定的函数:
async def run_with_non_asyncio_lib(action, arg):
    """Run ``non_asyncio_lib.run(action, arg)`` via ``loop.run_in_executor``.

    ``None`` is passed as the executor argument. The coroutine awaits the
    future returned by ``run_in_executor`` and returns its result.
    """
    event_loop = asyncio.get_event_loop()
    return await event_loop.run_in_executor(
        None, non_asyncio_lib.run, action, arg
    )
有了文森特的想法,
我确实找到了解决问题的方法:
def _generate_action_executor(action):
    """Return a coroutine function that runs a goal on ``action``.

    The returned ``run_action(goal)`` coroutine sends ``goal`` with a
    done-callback and awaits a loop future that the callback resolves with
    the action result. If the coroutine is cancelled while waiting,
    ``action.cancel()`` is called and the cancellation is re-raised.
    """
    async def run_action(goal):
        loop = asyncio.get_event_loop()
        action_done = loop.create_future()

        def _resolve(future, result):
            # Runs inside the event loop. Skip futures that were already
            # cancelled or resolved so set_result cannot raise.
            if not future.done():
                future.set_result(result)

        def done_callback(goal_status, result, future, loop):
            status = ActionLibGoalStatus(goal_status)
            print('Action Done: {}'.format(status))
            # Pass the callable and its arguments separately. Writing
            # future.set_result(result) here would execute it immediately in
            # the callback's thread and schedule its return value (None).
            loop.call_soon_threadsafe(_resolve, future, result)

        action.send_goal(
            goal, partial(done_callback, future=action_done, loop=loop)
        )
        try:
            await action_done
        except asyncio.CancelledError as exc:
            # Forward the cancellation to the action before propagating it.
            action.cancel()
            raise exc
        return action_done.result()

    return run_action
如果有人知道更巧妙的实现方式,请与大家分享这些知识。
祝好,曼努埃尔
一个完整的测试代码(供需要的用户)
# Based on @user3851038's answer and
# the asyncio.wait_for() source implementation
import asyncio
# from asyncio import exceptions
from datetime import datetime
# wait a callback called
async def test_wait_callback():
    """Demonstrate waiting for a plain callback by awaiting a loop future.

    A future is created on the current loop and ``callback`` is handed to
    ``do_stuff_with_callback``. When the callback is invoked it records a
    return code and resolves the future; this coroutine awaits the future
    and then prints the recorded code.
    """
    loop = asyncio.get_event_loop()
    waiter = loop.create_future()
    ret_code = None

    def callback():
        nonlocal ret_code
        ret_code = 123
        print(datetime.now(), 'callback called:', ret_code)
        # Only resolve once; set_result on a finished future would raise.
        if not waiter.done():
            waiter.set_result(None)  # <--- tell done

    print(datetime.now(), 'do_stuff_with_callback call')
    await do_stuff_with_callback(callback)
    print(datetime.now(), 'do_stuff_with_callback return')
    try:
        await waiter  # <--- wait done
        print(datetime.now(), 'callback done :', ret_code)
    except asyncio.CancelledError:
        # Same class as asyncio.exceptions.CancelledError, but this spelling
        # also resolves on interpreters without the asyncio.exceptions module.
        pass
# run something in main loop (not current task), with callback
async def do_stuff_with_callback(cb):
    """Schedule a task on the event loop that calls ``cb`` after a sleep.

    The scheduled task sleeps for two seconds and then invokes ``cb`` if it
    is callable. This coroutine does not wait for that task; it returns
    ``None`` right after scheduling it.
    """
    async def _delayed_call():
        await asyncio.sleep(2)  # 2 sec
        if not callable(cb):
            return
        cb()

    asyncio.get_event_loop().create_task(_delayed_call())
if __name__ == '__main__':
    # Script entry point: drive the demo coroutine to completion.
    asyncio.run(test_wait_callback())
    # Explicit-loop alternative, kept for reference:
    # loop = asyncio.get_event_loop()
    # loop.run_until_complete(test_wait_callback())
输出:
2021-12-04 18:09:36.400958 do_stuff_with_callback call
2021-12-04 18:09:36.400996 do_stuff_with_callback return
2021-12-04 18:09:38.401790 callback called: 123
2021-12-04 18:09:38.401974 callback done : 123