python - 在 gRPC 无限流期间检测客户端丢弃

python - detect client drop during gRPC infinite stream

我正在实施无限流响应,例如使用 gRPC 架构的 pub/sub 模式。

有一个端点打开响应流并保持它直到客户端断开。为此,我存储了一个键值散列,其中键是 gRPC 上下文,值是我用来轮询发送消息的队列。

我的端点代码如下所示:

def StreamTrades(self, request, context):
    self.feeds[context] = queue.Queue()

    callback_queue = queue.Queue()

    def remove_feed():
        if self.feeds.get(context) is not None:
            del self.feeds[context]

    def stop_stream():
        remove_feed()

        def raise_stop_stream_exception():
            raise StopStream('stopping stream')

        callback_queue.put(raise_stop_stream_exception)

    context.add_callback(stop_stream)

    def output_generator():
        while True:
            try:
                try:
                    callback = callback_queue.get(False)
                    callback()
                except queue.Empty:
                    pass
                if self.feeds.get(context) is not None:
                    trade = self.feeds[context].get()
                    if isinstance(trade, trades_pb2.Trade):
                        yield trade
                else:
                    raise StopStream('stopping stream')
            except IndexError:
                pass
            except StopStream:
                return

    return output_generator()

此代码适用于订阅和发布对客户端的更改。但是有一个与退订有关的问题。检测客户端掉线的好方法是什么?使用 Context.add_callback(callBack) 似乎不起作用,因为回调仅在服务器完成并关闭流时调用。当客户端不再存在时,生成器不会引发任何类型的状态。我在 Java 中看到,当在 streamObserver 中调用 onNext 并且没有客户端时会抛出带有 Status.CANCELLED 的 StatusRuntimeException,它允许延迟取消订阅,这对我来说已经足够了。

有什么方法可以检测客户端在响应流期间断开连接吗?

您使用 ServicerContext.add_callback should be called when the client drops the connection; that it is not being called indicates that you're suffering from this bug 注册的回调。 不是 "the callback is only called when the server finishes and closes the stream".

的情况