python - 在 gRPC 无限流期间检测客户端丢弃
python - detect client drop during gRPC infinite stream
我正在实施无限流响应,例如使用 gRPC 架构的 pub/sub 模式。
有一个端点打开响应流并保持它直到客户端断开。为此,我存储了一个键值散列,其中键是 gRPC 上下文,值是我用来轮询发送消息的队列。
我的端点代码如下所示:
def StreamTrades(self, request, context):
self.feeds[context] = queue.Queue()
callback_queue = queue.Queue()
def remove_feed():
if self.feeds.get(context) is not None:
del self.feeds[context]
def stop_stream():
remove_feed()
def raise_stop_stream_exception():
raise StopStream('stopping stream')
callback_queue.put(raise_stop_stream_exception)
context.add_callback(stop_stream)
def output_generator():
while True:
try:
try:
callback = callback_queue.get(False)
callback()
except queue.Empty:
pass
if self.feeds.get(context) is not None:
trade = self.feeds[context].get()
if isinstance(trade, trades_pb2.Trade):
yield trade
else:
raise StopStream('stopping stream')
except IndexError:
pass
except StopStream:
return
return output_generator()
此代码适用于订阅和发布对客户端的更改。但是有一个与退订有关的问题。检测客户端掉线的好方法是什么?使用 Context.add_callback(callBack) 似乎不起作用,因为回调仅在服务器完成并关闭流时调用。当客户端不再存在时,生成器不会引发任何类型的状态。我在 Java 中看到,当在 streamObserver 中调用 onNext 并且没有客户端时会抛出带有 Status.CANCELLED 的 StatusRuntimeException,它允许延迟取消订阅,这对我来说已经足够了。
有什么方法可以检测客户端在响应流期间断开连接吗?
您使用 ServicerContext.add_callback
should be called when the client drops the connection; that it is not being called indicates that you're suffering from this bug 注册的回调。 不是 "the callback is only called when the server finishes and closes the stream".
的情况
我正在实施无限流响应,例如使用 gRPC 架构的 pub/sub 模式。
有一个端点打开响应流并保持它直到客户端断开。为此,我存储了一个键值散列,其中键是 gRPC 上下文,值是我用来轮询发送消息的队列。
我的端点代码如下所示:
def StreamTrades(self, request, context):
self.feeds[context] = queue.Queue()
callback_queue = queue.Queue()
def remove_feed():
if self.feeds.get(context) is not None:
del self.feeds[context]
def stop_stream():
remove_feed()
def raise_stop_stream_exception():
raise StopStream('stopping stream')
callback_queue.put(raise_stop_stream_exception)
context.add_callback(stop_stream)
def output_generator():
while True:
try:
try:
callback = callback_queue.get(False)
callback()
except queue.Empty:
pass
if self.feeds.get(context) is not None:
trade = self.feeds[context].get()
if isinstance(trade, trades_pb2.Trade):
yield trade
else:
raise StopStream('stopping stream')
except IndexError:
pass
except StopStream:
return
return output_generator()
此代码适用于订阅和发布对客户端的更改。但是有一个与退订有关的问题。检测客户端掉线的好方法是什么?使用 Context.add_callback(callBack) 似乎不起作用,因为回调仅在服务器完成并关闭流时调用。当客户端不再存在时,生成器不会引发任何类型的状态。我在 Java 中看到,当在 streamObserver 中调用 onNext 并且没有客户端时会抛出带有 Status.CANCELLED 的 StatusRuntimeException,它允许延迟取消订阅,这对我来说已经足够了。
有什么方法可以检测客户端在响应流期间断开连接吗?
您使用 ServicerContext.add_callback
should be called when the client drops the connection; that it is not being called indicates that you're suffering from this bug 注册的回调。 不是 "the callback is only called when the server finishes and closes the stream".