gRPC: Rendezvous terminated with (StatusCode.INTERNAL, Received RST_STREAM with error code 2)
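I am implementing a gRPC client and server in Python. The server successfully receives data from the client, but the client gets back "RST_STREAM with error code 2".
What exactly does this mean, and how do I fix it?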
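For reference, the "error code" in that message is an HTTP/2 error code. RST_STREAM is the HTTP/2 frame a peer sends to abort a single stream, and every gRPC call runs on its own stream. The lookup below uses the code names from RFC 7540, section 7; `describe_rst_stream` is only an illustrative helper. Code 2 is INTERNAL_ERROR, which gRPC reports as StatusCode.INTERNAL, matching the traceback. On its own it only says that the other side hit an internal failure, not what that failure was.

```python
# HTTP/2 error codes as named in RFC 7540, section 7.
HTTP2_ERROR_CODES = {
    0x0: "NO_ERROR",
    0x1: "PROTOCOL_ERROR",
    0x2: "INTERNAL_ERROR",
    0x3: "FLOW_CONTROL_ERROR",
    0x4: "SETTINGS_TIMEOUT",
    0x5: "STREAM_CLOSED",
    0x6: "FRAME_SIZE_ERROR",
    0x7: "REFUSED_STREAM",
    0x8: "CANCEL",
    0x9: "COMPRESSION_ERROR",
    0xA: "CONNECT_ERROR",
    0xB: "ENHANCE_YOUR_CALM",
    0xC: "INADEQUATE_SECURITY",
    0xD: "HTTP_1_1_REQUIRED",
}


def describe_rst_stream(error_code):
    """Translate the numeric code from 'Received RST_STREAM with error code N'."""
    name = HTTP2_ERROR_CODES.get(error_code, "UNKNOWN")
    return "RST_STREAM error code {} (0x{:X}) = {}".format(error_code, error_code, name)


print(describe_rst_stream(2))  # RST_STREAM error code 2 (0x2) = INTERNAL_ERROR
```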
Here is my proto file:
service MyApi {
  rpc SelectModelForDataset (Dataset) returns (SelectedModel) {
  }
}
message Dataset {
  // ...
}
message SelectedModel {
  // ...
}
My service implementation looks like this:
class MyApiServicer(my_api_pb2_grpc.MyApiServicer):
    def SelectModelForDataset(self, request, context):
        print("Processing started.")
        selectedModel = ModelSelectionModule.run(request, context)
        print("Processing Completed.")
        return selectedModel
I start the server with this code:
import grpc
from concurrent import futures
#...
server = grpc.server(futures.ThreadPoolExecutor(max_workers=100))
my_api_pb2_grpc.add_MyApiServicer_to_server(MyApiServicer(), server)
server.add_insecure_port('[::]:50051')
server.start()
My client looks like this:
channel = grpc.insecure_channel(target='localhost:50051')
stub = my_api_pb2_grpc.MyApiStub(channel)
dataset = my_api_pb2.Dataset()
# fill the object ...
model = stub.SelectModelForDataset(dataset) # call server
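After the client makes the call, the server starts processing and runs it to completion (this takes about a minute), but the client returns immediately with the following error: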
Traceback (most recent call last):
  File "Client.py", line 32, in <module>
    run()
  File "Client.py", line 26, in run
    model = stub.SelectModelForDataset(dataset) # call server
  File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 484, in __call__
    return _end_unary_response_blocking(state, call, False, deadline)
  File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 434, in _end_unary_response_blocking
    raise _Rendezvous(state, None, None, deadline)
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.INTERNAL, Received RST_STREAM with error code 2)>
If I make the request asynchronously and wait on the future,
model_future = stub.SelectModelForDataset.future(dataset) # call server
model = model_future.result()
the client waits until processing has finished, but afterwards it still gets an error:
Traceback (most recent call last):
  File "AsyncClient.py", line 35, in <module>
    run()
  File "AsyncClient.py", line 29, in run
    model = model_future.result()
  File "/usr/local/lib/python3.5/dist-packages/grpc/_channel.py", line 276, in result
    raise self
grpc._channel._Rendezvous: <_Rendezvous of RPC that terminated with (StatusCode.INTERNAL, Received RST_STREAM with error code 2)>
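When debugging, it can help to read the status code and details off the failed call instead of letting result() raise. A sketch under these assumptions: `select_model_or_report` is a hypothetical helper, and the 120-second deadline is an arbitrary value picked to exceed the roughly one-minute processing time. The methods it relies on (`future()`, `exception()`, `result()`, `code()`, `details()`) belong to gRPC Python's future and call interfaces; a failed RPC's exception is also a call object.

```python
def select_model_or_report(stub, dataset, timeout_seconds=120.0):
    """Run SelectModelForDataset asynchronously and report failures instead of raising.

    Returns (model, None) on success, or (None, "<status code>: <details>") on failure.
    """
    # future() starts the RPC and returns at once; timeout sets the RPC deadline.
    future = stub.SelectModelForDataset.future(dataset, timeout=timeout_seconds)
    # exception() blocks until the RPC finishes and returns None if it succeeded.
    error = future.exception()
    if error is None:
        return future.result(), None
    # The error object exposes the final status code and the details string.
    return None, "{}: {}".format(error.code(), error.details())
```

With the failure described above, the second element would read along the lines of "StatusCode.INTERNAL: Received RST_STREAM with error code 2".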
UPD: After enabling tracing with GRPC_TRACE=all, I found the following:
Client side, immediately after the request:
E0109 17:59:42.248727600 1981 channel_connectivity.cc:126] watch_completion_error: {"created":"@1515520782.248638500","description":"GOAWAY received","file":"src/core/ext/transport/chttp2/transport/chttp2_transport.cc","file_line":1137,"http2_error":0,"raw_bytes":"Server shutdown"}
E0109 17:59:42.451048100 1979 channel_connectivity.cc:126] watch_completion_error: "Cancelled"
E0109 17:59:42.451160000 1979 completion_queue.cc:659] Operation failed: tag=0x7f6e5cd1caf8, error={"created":"@1515520782.451034300","description":"Timed out waiting for connection state change","file":"src/core/ext/filters/client_channel/channel_connectivity.cc","file_line":133}
...(last two messages keep repeating 5 times every second)
Server:
E0109 17:59:42.248201000 1985 completion_queue.cc:659] Operation failed: tag=0x7f3f74febee8, error={"created":"@1515520782.248170000","description":"Server Shutdown","file":"src/core/lib/surface/server.cc","file_line":1249}
E0109 17:59:42.248541100 1975 tcp_server_posix.cc:231] Failed accept4: Invalid argument
E0109 17:59:47.362868700 1994 completion_queue.cc:659] Operation failed: tag=0x7f3f74febee8, error={"created":"@1515520787.362853500","description":"Server Shutdown","file":"src/core/lib/surface/server.cc","file_line":1249}
E0109 17:59:52.430612500 2000 completion_queue.cc:659] Operation failed: tag=0x7f3f74febee8, error={"created":"@1515520792.430598800","description":"Server Shutdown","file":"src/core/lib/surface/server.cc","file_line":1249}
... (last message kept repeating every few seconds)
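gRPC core reads its tracing variables when it initializes, so they have to be in the environment before the traced process starts using gRPC; the simplest way is to set them when launching that process. A minimal sketch: `grpc_debug_env` and `run_with_grpc_tracing` are illustrative helper names, and GRPC_VERBOSITY=DEBUG is an added assumption so that non-error trace lines are printed too (the output above only shows error-level lines).

```python
import os
import subprocess
import sys


def grpc_debug_env(trace="all", verbosity="DEBUG", base=None):
    """Return a copy of the environment with gRPC core tracing switched on."""
    env = dict(os.environ if base is None else base)
    env["GRPC_TRACE"] = trace
    env["GRPC_VERBOSITY"] = verbosity
    return env


def run_with_grpc_tracing(script_path):
    """Run a script (e.g. Server.py) in a new interpreter with tracing enabled.

    gRPC writes its trace output to stderr.
    """
    return subprocess.run([sys.executable, script_path], env=grpc_debug_env())
```

From a shell, the equivalent is `GRPC_TRACE=all GRPC_VERBOSITY=DEBUG python Server.py`.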
UPD2:
The full contents of my Server.py file:
import ModelSelectionModule
import my_api_pb2_grpc
import my_api_pb2
import grpc
from concurrent import futures
import time

class MyApiServicer(my_api_pb2_grpc.MyApiServicer):
    def SelectModelForDataset(self, request, context):
        print("Processing started.")
        selectedModel = ModelSelectionModule.run(request, context)
        print("Processing Completed.")
        return selectedModel

# TODO(shalamov): what is the best way to run a python server?
def serve():
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=100))
    my_api_pb2_grpc.add_MyApiServicer_to_server(MyApiServicer(), server)
    server.add_insecure_port('[::]:50051')
    server.start()
    print("gRPC server started\n")
    try:
        while True:
            time.sleep(24 * 60 * 60)  # keep the main thread alive, sleeping 24h at a time
    except KeyboardInterrupt:
        server.stop(0)

if __name__ == '__main__':
    serve()
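On the TODO in serve(): newer grpcio releases provide `Server.wait_for_termination()`, which blocks the calling thread until the server stops and can replace the sleep loop. It may not exist in older grpcio versions, in which case the loop above is a reasonable fallback. A minimal sketch, assuming a `grpc.Server` that has already been built and configured as in serve(); `serve_forever` and the 5-second grace period are illustrative choices.

```python
def serve_forever(server, grace_seconds=5.0):
    """Start a configured grpc.Server and block until it stops.

    On Ctrl+C, in-flight RPCs get up to grace_seconds to finish before being aborted.
    """
    server.start()
    try:
        # Blocks until the server is stopped (available in newer grpcio releases).
        server.wait_for_termination()
    except KeyboardInterrupt:
        # stop() returns a threading.Event that is set once shutdown has completed.
        server.stop(grace_seconds).wait()
```

Using a non-zero grace period, unlike `server.stop(0)`, lets a long-running SelectModelForDataset call finish instead of being cut off.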
UPD3:
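It seems that ModelSelectionModule.run causes the problem. I tried to isolate it in a separate thread, but that didn't help. selectedModel is eventually computed, but by then the client is already gone. How can I prevent this call from interfering with gRPC?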
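from multiprocessing.pool import ThreadPool

pool = ThreadPool(processes=1)
# pass the callable and its arguments separately, so run() executes on the pool's thread
async_result = pool.apply_async(ModelSelectionModule.run, (request, context))
selectedModel = async_result.get()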
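A detail about apply_async: passing a call expression, as in `pool.apply_async(f(x))`, evaluates `f(x)` right away in the calling thread and hands only its return value to the pool (which then fails when the pool tries to call that value). Passing the callable and its arguments separately is what actually moves the work to a worker thread. The sketch below (`run_on_pool` is a hypothetical helper) checks that the work really runs on another thread. Even then, a thread shares the whole process, sockets included, with gRPC, so it cannot shield gRPC from child processes that the libraries fork.

```python
import threading
from multiprocessing.pool import ThreadPool


def run_on_pool(func, *args):
    """Run func(*args) on a one-worker ThreadPool; return (result, worker thread id)."""

    def call_and_report():
        return func(*args), threading.get_ident()

    pool = ThreadPool(processes=1)
    try:
        # Pass the callable itself, not the result of calling it.
        return pool.apply_async(call_and_report).get()
    finally:
        pool.close()
        pool.join()


result, worker_id = run_on_pool(lambda a, b: a + b, 2, 3)
print(result, worker_id != threading.get_ident())  # 5 True
```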
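The call is fairly complex: it spawns and joins many threads and calls into various libraries such as scikit-learn and smac. It would be too much to post all of it here.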
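While debugging, I found that after the client's request the server keeps 2 connections open (fd 3 and fd 8). If I manually close fd 8 or write some bytes to it, the error I see in the client changes to Stream removed (instead of Received RST_STREAM with error code 2). It seems that the socket (fd 8) is somehow corrupted by child processes. How is that possible? How can I protect the socket from being accessed by child processes?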
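On "How is that possible?": fork() gives the child a duplicate of every file descriptor the parent has open, sockets included. Python's default non-inheritable (close-on-exec) flag only takes effect when the child calls exec(), and does nothing for a plain fork(). So a child process forked by a library can write to or close gRPC's connection even though it never opened it. The POSIX-only sketch below (the helper name is hypothetical) demonstrates this inheritance with a socketpair.

```python
import os
import socket


def child_writes_to_inherited_socket(message=b"written by the forked child"):
    """Show that a fork()ed child can use a socket the parent created before the fork."""
    parent_end, child_end = socket.socketpair()
    pid = os.fork()
    if pid == 0:
        # Child: it never created or connected this socket, it just inherited the fd.
        try:
            child_end.sendall(message)
        finally:
            os._exit(0)  # exit at once, skipping the parent's cleanup handlers
    # Parent: wait for the child, then read what it wrote on the shared connection.
    os.waitpid(pid, 0)
    child_end.close()
    try:
        return parent_end.recv(1024)
    finally:
        parent_end.close()


print(child_writes_to_inherited_socket(b"ping"))  # b'ping'
```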
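This is a consequence of fork() being called inside the process handler (presumably by the libraries that ModelSelectionModule.run calls into). gRPC Python does not support this use case.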
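One common way around this, sketched here under stated assumptions rather than as gRPC's official recipe, is to run the heavy computation in a separate, freshly exec'd Python interpreter. A fork immediately followed by exec, which is what subprocess does, leaves no copy of gRPC running in the child, and any forking done by scikit-learn or smac then happens inside that child. The worker module `model_selection_worker` and the `run_in_fresh_interpreter` helper are hypothetical. The worker would read a serialized Dataset from stdin and write a serialized SelectedModel to stdout. The servicer `context` cannot be passed across the process boundary.

```python
import subprocess
import sys


def run_in_fresh_interpreter(args, payload, timeout=None):
    """Run a Python worker in a new interpreter and exchange bytes over stdin/stdout.

    The child is exec'd, so it starts without the parent's gRPC threads or state,
    and close_fds=True (the POSIX default) keeps gRPC's sockets out of the child.
    """
    completed = subprocess.run(
        [sys.executable, *args],
        input=payload,
        stdout=subprocess.PIPE,
        check=True,  # raise CalledProcessError if the worker exits with an error
        timeout=timeout,
    )
    return completed.stdout


# Hypothetical use inside MyApiServicer.SelectModelForDataset:
#     data = run_in_fresh_interpreter(["-m", "model_selection_worker"],
#                                     request.SerializeToString())
#     return my_api_pb2.SelectedModel.FromString(data)
```

Serializing with `SerializeToString()` and parsing with `FromString()` keeps the worker protocol to plain bytes. A `multiprocessing` context created with `get_context("spawn")` follows the same idea of starting children without forking the gRPC process, at the cost of pickling arguments and results.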
I ran into this problem and just solved it. Are you using the with_call() method?
Failing code:
response = stub.SayHello.with_call(request=request, metadata=metadata)
Here response is a tuple.
Working code (don't use with_call()):
response = stub.SayHello(request=request, metadata=metadata)
Here response is the response object.
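If you do need what with_call() adds, unpack the tuple instead of treating it as the response. The first element is the response message; the second is a call object carrying the RPC's status code and trailing metadata. `say_hello_with_status` is a hypothetical helper, and `SayHello` is the method from the example above.

```python
def say_hello_with_status(stub, request, metadata=None):
    """Call SayHello via with_call() and unpack its (response, call) tuple."""
    # with_call() returns the response together with a call object describing the RPC.
    response, call = stub.SayHello.with_call(request, metadata=metadata)
    # The call object exposes the final status code and any trailing metadata.
    return response, call.code(), call.trailing_metadata()
```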