我如何使用 Python gRPC 处理流式消息
How do i handle streaming messages with Python gRPC
我正在关注这个 Route_Guide sample。
有问题的示例在不回复特定消息的情况下启动并阅读消息。后者是我想要实现的目标。
这是我目前的情况:
import grpc
...
channel = grpc.insecure_channel(conn_str)
try:
grpc.channel_ready_future(channel).result(timeout=5)
except grpc.FutureTimeoutError:
sys.exit('Error connecting to server')
else:
stub = MyService_pb2_grpc.MyServiceStub(channel)
print('Connected to gRPC server.')
this_is_just_read_maybe(stub)
def this_is_just_read_maybe(stub):
responses = stub.MyEventStream(stream())
for response in responses:
print(f'Received message: {response}')
if response.something:
# okay, now what? how do i send a message here?
def stream():
yield my_start_stream_msg
# this is fine, i receive this server-side
# but i can't check for incoming messages here
存根上似乎没有read()
或write()
,一切似乎都是用迭代器实现的。
如何从 this_is_just_read_maybe(stub)
发送消息?
这是正确的方法吗?
我的原型是双向流:
service MyService {
rpc MyEventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
}
您尝试做的事情是完全可能的,并且可能涉及编写您自己的请求 iterator 对象,该对象可以在响应到达时给出响应,而不是使用简单的生成器作为您的请求迭代器。也许像
class MySmarterRequestIterator(object):
def __init__(self):
self._lock = threading.Lock()
self._responses_so_far = []
def __iter__(self):
return self
def _next(self):
# some logic that depends upon what responses have been seen
# before returning the next request message
return <your message value>
def __next__(self): # Python 3
return self._next()
def next(self): # Python 2
return self._next()
def add_response(self, response):
with self._lock:
self._responses.append(response)
你然后像这样使用
my_smarter_request_iterator = MySmarterRequestIterator()
responses = stub.MyEventStream(my_smarter_request_iterator)
for response in responses:
my_smarter_request_iterator.add_response(response)
。在你的 _next
实现中可能会有锁定和阻塞来处理 gRPC Python 询问你的对象它想要发送的下一个请求和你的响应(实际上)的情况 "wait, hold on, I don't know what request I want to send until after I've seen how the next response turned out" .
除了编写自定义迭代器,您还可以使用阻塞队列为客户端存根实现类似发送和接收的行为:
import queue
...
send_queue = queue.SimpleQueue() # or Queue if using Python before 3.7
my_event_stream = stub.MyEventStream(iter(send_queue.get, None))
# send
send_queue.push(StreamingMessage())
# receive
response = next(my_event_stream) # type: StreamingMessage
这利用了 iter
的哨兵形式,它将常规函数转换为迭代器,迭代器在达到哨兵值时停止(在本例中为 None
)。
我正在关注这个 Route_Guide sample。
有问题的示例在不回复特定消息的情况下启动并阅读消息。后者是我想要实现的目标。
这是我目前的情况:
import grpc
...
channel = grpc.insecure_channel(conn_str)
try:
grpc.channel_ready_future(channel).result(timeout=5)
except grpc.FutureTimeoutError:
sys.exit('Error connecting to server')
else:
stub = MyService_pb2_grpc.MyServiceStub(channel)
print('Connected to gRPC server.')
this_is_just_read_maybe(stub)
def this_is_just_read_maybe(stub):
responses = stub.MyEventStream(stream())
for response in responses:
print(f'Received message: {response}')
if response.something:
# okay, now what? how do i send a message here?
def stream():
yield my_start_stream_msg
# this is fine, i receive this server-side
# but i can't check for incoming messages here
存根上似乎没有read()
或write()
,一切似乎都是用迭代器实现的。
如何从 this_is_just_read_maybe(stub)
发送消息?
这是正确的方法吗?
我的原型是双向流:
service MyService {
rpc MyEventStream (stream StreamingMessage) returns (stream StreamingMessage) {}
}
您尝试做的事情是完全可能的,并且可能涉及编写您自己的请求 iterator 对象,该对象可以在响应到达时给出响应,而不是使用简单的生成器作为您的请求迭代器。也许像
class MySmarterRequestIterator(object):
def __init__(self):
self._lock = threading.Lock()
self._responses_so_far = []
def __iter__(self):
return self
def _next(self):
# some logic that depends upon what responses have been seen
# before returning the next request message
return <your message value>
def __next__(self): # Python 3
return self._next()
def next(self): # Python 2
return self._next()
def add_response(self, response):
with self._lock:
self._responses.append(response)
你然后像这样使用
my_smarter_request_iterator = MySmarterRequestIterator()
responses = stub.MyEventStream(my_smarter_request_iterator)
for response in responses:
my_smarter_request_iterator.add_response(response)
。在你的 _next
实现中可能会有锁定和阻塞来处理 gRPC Python 询问你的对象它想要发送的下一个请求和你的响应(实际上)的情况 "wait, hold on, I don't know what request I want to send until after I've seen how the next response turned out" .
除了编写自定义迭代器,您还可以使用阻塞队列为客户端存根实现类似发送和接收的行为:
import queue
...
send_queue = queue.SimpleQueue() # or Queue if using Python before 3.7
my_event_stream = stub.MyEventStream(iter(send_queue.get, None))
# send
send_queue.push(StreamingMessage())
# receive
response = next(my_event_stream) # type: StreamingMessage
这利用了 iter
的哨兵形式,它将常规函数转换为迭代器,迭代器在达到哨兵值时停止(在本例中为 None
)。