使用 python-asyncio 创建对绑定方法的临时异步计时器回调
Creating a temporary async timer callback to a bound method with python-asyncio
我正在尝试使用 asyncio
的事件循环为绑定的 async
方法创建一种计时器回调。现在的问题是绑定的异步方法不应该持有对实例的强引用,否则后者将永远不会被删除。计时器回调应该只与父实例一样长。
我找到了解决方案,但我认为它不是很好:
import asyncio
import functools
import weakref
class ClassWithTimer:
def __init__(self):
asyncio.ensure_future(
functools.partial(
ClassWithTimer.update, weakref.ref(self)
)()
)
def __del__(self):
print("deleted ClassWithTimer!")
async def update(self):
while True:
await asyncio.sleep(1)
if self() is None: break
print("IN update of object " + repr(self()))
async def run():
foo = ClassWithTimer()
await asyncio.sleep(5)
del foo
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
是否有更好、更 pythonic 的方法来做到这一点?计时器回调确实需要异步。
如果没有 asyncio
,weakref.WeakMethod
可能是可行的方法。但是 asyncio.ensure_future
需要一个协程对象,所以在这种情况下它不会起作用。
抱歉,我不确定我是否正确理解了你的问题。这是您正在寻找的解决方案吗?
import asyncio
class ClassWithTimer:
async def __aenter__(self):
self.task = asyncio.ensure_future(self.update())
async def __aexit__(self, *args):
try:
self.task.cancel() # Just cancel updating when we don't need it.
await self.task
except asyncio.CancelledError: # Ignore CancelledError rised by cancelled task.
pass
del self # I think you don't need this in real life: instance should be normally deleted by GC.
def __del__(self):
print("deleted ClassWithTimer!")
async def update(self):
while True:
await asyncio.sleep(1)
print("IN update of object " + repr(self))
async def run():
async with ClassWithTimer(): # Use context manager to handle when we need updating.
await asyncio.sleep(5)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
输出:
IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208>
IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208>
IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208>
IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208>
deleted ClassWithTimer!
[Finished in 5.2s]
没有上下文管理器的另一种方法:
import asyncio
class ClassWithTimer:
def __init__(self):
self.task = asyncio.ensure_future(self.update())
async def release(self):
try:
self.task.cancel()
await self.task
except asyncio.CancelledError:
pass
del self
def __del__(self):
print("deleted ClassWithTimer!")
async def update(self):
while True:
await asyncio.sleep(1)
print("IN update of object " + repr(self))
async def run():
foo = ClassWithTimer()
await asyncio.sleep(5)
await foo.release()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
几个月前我遇到了几乎相同的问题,我写了一个装饰器来解决它:
def weakmethod(f):
@property
def get(self):
return f.__get__(weakref.proxy(self))
# self = weakref.proxy(self)
# if hasattr(f, '__get__'):
# raise RuntimeWarning(
# 'weakref may not work unless you implement '
# 'the property protocol carefully by youself!'
# )
# return f.__get__(self)
# if asyncio.iscoroutinefunction(f):
# #Make the returned method a coroutine function, optional
# async def g(*arg, **kwarg):
# return await f(self, *arg, **kwarg)
# else:
# def g(*arg, **kwarg):
# return f(self, *arg, **kwarg)
# return g
# #Still some situations not taken into account?
return get
然后您的代码可以很自然地重写:
class ClassWithTimer:
def __init__(self):
asyncio.ensure_future(self.update())
def __del__(self):
print("deleted ClassWithTimer!")
@weakmethod
async def update(self):
while True:
await asyncio.sleep(1)
print("IN update of object ", self)
重要:
weakref.proxy
并不妨碍你获取强引用。此外,不保证其行为与原始对象完全相同。
我的实现没有涵盖所有可能性。
Python 3.5 很好地处理循环引用(参见 PEP 442 https://www.python.org/dev/peps/pep-0442/)
我会尽快将超时异步上下文管理器推送到 asyncio 中,请参阅草案:
`
import asyncio
import warnings
class Timeout:
def __init__(self, timeout, *, raise_error=False, loop=None):
self._timeout = timeout
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._raise_error = raise_error
self._task = None
self._cancelled = False
self._cancel_handler = None
async def __aenter__(self):
self._task = asyncio.Task.current_task(loop=loop)
self._cancel_handler = self._loop.call_later(
self._cancel, self._timeout)
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._cancelled:
if self._raise_error:
raise asyncio.TimeoutError
else:
# suppress
return True
else:
self._cancel_handler.cancel()
# raise all other errors
def __del__(self):
if self._task:
# just for preventing improper usage
warnings.warn("Use async with")
def _cancel(self):
self._cancelled = self._task.cancel()
async def long_running_task():
while True:
asyncio.sleep(5)
async def run():
async with Timeout(1):
await long_running_task()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
根据Huazuo Gao和germn的回答,我实现了ensure_weakly_binding_future
,与ensure_future
基本相同,但没有保持对绑定方法实例的强引用。它不会修改整体绑定(就像基于装饰器的解决方案那样)并在删除父实例时正确取消未来:
import asyncio
import weakref
def ensure_weakly_binding_future(method):
class Canceller:
def __call__(self, proxy):
self.future.cancel()
canceller = Canceller()
proxy_object = weakref.proxy(method.__self__, canceller)
weakly_bound_method = method.__func__.__get__(proxy_object)
future = asyncio.ensure_future(weakly_bound_method())
canceller.future = future
class ClassWithTimer:
def __init__(self):
ensure_weakly_binding_future(self.update)
def __del__(self):
print("deleted ClassWithTimer!", flush=True)
async def update(self):
while True:
await asyncio.sleep(1)
print("IN update of object " + repr(self), flush=True)
async def run():
foo = ClassWithTimer()
await asyncio.sleep(5.5)
del foo
await asyncio.sleep(2.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
我正在尝试使用 asyncio
的事件循环为绑定的 async
方法创建一种计时器回调。现在的问题是绑定的异步方法不应该持有对实例的强引用,否则后者将永远不会被删除。计时器回调应该只与父实例一样长。
我找到了解决方案,但我认为它不是很好:
import asyncio
import functools
import weakref
class ClassWithTimer:
def __init__(self):
asyncio.ensure_future(
functools.partial(
ClassWithTimer.update, weakref.ref(self)
)()
)
def __del__(self):
print("deleted ClassWithTimer!")
async def update(self):
while True:
await asyncio.sleep(1)
if self() is None: break
print("IN update of object " + repr(self()))
async def run():
foo = ClassWithTimer()
await asyncio.sleep(5)
del foo
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
是否有更好、更 pythonic 的方法来做到这一点?计时器回调确实需要异步。
如果没有 asyncio
,weakref.WeakMethod
可能是可行的方法。但是 asyncio.ensure_future
需要一个协程对象,所以在这种情况下它不会起作用。
抱歉,我不确定我是否正确理解了你的问题。这是您正在寻找的解决方案吗?
import asyncio
class ClassWithTimer:
async def __aenter__(self):
self.task = asyncio.ensure_future(self.update())
async def __aexit__(self, *args):
try:
self.task.cancel() # Just cancel updating when we don't need it.
await self.task
except asyncio.CancelledError: # Ignore CancelledError rised by cancelled task.
pass
del self # I think you don't need this in real life: instance should be normally deleted by GC.
def __del__(self):
print("deleted ClassWithTimer!")
async def update(self):
while True:
await asyncio.sleep(1)
print("IN update of object " + repr(self))
async def run():
async with ClassWithTimer(): # Use context manager to handle when we need updating.
await asyncio.sleep(5)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
输出:
IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208>
IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208>
IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208>
IN update of object <__main__.ClassWithTimer object at 0x000000468D0FB208>
deleted ClassWithTimer!
[Finished in 5.2s]
没有上下文管理器的另一种方法:
import asyncio
class ClassWithTimer:
def __init__(self):
self.task = asyncio.ensure_future(self.update())
async def release(self):
try:
self.task.cancel()
await self.task
except asyncio.CancelledError:
pass
del self
def __del__(self):
print("deleted ClassWithTimer!")
async def update(self):
while True:
await asyncio.sleep(1)
print("IN update of object " + repr(self))
async def run():
foo = ClassWithTimer()
await asyncio.sleep(5)
await foo.release()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
几个月前我遇到了几乎相同的问题,我写了一个装饰器来解决它:
def weakmethod(f):
@property
def get(self):
return f.__get__(weakref.proxy(self))
# self = weakref.proxy(self)
# if hasattr(f, '__get__'):
# raise RuntimeWarning(
# 'weakref may not work unless you implement '
# 'the property protocol carefully by youself!'
# )
# return f.__get__(self)
# if asyncio.iscoroutinefunction(f):
# #Make the returned method a coroutine function, optional
# async def g(*arg, **kwarg):
# return await f(self, *arg, **kwarg)
# else:
# def g(*arg, **kwarg):
# return f(self, *arg, **kwarg)
# return g
# #Still some situations not taken into account?
return get
然后您的代码可以很自然地重写:
class ClassWithTimer:
def __init__(self):
asyncio.ensure_future(self.update())
def __del__(self):
print("deleted ClassWithTimer!")
@weakmethod
async def update(self):
while True:
await asyncio.sleep(1)
print("IN update of object ", self)
重要:
weakref.proxy
并不妨碍你获取强引用。此外,不保证其行为与原始对象完全相同。我的实现没有涵盖所有可能性。
Python 3.5 很好地处理循环引用(参见 PEP 442 https://www.python.org/dev/peps/pep-0442/)
我会尽快将超时异步上下文管理器推送到 asyncio 中,请参阅草案:
`
import asyncio
import warnings
class Timeout:
def __init__(self, timeout, *, raise_error=False, loop=None):
self._timeout = timeout
if loop is None:
loop = asyncio.get_event_loop()
self._loop = loop
self._raise_error = raise_error
self._task = None
self._cancelled = False
self._cancel_handler = None
async def __aenter__(self):
self._task = asyncio.Task.current_task(loop=loop)
self._cancel_handler = self._loop.call_later(
self._cancel, self._timeout)
async def __aexit__(self, exc_type, exc_val, exc_tb):
if self._cancelled:
if self._raise_error:
raise asyncio.TimeoutError
else:
# suppress
return True
else:
self._cancel_handler.cancel()
# raise all other errors
def __del__(self):
if self._task:
# just for preventing improper usage
warnings.warn("Use async with")
def _cancel(self):
self._cancelled = self._task.cancel()
async def long_running_task():
while True:
asyncio.sleep(5)
async def run():
async with Timeout(1):
await long_running_task()
loop = asyncio.get_event_loop()
loop.run_until_complete(run())
根据Huazuo Gao和germn的回答,我实现了ensure_weakly_binding_future
,与ensure_future
基本相同,但没有保持对绑定方法实例的强引用。它不会修改整体绑定(就像基于装饰器的解决方案那样)并在删除父实例时正确取消未来:
import asyncio
import weakref
def ensure_weakly_binding_future(method):
class Canceller:
def __call__(self, proxy):
self.future.cancel()
canceller = Canceller()
proxy_object = weakref.proxy(method.__self__, canceller)
weakly_bound_method = method.__func__.__get__(proxy_object)
future = asyncio.ensure_future(weakly_bound_method())
canceller.future = future
class ClassWithTimer:
def __init__(self):
ensure_weakly_binding_future(self.update)
def __del__(self):
print("deleted ClassWithTimer!", flush=True)
async def update(self):
while True:
await asyncio.sleep(1)
print("IN update of object " + repr(self), flush=True)
async def run():
foo = ClassWithTimer()
await asyncio.sleep(5.5)
del foo
await asyncio.sleep(2.5)
loop = asyncio.get_event_loop()
loop.run_until_complete(run())