如何编写可选择充当常规函数的 asyncio 协程?
How can I write asyncio coroutines that optionally act as regular functions?
我正在编写一个库,我希望最终用户能够有选择地使用它,就好像它的方法和函数不是协程一样。
例如,给出这个函数:
# NOTE: ``asyncio.coroutine`` is the generator-based (pre ``async def``) style;
# it was removed in Python 3.11, where ``async def`` / ``await`` replace it.
@asyncio.coroutine
def blah_getter():
    """Fetch the example URL through ``http_client`` and return the result."""
    return (yield from http_client.get('http://blahblahblah'))
不想在自己的代码中使用任何异步功能的最终用户,仍然需要导入 asyncio 并运行这个:
>>> response = asyncio.get_event_loop().run_until_complete(blah_getter())
如果我可以在 blah_getter 内部确定自己是否正作为协程被调用,并做出相应的反应,那就太好了。
所以像这样:
@asyncio.coroutine
def blah_getter():
    """Wished-for dual-mode getter: coroutine when yielded from, blocking otherwise.

    ``magically_determine_if_being_yielded_from`` is a hypothetical helper
    standing for the detection the question asks about.
    """
    if magically_determine_if_being_yielded_from():
        return (yield from http_client.get('http://blahblahblah'))
    else:
        el = asyncio.get_event_loop()
        return el.run_until_complete(http_client.get('http://blahblahblah'))
您需要两个函数 -- 异步协程和同步常规函数:
@asyncio.coroutine
def async_getter():
    """Coroutine variant: fetch the example URL via ``http_client``."""
    return (yield from http_client.get('http://example.com'))


def sync_getter():
    """Blocking variant: run ``async_getter`` on the event loop to completion."""
    return asyncio.get_event_loop().run_until_complete(async_getter())
magically_determine_if_being_yielded_from() 实际上就是 event_loop.is_running(),但我强烈不建议在同一个函数中混合使用同步和异步代码。
我同意 Andrew 的回答,我只是想补充一点,如果您处理的是对象而不是顶级函数,则可以使用元类自动添加异步方法的同步版本。看这个例子:
import asyncio
import aiohttp
class SyncAdder(type):
    """A metaclass which adds synchronous versions of coroutines.

    This metaclass finds all coroutine functions defined on a class
    and adds a synchronous version with a '_s' suffix appended to the
    original function name.
    """

    def __new__(cls, clsname, bases, dct, **kwargs):
        """Create the class after adding a ``<name>_s`` twin per coroutine.

        Extra class keyword arguments are accepted but are not forwarded
        to ``type.__new__``.
        """
        new_dct = {}
        for name, val in dct.items():
            # Make a sync version of all coroutine functions
            if asyncio.iscoroutinefunction(val):
                meth = cls.sync_maker(name)
                syncname = '{}_s'.format(name)
                meth.__name__ = syncname
                meth.__qualname__ = '{}.{}'.format(clsname, syncname)
                new_dct[syncname] = meth
        # Merged after the loop so the namespace is not resized mid-iteration.
        dct.update(new_dct)
        return super().__new__(cls, clsname, bases, dct)

    @staticmethod
    def sync_maker(func):
        """Return a blocking method that drives the coroutine method named *func*.

        *func* is the attribute name, not the function object: the method is
        looked up on the instance at call time.
        """
        def sync_func(self, *args, **kwargs):
            meth = getattr(self, func)
            return asyncio.get_event_loop().run_until_complete(meth(*args, **kwargs))
        return sync_func
class Stuff(metaclass=SyncAdder):
    """Example class using the ``SyncAdder`` metaclass."""

    @asyncio.coroutine
    def getter(self, url):
        """Coroutine: issue a GET request for *url* via ``aiohttp.request``."""
        return (yield from aiohttp.request('GET', url))
用法:
>>> import aio, asyncio
>>> aio.Stuff.getter_s
<function Stuff.getter_s at 0x7f90459c2bf8>
>>> aio.Stuff.getter
<function Stuff.getter at 0x7f90459c2b70>
>>> s = aio.Stuff()
>>> s.getter_s('http://example.com')
<ClientResponse(http://example.com) [200 OK]>
<CIMultiDictProxy {'ACCEPT-RANGES': 'bytes', 'CACHE-CONTROL': 'max-age=604800', 'DATE': 'Mon, 11 May 2015 15:13:21 GMT', 'ETAG': '"359670651"', 'EXPIRES': 'Mon, 18 May 2015 15:13:21 GMT', 'SERVER': 'ECS (ewr/15BD)', 'X-CACHE': 'HIT', 'X-EC-CUSTOM-ERROR': '1', 'CONTENT-LENGTH': '1270', 'CONTENT-TYPE': 'text/html', 'LAST-MODIFIED': 'Fri, 09 Aug 2013 23:54:35 GMT', 'VIA': '1.1 xyz.com:80', 'CONNECTION': 'keep-alive'}>
>>> asyncio.get_event_loop().run_until_complete(s.getter('http://example.com'))
<ClientResponse(http://example.com) [200 OK]>
<CIMultiDictProxy {'ACCEPT-RANGES': 'bytes', 'CACHE-CONTROL': 'max-age=604800', 'DATE': 'Mon, 11 May 2015 15:25:09 GMT', 'ETAG': '"359670651"', 'EXPIRES': 'Mon, 18 May 2015 15:25:09 GMT', 'SERVER': 'ECS (ewr/15BD)', 'X-CACHE': 'HIT', 'X-EC-CUSTOM-ERROR': '1', 'CONTENT-LENGTH': '1270', 'CONTENT-TYPE': 'text/html', 'LAST-MODIFIED': 'Fri, 09 Aug 2013 23:54:35 GMT', 'VIA': '1.1 xys.com:80', 'CONNECTION': 'keep-alive'}>
您还可以创建一个简单的装饰器,使您的函数同步。这种方法可以应用于全局函数和方法。
一个例子。
# the decorator
def sync(f):
    """Let coroutine function *f* be called either asynchronously or blocking.

    The returned wrapper looks for an ``'async'`` keyword argument
    (default ``True``) and removes it before calling *f*:

    * truthy -- return ``f(*args, **kwargs)`` as is (for a coroutine
      function, the coroutine object);
    * falsy -- run that call to completion on the current event loop and
      return its result.

    ``async`` is a reserved word since Python 3.7, so on those versions the
    flag has to be passed as ``wrapped(**{'async': False})``.
    """
    ASYNC_KEY = 'async'

    def f_in(*args, **kwargs):
        # pop() reads the flag and strips it so *f* never receives it.
        run_async = kwargs.pop(ASYNC_KEY, True)
        if run_async:
            return f(*args, **kwargs)
        # Forward the caller's arguments to *f* in the blocking branch too.
        return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))

    return f_in
# below: the usage
@sync
async def test():
    """Demo coroutine: print, sleep for one second, print again."""
    print('In sleep...')
    await asyncio.sleep(1)
    print('After sleep')
# below: or asyncio.get_event_loop().create_task(test())
asyncio.get_event_loop().run_until_complete(test())
# and here is your synchronous version; ``async`` is a reserved word since
# Python 3.7, so the flag is passed through dict unpacking
test(**{'async': False})
此外:创建一个特殊的包装器类,而不是把 async 传递给每个方法调用,可能更有意义。示例如下。
class SyncCallerWrapper(object):
    """Proxy that can turn an object's coroutine methods into blocking calls.

    Args:
        obj: the object whose attributes are proxied.
        is_async: when true, attributes are returned untouched; when false,
            coroutine functions are wrapped so that calling them runs the
            coroutine to completion on the event loop.
    """

    def __init__(self, obj, is_async=True):
        self._obj = obj
        self._is_async = is_async

    def __getattr__(self, name):
        """Look *name* up on the wrapped object, wrapping coroutines in sync mode."""
        def sync_wrapper(obj_attr):
            def f(*args, **kwargs):
                return asyncio.get_event_loop().run_until_complete(obj_attr(*args, **kwargs))
            return f

        obj_attr = getattr(self._obj, name)
        if not self._is_async and asyncio.iscoroutinefunction(obj_attr):
            return sync_wrapper(obj_attr)
        # Non-coroutine attributes (and everything in async mode) pass through.
        return obj_attr
class C(object):
    """Demo class with two coroutine methods that print around a 1 s sleep."""

    async def sleep1(self):
        print('In sleep1...')
        await asyncio.sleep(1)
        print('After sleep1')

    async def sleep2(self):
        print('In sleep2...')
        await asyncio.sleep(1)
        print('After sleep2')
# Blocking mode: the wrapper runs each coroutine to completion for you
c_sync = SyncCallerWrapper(C(), is_async=False)
c_sync.sleep1()
c_sync.sleep2()
# Coroutine mode: the methods hand back coroutines, so drive them on the loop
c_async = SyncCallerWrapper(C(), is_async=True)
loop = asyncio.get_event_loop()
loop.run_until_complete(c_async.sleep1())
loop.run_until_complete(c_async.sleep2())
为了更优雅,您可以将 class 替换为函数(全局构造函数)。然后用户可以在创建 C 时传递 is_async 参数,并得到所需的行为:方法将作为常规函数(is_async=False)或作为 async 函数(is_async=True)运行。
# Keep a handle on the class before the factory below takes over the name ``C``.
_C = C


def C(*args, **kwargs):
    """Factory standing in for the class: build it and wrap the instance.

    Accepts the class's constructor arguments plus an optional ``is_async``
    keyword (default ``False``). The flag is stripped before the class is
    instantiated and is handed to ``SyncCallerWrapper`` instead.
    """
    KEY_ISASYNC = 'is_async'
    is_async = kwargs.pop(KEY_ISASYNC, False)
    return SyncCallerWrapper(_C(*args, **kwargs), is_async=is_async)
# Blocking mode: methods behave like regular functions
c_sync = C(is_async=False)
c_sync.sleep1()
c_sync.sleep2()
# Coroutine mode: the methods hand back coroutines, so drive them on the loop
c_async = C(is_async=True)
loop = asyncio.get_event_loop()
loop.run_until_complete(c_async.sleep1())
loop.run_until_complete(c_async.sleep2())
我正在编写一个库,我希望最终用户能够有选择地使用它,就好像它的方法和函数不是协程一样。
例如,给出这个函数:
# NOTE: ``asyncio.coroutine`` is the generator-based (pre ``async def``) style;
# it was removed in Python 3.11, where ``async def`` / ``await`` replace it.
@asyncio.coroutine
def blah_getter():
    """Fetch the example URL through ``http_client`` and return the result."""
    return (yield from http_client.get('http://blahblahblah'))
不想在自己的代码中使用任何异步功能的最终用户,仍然需要导入 asyncio 并运行这个:
>>> response = asyncio.get_event_loop().run_until_complete(blah_getter())
如果我可以在 blah_getter 内部确定自己是否正作为协程被调用,并做出相应的反应,那就太好了。
所以像这样:
@asyncio.coroutine
def blah_getter():
    """Wished-for dual-mode getter: coroutine when yielded from, blocking otherwise.

    ``magically_determine_if_being_yielded_from`` is a hypothetical helper
    standing for the detection the question asks about.
    """
    if magically_determine_if_being_yielded_from():
        return (yield from http_client.get('http://blahblahblah'))
    else:
        el = asyncio.get_event_loop()
        return el.run_until_complete(http_client.get('http://blahblahblah'))
您需要两个函数 -- 异步协程和同步常规函数:
@asyncio.coroutine
def async_getter():
    """Coroutine variant: fetch the example URL via ``http_client``."""
    return (yield from http_client.get('http://example.com'))


def sync_getter():
    """Blocking variant: run ``async_getter`` on the event loop to completion."""
    return asyncio.get_event_loop().run_until_complete(async_getter())
magically_determine_if_being_yielded_from() 实际上就是 event_loop.is_running(),但我强烈不建议在同一个函数中混合使用同步和异步代码。
我同意 Andrew 的回答,我只是想补充一点,如果您处理的是对象而不是顶级函数,则可以使用元类自动添加异步方法的同步版本。看这个例子:
import asyncio
import aiohttp
class SyncAdder(type):
    """A metaclass which adds synchronous versions of coroutines.

    This metaclass finds all coroutine functions defined on a class
    and adds a synchronous version with a '_s' suffix appended to the
    original function name.
    """

    def __new__(cls, clsname, bases, dct, **kwargs):
        """Create the class after adding a ``<name>_s`` twin per coroutine.

        Extra class keyword arguments are accepted but are not forwarded
        to ``type.__new__``.
        """
        new_dct = {}
        for name, val in dct.items():
            # Make a sync version of all coroutine functions
            if asyncio.iscoroutinefunction(val):
                meth = cls.sync_maker(name)
                syncname = '{}_s'.format(name)
                meth.__name__ = syncname
                meth.__qualname__ = '{}.{}'.format(clsname, syncname)
                new_dct[syncname] = meth
        # Merged after the loop so the namespace is not resized mid-iteration.
        dct.update(new_dct)
        return super().__new__(cls, clsname, bases, dct)

    @staticmethod
    def sync_maker(func):
        """Return a blocking method that drives the coroutine method named *func*.

        *func* is the attribute name, not the function object: the method is
        looked up on the instance at call time.
        """
        def sync_func(self, *args, **kwargs):
            meth = getattr(self, func)
            return asyncio.get_event_loop().run_until_complete(meth(*args, **kwargs))
        return sync_func
class Stuff(metaclass=SyncAdder):
    """Example class using the ``SyncAdder`` metaclass."""

    @asyncio.coroutine
    def getter(self, url):
        """Coroutine: issue a GET request for *url* via ``aiohttp.request``."""
        return (yield from aiohttp.request('GET', url))
用法:
>>> import aio, asyncio
>>> aio.Stuff.getter_s
<function Stuff.getter_s at 0x7f90459c2bf8>
>>> aio.Stuff.getter
<function Stuff.getter at 0x7f90459c2b70>
>>> s = aio.Stuff()
>>> s.getter_s('http://example.com')
<ClientResponse(http://example.com) [200 OK]>
<CIMultiDictProxy {'ACCEPT-RANGES': 'bytes', 'CACHE-CONTROL': 'max-age=604800', 'DATE': 'Mon, 11 May 2015 15:13:21 GMT', 'ETAG': '"359670651"', 'EXPIRES': 'Mon, 18 May 2015 15:13:21 GMT', 'SERVER': 'ECS (ewr/15BD)', 'X-CACHE': 'HIT', 'X-EC-CUSTOM-ERROR': '1', 'CONTENT-LENGTH': '1270', 'CONTENT-TYPE': 'text/html', 'LAST-MODIFIED': 'Fri, 09 Aug 2013 23:54:35 GMT', 'VIA': '1.1 xyz.com:80', 'CONNECTION': 'keep-alive'}>
>>> asyncio.get_event_loop().run_until_complete(s.getter('http://example.com'))
<ClientResponse(http://example.com) [200 OK]>
<CIMultiDictProxy {'ACCEPT-RANGES': 'bytes', 'CACHE-CONTROL': 'max-age=604800', 'DATE': 'Mon, 11 May 2015 15:25:09 GMT', 'ETAG': '"359670651"', 'EXPIRES': 'Mon, 18 May 2015 15:25:09 GMT', 'SERVER': 'ECS (ewr/15BD)', 'X-CACHE': 'HIT', 'X-EC-CUSTOM-ERROR': '1', 'CONTENT-LENGTH': '1270', 'CONTENT-TYPE': 'text/html', 'LAST-MODIFIED': 'Fri, 09 Aug 2013 23:54:35 GMT', 'VIA': '1.1 xys.com:80', 'CONNECTION': 'keep-alive'}>
您还可以创建一个简单的装饰器,使您的函数同步。这种方法可以应用于全局函数和方法。
一个例子。
# the decorator
def sync(f):
    """Let coroutine function *f* be called either asynchronously or blocking.

    The returned wrapper looks for an ``'async'`` keyword argument
    (default ``True``) and removes it before calling *f*:

    * truthy -- return ``f(*args, **kwargs)`` as is (for a coroutine
      function, the coroutine object);
    * falsy -- run that call to completion on the current event loop and
      return its result.

    ``async`` is a reserved word since Python 3.7, so on those versions the
    flag has to be passed as ``wrapped(**{'async': False})``.
    """
    ASYNC_KEY = 'async'

    def f_in(*args, **kwargs):
        # pop() reads the flag and strips it so *f* never receives it.
        run_async = kwargs.pop(ASYNC_KEY, True)
        if run_async:
            return f(*args, **kwargs)
        # Forward the caller's arguments to *f* in the blocking branch too.
        return asyncio.get_event_loop().run_until_complete(f(*args, **kwargs))

    return f_in
# below: the usage
@sync
async def test():
    """Demo coroutine: print, sleep for one second, print again."""
    print('In sleep...')
    await asyncio.sleep(1)
    print('After sleep')
# below: or asyncio.get_event_loop().create_task(test())
asyncio.get_event_loop().run_until_complete(test())
# and here is your synchronous version; ``async`` is a reserved word since
# Python 3.7, so the flag is passed through dict unpacking
test(**{'async': False})
此外:创建一个特殊的包装器类,而不是把 async 传递给每个方法调用,可能更有意义。示例如下。
class SyncCallerWrapper(object):
    """Proxy that can turn an object's coroutine methods into blocking calls.

    Args:
        obj: the object whose attributes are proxied.
        is_async: when true, attributes are returned untouched; when false,
            coroutine functions are wrapped so that calling them runs the
            coroutine to completion on the event loop.
    """

    def __init__(self, obj, is_async=True):
        self._obj = obj
        self._is_async = is_async

    def __getattr__(self, name):
        """Look *name* up on the wrapped object, wrapping coroutines in sync mode."""
        def sync_wrapper(obj_attr):
            def f(*args, **kwargs):
                return asyncio.get_event_loop().run_until_complete(obj_attr(*args, **kwargs))
            return f

        obj_attr = getattr(self._obj, name)
        if not self._is_async and asyncio.iscoroutinefunction(obj_attr):
            return sync_wrapper(obj_attr)
        # Non-coroutine attributes (and everything in async mode) pass through.
        return obj_attr
class C(object):
    """Demo class with two coroutine methods that print around a 1 s sleep."""

    async def sleep1(self):
        print('In sleep1...')
        await asyncio.sleep(1)
        print('After sleep1')

    async def sleep2(self):
        print('In sleep2...')
        await asyncio.sleep(1)
        print('After sleep2')
# Blocking mode: the wrapper runs each coroutine to completion for you
c_sync = SyncCallerWrapper(C(), is_async=False)
c_sync.sleep1()
c_sync.sleep2()
# Coroutine mode: the methods hand back coroutines, so drive them on the loop
c_async = SyncCallerWrapper(C(), is_async=True)
loop = asyncio.get_event_loop()
loop.run_until_complete(c_async.sleep1())
loop.run_until_complete(c_async.sleep2())
为了更优雅,您可以将 class 替换为函数(全局构造函数)。然后用户可以在创建 C 时传递 is_async 参数,并得到所需的行为:方法将作为常规函数(is_async=False)或作为 async 函数(is_async=True)运行。
# Keep a handle on the class before the factory below takes over the name ``C``.
_C = C


def C(*args, **kwargs):
    """Factory standing in for the class: build it and wrap the instance.

    Accepts the class's constructor arguments plus an optional ``is_async``
    keyword (default ``False``). The flag is stripped before the class is
    instantiated and is handed to ``SyncCallerWrapper`` instead.
    """
    KEY_ISASYNC = 'is_async'
    is_async = kwargs.pop(KEY_ISASYNC, False)
    return SyncCallerWrapper(_C(*args, **kwargs), is_async=is_async)
# Blocking mode: methods behave like regular functions
c_sync = C(is_async=False)
c_sync.sleep1()
c_sync.sleep2()
# Coroutine mode: the methods hand back coroutines, so drive them on the loop
c_async = C(is_async=True)
loop = asyncio.get_event_loop()
loop.run_until_complete(c_async.sleep1())
loop.run_until_complete(c_async.sleep2())