Tornado与aio_etcd建立锁
Tornado and aio_etcd to establish a lock
我正在尝试让 Tornado 与 aio_etcd
aio_etcd documentation here
一起工作
我想在 Tornado 协程中获取一个锁。我写了下面的代码。文档中的示例使用 'await' 但是我用 yield from 代替了它,因为我使用 @tornado.gen.coroutine
装饰 我不确定这是否正确。我使用以下代码遇到以下崩溃:
raise RuntimeError('Timeout context manager should be used '
RuntimeError: Timeout context manager should be used inside a task
ERROR:tornado.access:500 GET / (::1) 6.33ms
...
import tornado.web
import tornado.httpserver
import tornado.httpclient
import tornado.ioloop
import tornado.options
import tornado.gen
import tornado.auth
from multiprocessing import Process
import aio_etcd as etcd
def run_process(port):
app=Application()
server=tornado.httpserver.HTTPServer(app)
server.listen(port)
tornado.ioloop.IOLoop.current().start()
class MainHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
print("in function")
l = etcd.Lock(self.application.etcdClient, "L")
# Use the lock object:
yield from l.acquire(blocking=True, lock_ttl=None)
print("got lock")
yield tornado.gen.sleep(30)
yield from l.release()
print("releasing lock")
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/",MainHandler),
]
self.etcdClient=etcd.Client()
# Settings dict for Application
settings = {
}
tornado.web.Application.__init__(self,handlers,debug=True,**settings)
if __name__ =='__main__':
Process(target=run_process,args=(8000,)).start()
Process(target=run_process,args=(8001,)).start()
同样在我的代码中,我不使用新的异步键盘,而是我所有的 Tornado 协同例程都装饰有 @tornado.gen.coroutine
有人能解释一下为什么使用 yield 关键字而不是 yield from in Tornado 协程以及它们为何不同。有谁知道我如何让这段代码在 Tornado 中工作?
使用 asyncio 更新代码:
import tornado.web
import tornado.httpserver
import tornado.httpclient
import tornado.ioloop
import tornado.options
import tornado.gen
import tornado.auth
import asyncio
from multiprocessing import Process
import aio_etcd as etcd
import tornado.platform.asyncio
def run_process(port):
tornado.platform.asyncio.AsyncIOMainLoop().install()
app=Application()
server=tornado.httpserver.HTTPServer(app)
server.listen(port)
asyncio.get_event_loop().run_forever()
class MainHandler(tornado.web.RequestHandler):
async def get(self):
print("here")
lock = etcd.Lock(self.application.etcdClient, "hello")
# Use the lock object:
await asyncio.ensure_future(lock.acquire())
state = await asyncio.ensure_future(lock.is_locked())
print("lock state")
print(state)
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/",MainHandler),
]
self.etcdClient=etcd.Client()
# Settings dict for Application
settings = {
}
tornado.web.Application.__init__(self,handlers,debug=True,**settings)
if __name__ =='__main__':
Process(target=run_process,args=(8000,)).start()
Process(target=run_process,args=(8001,)).start()
崩溃痕迹:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/web.py", line 1469, in _execute
result = yield result
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "<string>", line 6, in _wrap_awaitable
File "check3.py", line 27, in get
await asyncio.ensure_future(lock.acquire())
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/lock.py", line 67, in acquire
res = await self.client.write(self.path, self.uuid, ttl=lock_ttl, append=True)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/client.py", line 449, in write
response = await self.api_execute(path, method, params=params)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/client.py", line 780, in wrapper
response = await payload(self, path, method, params=params)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client.py", line 553, in __await__
resp = yield from self._coro
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client.py", line 198, in _request
proxy=proxy, proxy_auth=proxy_auth, timeout=timeout)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client_reqrep.py", line 79, in __init__
url2 = url.with_query(params)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/yarl-0.5.3-py3.5-macosx-10.6-intel.egg/yarl/__init__.py", line 607, in with_query
for k, v in query.items())
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/yarl-0.5.3-py3.5-macosx-10.6-intel.egg/yarl/__init__.py", line 607, in <genexpr>
for k, v in query.items())
File "yarl/_quoting.pyx", line 46, in yarl._quoting._quote (yarl/_quoting.c:1384)
TypeError: Argument should be str
许多为 asyncio 编写的库都可以与 Tornado 互操作。不幸的是,有些库,比如 aio_etcd,不是,因为它们以一种具体依赖于 asyncio 的方式实现超时。查看 aiohttp issue 877 - this issue describes a problem in aiohttp.Timeout. That code was spun off from aiohttp to the separate async_timeout 包,这个单独的包仍然与 Tornado 不兼容。这就是 aio_etcd 用于实现超时的方法,因此 aio_etcd 也与 Tornado 不兼容。
长期修复是在 aio_etcd's tracker asking for Tornado compatibility. aio_etcd could provide Tornado compatibility by not using async_timeout if timeout=None
. It's the same suggestion as in this comment 中打开一个错误。打开工单时请link回答这个 Whosebug 问题。
短期修复是使用 asyncio 及其网络框架 aiohttp,而不是 Tornado:
from multiprocessing import Process
import asyncio
from aiohttp import web
import aio_etcd as etcd
# Don't create this in the parent process, wait for Process() to spawn child.
etcd_client = None
async def handle(request):
print("in function")
l = etcd.Lock(etcd_client, "L")
# Use the lock object:
await l.acquire(blocking=True, lock_ttl=None)
print("got lock")
await asyncio.sleep(30)
await l.release()
print("releasing lock")
return web.Response(text='ok')
def run_process(port):
global etcd_client
app = web.Application(debug=True)
app.router.add_get('/', handle)
# Create AFTER multiprocess starts this child process.
etcd_client = etcd.Client()
web.run_app(app, port=port)
if __name__ == '__main__':
# run_process(8000)
Process(target=run_process, args=(8000,)).start()
Process(target=run_process, args=(8001,)).start()
aiohttp docs to get you started are here.
回答您的其他问题:Tornado 在 gen.coroutine
中使用 "yield" 多年。它的设计影响了 asyncio 的协程,但他们在 Python 3.4 中使用了 "yield from"。 Guido's explanation of the difference between "yield" and "yield from" is here, and it's certainly interesting, but less relevant now. In Python 3.5 "yield from" was superseded by PEP 492 -- Coroutines with async and await syntax.
如果您的代码永远不需要 运行 在 Python 早于 3.5,只需使用 "async" 和 "await" 而不是 "coroutine" 装饰器和 "yield"(在 Tornado 中)或 "yield from"(在 asyncio 中)。对你来说,我知道你正在 运行ning Python 3.5 因为 aio_etcd 需要它,所以就像我在上面的代码中演示的那样使用 async 和 await。
正如 Jesse 所说,一些 asyncio
库需要 asyncio
并且目前不能与 Tornado 一起使用(Tornado 的未来版本将提高这种兼容性)。但是,有一种解决方法不需要离开 Tornado。在屈服或等待之前,只需将对 aio_etcd
的任何调用用 asyncio.ensure_future()
包装起来。这是 aiohttp
的示例(因为我没有用于测试的 etcd
服务器):
from tornado.platform.asyncio import AsyncIOMainLoop
AsyncIOMainLoop().install()
from tornado.ioloop import IOLoop
import aiohttp
import asyncio
async def main():
print(await asyncio.ensure_future(aiohttp.get('https://www.google.com')))
IOLoop.current().run_sync(main)
我正在尝试让 Tornado 与 aio_etcd
aio_etcd documentation here
我想在 Tornado 协程中获取一个锁。我写了下面的代码。文档中的示例使用 'await' 但是我用 yield from 代替了它,因为我使用 @tornado.gen.coroutine
装饰 我不确定这是否正确。我使用以下代码遇到以下崩溃:
raise RuntimeError('Timeout context manager should be used '
RuntimeError: Timeout context manager should be used inside a task
ERROR:tornado.access:500 GET / (::1) 6.33ms
...
import tornado.web
import tornado.httpserver
import tornado.httpclient
import tornado.ioloop
import tornado.options
import tornado.gen
import tornado.auth
from multiprocessing import Process
import aio_etcd as etcd
def run_process(port):
app=Application()
server=tornado.httpserver.HTTPServer(app)
server.listen(port)
tornado.ioloop.IOLoop.current().start()
class MainHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
print("in function")
l = etcd.Lock(self.application.etcdClient, "L")
# Use the lock object:
yield from l.acquire(blocking=True, lock_ttl=None)
print("got lock")
yield tornado.gen.sleep(30)
yield from l.release()
print("releasing lock")
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/",MainHandler),
]
self.etcdClient=etcd.Client()
# Settings dict for Application
settings = {
}
tornado.web.Application.__init__(self,handlers,debug=True,**settings)
if __name__ =='__main__':
Process(target=run_process,args=(8000,)).start()
Process(target=run_process,args=(8001,)).start()
同样在我的代码中,我不使用新的异步键盘,而是我所有的 Tornado 协同例程都装饰有 @tornado.gen.coroutine
有人能解释一下为什么使用 yield 关键字而不是 yield from in Tornado 协程以及它们为何不同。有谁知道我如何让这段代码在 Tornado 中工作?
使用 asyncio 更新代码:
import tornado.web
import tornado.httpserver
import tornado.httpclient
import tornado.ioloop
import tornado.options
import tornado.gen
import tornado.auth
import asyncio
from multiprocessing import Process
import aio_etcd as etcd
import tornado.platform.asyncio
def run_process(port):
tornado.platform.asyncio.AsyncIOMainLoop().install()
app=Application()
server=tornado.httpserver.HTTPServer(app)
server.listen(port)
asyncio.get_event_loop().run_forever()
class MainHandler(tornado.web.RequestHandler):
async def get(self):
print("here")
lock = etcd.Lock(self.application.etcdClient, "hello")
# Use the lock object:
await asyncio.ensure_future(lock.acquire())
state = await asyncio.ensure_future(lock.is_locked())
print("lock state")
print(state)
class Application(tornado.web.Application):
def __init__(self):
handlers = [
(r"/",MainHandler),
]
self.etcdClient=etcd.Client()
# Settings dict for Application
settings = {
}
tornado.web.Application.__init__(self,handlers,debug=True,**settings)
if __name__ =='__main__':
Process(target=run_process,args=(8000,)).start()
Process(target=run_process,args=(8001,)).start()
崩溃痕迹:
Traceback (most recent call last):
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/web.py", line 1469, in _execute
result = yield result
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
yielded = self.gen.throw(*exc_info)
File "<string>", line 6, in _wrap_awaitable
File "check3.py", line 27, in get
await asyncio.ensure_future(lock.acquire())
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 361, in __iter__
yield self # This tells Task to wait for completion.
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
value = future.result()
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
raise_exc_info(self._exc_info)
File "<string>", line 3, in raise_exc_info
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
result = coro.send(None)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/lock.py", line 67, in acquire
res = await self.client.write(self.path, self.uuid, ttl=lock_ttl, append=True)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/client.py", line 449, in write
response = await self.api_execute(path, method, params=params)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/client.py", line 780, in wrapper
response = await payload(self, path, method, params=params)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client.py", line 553, in __await__
resp = yield from self._coro
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client.py", line 198, in _request
proxy=proxy, proxy_auth=proxy_auth, timeout=timeout)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client_reqrep.py", line 79, in __init__
url2 = url.with_query(params)
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/yarl-0.5.3-py3.5-macosx-10.6-intel.egg/yarl/__init__.py", line 607, in with_query
for k, v in query.items())
File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/yarl-0.5.3-py3.5-macosx-10.6-intel.egg/yarl/__init__.py", line 607, in <genexpr>
for k, v in query.items())
File "yarl/_quoting.pyx", line 46, in yarl._quoting._quote (yarl/_quoting.c:1384)
TypeError: Argument should be str
许多为 asyncio 编写的库都可以与 Tornado 互操作。不幸的是,有些库,比如 aio_etcd,不是,因为它们以一种具体依赖于 asyncio 的方式实现超时。查看 aiohttp issue 877 - this issue describes a problem in aiohttp.Timeout. That code was spun off from aiohttp to the separate async_timeout 包,这个单独的包仍然与 Tornado 不兼容。这就是 aio_etcd 用于实现超时的方法,因此 aio_etcd 也与 Tornado 不兼容。
长期修复是在 aio_etcd's tracker asking for Tornado compatibility. aio_etcd could provide Tornado compatibility by not using async_timeout if timeout=None
. It's the same suggestion as in this comment 中打开一个错误。打开工单时请link回答这个 Whosebug 问题。
短期修复是使用 asyncio 及其网络框架 aiohttp,而不是 Tornado:
from multiprocessing import Process
import asyncio
from aiohttp import web
import aio_etcd as etcd
# Don't create this in the parent process, wait for Process() to spawn child.
etcd_client = None
async def handle(request):
print("in function")
l = etcd.Lock(etcd_client, "L")
# Use the lock object:
await l.acquire(blocking=True, lock_ttl=None)
print("got lock")
await asyncio.sleep(30)
await l.release()
print("releasing lock")
return web.Response(text='ok')
def run_process(port):
global etcd_client
app = web.Application(debug=True)
app.router.add_get('/', handle)
# Create AFTER multiprocess starts this child process.
etcd_client = etcd.Client()
web.run_app(app, port=port)
if __name__ == '__main__':
# run_process(8000)
Process(target=run_process, args=(8000,)).start()
Process(target=run_process, args=(8001,)).start()
aiohttp docs to get you started are here.
回答您的其他问题:Tornado 在 gen.coroutine
中使用 "yield" 多年。它的设计影响了 asyncio 的协程,但他们在 Python 3.4 中使用了 "yield from"。 Guido's explanation of the difference between "yield" and "yield from" is here, and it's certainly interesting, but less relevant now. In Python 3.5 "yield from" was superseded by PEP 492 -- Coroutines with async and await syntax.
如果您的代码永远不需要 运行 在 Python 早于 3.5,只需使用 "async" 和 "await" 而不是 "coroutine" 装饰器和 "yield"(在 Tornado 中)或 "yield from"(在 asyncio 中)。对你来说,我知道你正在 运行ning Python 3.5 因为 aio_etcd 需要它,所以就像我在上面的代码中演示的那样使用 async 和 await。
正如 Jesse 所说,一些 asyncio
库需要 asyncio
并且目前不能与 Tornado 一起使用(Tornado 的未来版本将提高这种兼容性)。但是,有一种解决方法不需要离开 Tornado。在屈服或等待之前,只需将对 aio_etcd
的任何调用用 asyncio.ensure_future()
包装起来。这是 aiohttp
的示例(因为我没有用于测试的 etcd
服务器):
from tornado.platform.asyncio import AsyncIOMainLoop
AsyncIOMainLoop().install()
from tornado.ioloop import IOLoop
import aiohttp
import asyncio
async def main():
print(await asyncio.ensure_future(aiohttp.get('https://www.google.com')))
IOLoop.current().run_sync(main)