Tornado 中具有多个进程的异步连接池

Asynchronous Pool of Connections in Tornado with multiple processes

我正在使用 Tornado 4.2.1tornadoes 2.4.1 库来查询我的 Elasticsearch 数据库,我正在寻找用于初始化连接池以在多进程服务中的多个 RequestHandler 实例之间共享的方法。

可以吗?是否有特定的 Tornado 库可以做到这一点?

提前致谢

由于 tornado-es 只是一个 HTTP 客户端,它使用 AsyncHTTPClient in the ESConnection每个请求都会建立新的 TCP 连接,除非指定 Connection: keep-alive header。

conn = ESConnection()
conn.httprequest_kwargs['headers'] = {'Connection': 'keep-alive'}

我没有测试过,但应该可以。我在 ruby(使用 patron http 客户端)中使用了类似的设置,并且效果很好

接下来

AsyncHTTPClient 对每个 ioloop 的最大同时请求数 (fetch) 有限制。每个达到限制的请求都只是在内部排队。

您可能想要增加全局限制:

AsyncHTTPClient.configure(None, max_clients=50)

或用自己的限制(force_instance)分隔客户端:

from tornadoes import ESConnection
from tornado.httpclient import AsyncHTTPClient

class CustomESConnection(ESConnection):

    def __init__(self, , host='localhost', port='9200', io_loop=None, protocol='http', max_clients=20):
        super(CustomESConnection, self).__init__(host, port, io_loop, protocol)
        self.client = AsyncHTTPClient(force_instance=True, max_clients=max_clients)

最后

要重用相同的 ESConnection,您可以在应用程序中创建它,因为该应用程序可用于每个请求 (RequestHandler)

from tornado.web import Application, RequestHandler
from tornadoes import ESConnection

class MainHandler(RequestHandler):
    def get(self):
        yield self.application.es.search('something')


class MyApp(Application):

    def __init__(self, *args, **kwargs):
        super(MyApp, self).__init__(*args, **kwargs)

        self.es = ESconnection()

if __name__ == "__main__":
    application = MyApp([
        (r"/", MainHandler),
    ])
    application.listen(8888)
    tornado.ioloop.IOLoop.current().start()

多进程

其实没有简单的方法。常见的方法是 pooler,它主要在需要持久连接时使用,例如数据库(pgbouncer for postgres)或作为对 high-load 服务的优化。

并且您将不得不编写一个 pooler,一个到 es

的网关应用程序
subprocess1 
           \  (http, zmq, ...)
            \            
              > pooler (some queue and tornadoes api) - http -> elastisearch
            /
           /
subprocess2

子进程可以通过 HTTP,ØMQ (there are many examples even pooler) or some implementation of IPC(sockects,...)与 pooler 通信。