Tornado与aio_etcd建立锁

Tornado and aio_etcd to establish a lock

我正在尝试让 Tornado 与 aio_etcd aio_etcd documentation here

一起工作

我想在 Tornado 协程中获取一个锁。我写了下面的代码。文档中的示例使用 'await' 但是我用 yield from 代替了它,因为我使用 @tornado.gen.coroutine 装饰 我不确定这是否正确。我使用以下代码遇到以下崩溃:

    raise RuntimeError('Timeout context manager should be used '
RuntimeError: Timeout context manager should be used inside a task
ERROR:tornado.access:500 GET / (::1) 6.33ms

...

import tornado.web
import tornado.httpserver
import tornado.httpclient
import tornado.ioloop
import tornado.options
import tornado.gen
import tornado.auth
from multiprocessing import Process
import aio_etcd as etcd

def run_process(port):

   app=Application()
   server=tornado.httpserver.HTTPServer(app)
   server.listen(port)
   tornado.ioloop.IOLoop.current().start()

class  MainHandler(tornado.web.RequestHandler):
   @tornado.gen.coroutine
   def get(self):
      print("in function")

      l = etcd.Lock(self.application.etcdClient, "L")

      # Use the lock object:
      yield from l.acquire(blocking=True, lock_ttl=None)
      print("got lock")
      yield tornado.gen.sleep(30)
      yield from l.release()
      print("releasing lock")



class Application(tornado.web.Application):
   def __init__(self):
      handlers = [
         (r"/",MainHandler),
      ]

      self.etcdClient=etcd.Client()

      # Settings dict for Application
      settings = {
      }
      tornado.web.Application.__init__(self,handlers,debug=True,**settings)      

if __name__ =='__main__':
   Process(target=run_process,args=(8000,)).start()
   Process(target=run_process,args=(8001,)).start()

同样在我的代码中,我不使用新的异步键盘,而是我所有的 Tornado 协同例程都装饰有 @tornado.gen.coroutine 有人能解释一下为什么使用 yield 关键字而不是 yield from in Tornado 协程以及它们为何不同。有谁知道我如何让这段代码在 Tornado 中工作?

使用 asyncio 更新代码:

import tornado.web
import tornado.httpserver
import tornado.httpclient
import tornado.ioloop
import tornado.options
import tornado.gen
import tornado.auth
import asyncio 
from multiprocessing import Process
import aio_etcd as etcd
import tornado.platform.asyncio 
def run_process(port):
   tornado.platform.asyncio.AsyncIOMainLoop().install()
   app=Application()
   server=tornado.httpserver.HTTPServer(app)
   server.listen(port)
   asyncio.get_event_loop().run_forever()

class  MainHandler(tornado.web.RequestHandler):
   async def get(self):

      print("here")
      lock = etcd.Lock(self.application.etcdClient, "hello")
      # Use the lock object:
      await asyncio.ensure_future(lock.acquire())
      state = await asyncio.ensure_future(lock.is_locked())
      print("lock state")
      print(state)

class Application(tornado.web.Application):
   def __init__(self):
      handlers = [
         (r"/",MainHandler),
      ]

      self.etcdClient=etcd.Client()

      # Settings dict for Application
      settings = {
      }
      tornado.web.Application.__init__(self,handlers,debug=True,**settings)      
if __name__ =='__main__':
   Process(target=run_process,args=(8000,)).start()
   Process(target=run_process,args=(8001,)).start()

崩溃痕迹:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/web.py", line 1469, in _execute
    result = yield result
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1021, in run
    yielded = self.gen.throw(*exc_info)
  File "<string>", line 6, in _wrap_awaitable
  File "check3.py", line 27, in get
    await asyncio.ensure_future(lock.acquire())
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/futures.py", line 361, in __iter__
    yield self  # This tells Task to wait for completion.
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/gen.py", line 1015, in run
    value = future.result()
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/tornado/concurrent.py", line 237, in result
    raise_exc_info(self._exc_info)
  File "<string>", line 3, in raise_exc_info
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/asyncio/tasks.py", line 239, in _step
    result = coro.send(None)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/lock.py", line 67, in acquire
    res = await self.client.write(self.path, self.uuid, ttl=lock_ttl, append=True)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/client.py", line 449, in write
    response = await self.api_execute(path, method, params=params)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aio_etcd-0.4.3.1-py3.5.egg/aio_etcd/client.py", line 780, in wrapper
    response = await payload(self, path, method, params=params)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client.py", line 553, in __await__
    resp = yield from self._coro
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client.py", line 198, in _request
    proxy=proxy, proxy_auth=proxy_auth, timeout=timeout)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/aiohttp-1.1.1-py3.5-macosx-10.6-intel.egg/aiohttp/client_reqrep.py", line 79, in __init__
    url2 = url.with_query(params)
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/yarl-0.5.3-py3.5-macosx-10.6-intel.egg/yarl/__init__.py", line 607, in with_query
    for k, v in query.items())
  File "/Library/Frameworks/Python.framework/Versions/3.5/lib/python3.5/site-packages/yarl-0.5.3-py3.5-macosx-10.6-intel.egg/yarl/__init__.py", line 607, in <genexpr>
    for k, v in query.items())
  File "yarl/_quoting.pyx", line 46, in yarl._quoting._quote (yarl/_quoting.c:1384)
TypeError: Argument should be str

许多为 asyncio 编写的库都可以与 Tornado 互操作。不幸的是,有些库,比如 aio_etcd,不是,因为它们以一种具体依赖于 asyncio 的方式实现超时。查看 aiohttp issue 877 - this issue describes a problem in aiohttp.Timeout. That code was spun off from aiohttp to the separate async_timeout 包,这个单独的包仍然与 Tornado 不兼容。这就是 aio_etcd 用于实现超时的方法,因此 aio_etcd 也与 Tornado 不兼容。

长期修复是在 aio_etcd's tracker asking for Tornado compatibility. aio_etcd could provide Tornado compatibility by not using async_timeout if timeout=None. It's the same suggestion as in this comment 中打开一个错误。打开工单时请link回答这个 Whosebug 问题。

短期修复是使用 asyncio 及其网络框架 aiohttp,而不是 Tornado:

from multiprocessing import Process

import asyncio
from aiohttp import web
import aio_etcd as etcd

# Don't create this in the parent process, wait for Process() to spawn child.
etcd_client = None

async def handle(request):
    print("in function")

    l = etcd.Lock(etcd_client, "L")

    # Use the lock object:
    await l.acquire(blocking=True, lock_ttl=None)
    print("got lock")
    await asyncio.sleep(30)
    await l.release()
    print("releasing lock")
    return web.Response(text='ok')


def run_process(port):
    global etcd_client
    app = web.Application(debug=True)
    app.router.add_get('/', handle)
    # Create AFTER multiprocess starts this child process.
    etcd_client = etcd.Client()
    web.run_app(app, port=port)


if __name__ == '__main__':
    # run_process(8000)
    Process(target=run_process, args=(8000,)).start()
    Process(target=run_process, args=(8001,)).start()

aiohttp docs to get you started are here.

回答您的其他问题:Tornado 在 gen.coroutine 中使用 "yield" 多年。它的设计影响了 asyncio 的协程,但他们在 Python 3.4 中使用了 "yield from"。 Guido's explanation of the difference between "yield" and "yield from" is here, and it's certainly interesting, but less relevant now. In Python 3.5 "yield from" was superseded by PEP 492 -- Coroutines with async and await syntax.

如果您的代码永远不需要 运行 在 Python 早于 3.5,只需使用 "async" 和 "await" 而不是 "coroutine" 装饰器和 "yield"(在 Tornado 中)或 "yield from"(在 asyncio 中)。对你来说,我知道你正在 运行ning Python 3.5 因为 aio_etcd 需要它,所以就像我在上面的代码中演示的那样使用 async 和 await。

正如 Jesse 所说,一些 asyncio 库需要 asyncio 并且目前不能与 Tornado 一起使用(Tornado 的未来版本将提高这种兼容性)。但是,有一种解决方法不需要离开 Tornado。在屈服或等待之前,只需将对 aio_etcd 的任何调用用 asyncio.ensure_future() 包装起来。这是 aiohttp 的示例(因为我没有用于测试的 etcd 服务器):

from tornado.platform.asyncio import AsyncIOMainLoop
AsyncIOMainLoop().install()

from tornado.ioloop import IOLoop
import aiohttp
import asyncio

async def main():
    print(await asyncio.ensure_future(aiohttp.get('https://www.google.com')))

IOLoop.current().run_sync(main)