龙卷风处理程序有什么方法可以检测另一端的关闭?

Any way for tornado handler to detect closure on other end?

我有一个龙卷风协程处理程序,部分看起来像:

class QueryHandler(tornado.web.RequestHandler):
    queryQueues = defaultdict(tornado.queues.Queue)

    @tornado.gen.coroutine
    def get(self, network):
        qq = self.queryQueues[network]
        query = yield qq.get()
        # do some work with with the dequeued query
        self.write(response)

在客户端,我使用python-requests对其进行长轮询:

fetched = session.get(QueryURL)

我可以进行查询,服务器阻塞等待队列,直到吐出一些东西来处理并最终响应。

这工作非常顺利,直到...长轮询在处理程序阻塞队列时关闭并重新启动。当我在客户端停止查询时,处理程序会愉快地保持阻塞状态。更糟糕的是,如果我在客户端重新启动查询,我现在有第二个处理程序实例阻塞在队列中。所以当队列确实有数据出现时,陈旧的处理程序处理它并回复 bitbucket,重新启动的查询现在被无限期地阻止。

有什么模式可以避免这种情况吗?我曾希望当客户端关闭时,处理程序会收到某种异常,表明事情已经发生了。 queue.get() 可以有超时,但我真正想要的不是超时,而是一种 "unless I close" 异常。

你想要一个 "queue with guaranteed delivery" 这在分布式系统中是一个难题。毕竟就算"self.write"成功了,也不能确定对方真的收到了消息

基本方法如下所示:

  • 队列中的每个条目都获得一个大于所有先前 ID 的 ID
  • 当客户端连接时,它要求订阅队列
  • 当客户端断开连接时,它会重新连接并请求所有 ID 大于它看到的最后一个 ID 的条目
  • 当您的 QueryHandler 收到一个 get 和非 None id 时,它首先为所有 id 大于 id 的条目提供服务,然后开始在队列中等待
  • 当您的 QueryHandler 从 self.write 引发异常时,忽略它:客户端负责检索丢失的条目
  • 将所有过去的条目保存在列表中
  • 一段时间后(几小时?)过期最旧的列表条目