gRPC Python 服务器通过函数调用与客户端通信
gRPC Python server to client communication through function call
假设我正在使用 gRPC 构建一个简单的聊天应用程序 .proto
:
service Chat {
rpc SendChat(Message) returns (Void);
rpc SubscribeToChats(Void) returns (stream Message);
}
message Void {}
message Message {string text = 1;}
我经常看到 Python 中服务程序的实现方式(在示例中)是这样的:
class Servicer(ChatServicer):
def __init__(self):
self.messages = []
def SendChat(self, request, context):
self.messages.append(request.text)
return Void()
def SubscribeToChats(self, request, context):
while True:
if len(self.messages) > 0:
yield Message(text=self.messages.pop())
虽然这可行,但产生一个无限循环持续检查每个连接的客户端的条件似乎非常低效。最好使用这样的东西,在消息进入时立即触发发送,并且不需要对条件进行任何持续轮询:
class Servicer(ChatServicer):
def __init__(self):
self.listeners = []
def SendChat(self, request, context):
for listener in self.listeners:
listener(Message(request.text))
return Void()
def SubscribeToChats(self, request, context, callback):
self.listeners.append(callback)
但是,我似乎找不到使用 gRPC 执行此类操作的方法。
我有以下问题:
- 对于这种情况,无限循环效率低下,我说得对吗?还是在后台进行了我不知道的优化?
- 有没有什么有效的方法可以实现类似于我上面首选的解决方案?这似乎是一个相当常见的用例,所以我确定我遗漏了什么。
提前致谢!
我想出了一个有效的方法。关键是要用AsyncIO API。现在我的 SubscribeToChats
函数可以是一个异步生成器,这让事情变得容易多了。
现在我可以使用类似 asyncio 队列的东西,我的函数可以在 while 循环中等待它。类似这样:
class Servicer(ChatServicer):
def __init__(self):
self.queue = asyncio.Queue()
async def SendChat(self, request, context):
await self.queue.put(request.text)
return Void()
async def SubscribeToChats(self, request, context):
while True:
yield Message(text=await self.queue.get())
假设我正在使用 gRPC 构建一个简单的聊天应用程序 .proto
:
service Chat {
rpc SendChat(Message) returns (Void);
rpc SubscribeToChats(Void) returns (stream Message);
}
message Void {}
message Message {string text = 1;}
我经常看到 Python 中服务程序的实现方式(在示例中)是这样的:
class Servicer(ChatServicer):
def __init__(self):
self.messages = []
def SendChat(self, request, context):
self.messages.append(request.text)
return Void()
def SubscribeToChats(self, request, context):
while True:
if len(self.messages) > 0:
yield Message(text=self.messages.pop())
虽然这可行,但产生一个无限循环持续检查每个连接的客户端的条件似乎非常低效。最好使用这样的东西,在消息进入时立即触发发送,并且不需要对条件进行任何持续轮询:
class Servicer(ChatServicer):
def __init__(self):
self.listeners = []
def SendChat(self, request, context):
for listener in self.listeners:
listener(Message(request.text))
return Void()
def SubscribeToChats(self, request, context, callback):
self.listeners.append(callback)
但是,我似乎找不到使用 gRPC 执行此类操作的方法。
我有以下问题:
- 对于这种情况,无限循环效率低下,我说得对吗?还是在后台进行了我不知道的优化?
- 有没有什么有效的方法可以实现类似于我上面首选的解决方案?这似乎是一个相当常见的用例,所以我确定我遗漏了什么。
提前致谢!
我想出了一个有效的方法。关键是要用AsyncIO API。现在我的 SubscribeToChats
函数可以是一个异步生成器,这让事情变得容易多了。
现在我可以使用类似 asyncio 队列的东西,我的函数可以在 while 循环中等待它。类似这样:
class Servicer(ChatServicer):
def __init__(self):
self.queue = asyncio.Queue()
async def SendChat(self, request, context):
await self.queue.put(request.text)
return Void()
async def SubscribeToChats(self, request, context):
while True:
yield Message(text=await self.queue.get())