如何实现异步 grpc python 服务器?
How to implement a async grpc python server?
我需要为每个 GRPC 请求调用一个 celery 任务,return 结果。
在默认的 GRPC 实现中,每个请求都在线程池的单独线程中处理。
在我的例子中,服务器应该每秒以批处理模式处理大约 400 个请求。所以由于批处理,一个请求可能需要等待 1 秒才能得到结果,这意味着线程池的大小必须大于 400 以避免阻塞。
这可以异步完成吗?
非常感谢。
class EventReporting(ss_pb2.BetaEventReportingServicer, ss_pb2.BetaDeviceMgtServicer):
def ReportEvent(self, request, context):
res = tasks.add.delay(1,2)
result = res.get() ->here i have to block
return ss_pb2.GeneralReply(message='Hello, %s!' % result.message)
如果您对 res.get
的调用可以异步完成(如果它是用 async
关键字定义的),则可以异步完成。
While grpc.server
says it requires a futures.ThreadPoolExecutor
, it will actually work with any futures.Executor
that calls the behaviors submitted to it on some thread other than the one on which they were passed。如果你传递给 grpc.server
一个你实现的 futures.Executor
只用一个线程执行四百个(或更多)并发调用 EventReporting.ReportEvent
,你的服务器应该避免那种阻塞你描述的。
在我看来是很好的简单实现异步grpc服务器,就像基于aiohttp的http一样。
import asyncio
from concurrent import futures
import functools
import inspect
import threading
from grpc import _server
def _loop_mgr(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
loop.run_forever()
# If we reach here, the loop was stopped.
# We should gather any remaining tasks and finish them.
pending = asyncio.Task.all_tasks(loop=loop)
if pending:
loop.run_until_complete(asyncio.gather(*pending))
class AsyncioExecutor(futures.Executor):
def __init__(self, *, loop=None):
super().__init__()
self._shutdown = False
self._loop = loop or asyncio.get_event_loop()
self._thread = threading.Thread(target=_loop_mgr, args=(self._loop,),
daemon=True)
self._thread.start()
def submit(self, fn, *args, **kwargs):
if self._shutdown:
raise RuntimeError('Cannot schedule new futures after shutdown')
if not self._loop.is_running():
raise RuntimeError("Loop must be started before any function can "
"be submitted")
if inspect.iscoroutinefunction(fn):
coro = fn(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self._loop)
else:
func = functools.partial(fn, *args, **kwargs)
return self._loop.run_in_executor(None, func)
def shutdown(self, wait=True):
self._loop.stop()
self._shutdown = True
if wait:
self._thread.join()
# --------------------------------------------------------------------------- #
async def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
context = _server._Context(rpc_event, state, request_deserializer)
try:
return await behavior(argument, context), True
except Exception as e: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = 'Exception calling application: {}'.format(e)
_server.logging.exception(details)
_server._abort(state, rpc_event.operation_call,
_server.cygrpc.StatusCode.unknown, _server._common.encode(details))
return None, False
async def _take_response_from_response_iterator(rpc_event, state, response_iterator):
try:
return await response_iterator.__anext__(), True
except StopAsyncIteration:
return None, True
except Exception as e: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = 'Exception iterating responses: {}'.format(e)
_server.logging.exception(details)
_server._abort(state, rpc_event.operation_call,
_server.cygrpc.StatusCode.unknown, _server._common.encode(details))
return None, False
async def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
argument = argument_thunk()
if argument is not None:
response, proceed = await _call_behavior(rpc_event, state, behavior,
argument, request_deserializer)
if proceed:
serialized_response = _server._serialize_response(
rpc_event, state, response, response_serializer)
if serialized_response is not None:
_server._status(rpc_event, state, serialized_response)
async def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
argument = argument_thunk()
if argument is not None:
# Notice this calls the normal `_call_behavior` not the awaitable version.
response_iterator, proceed = _server._call_behavior(
rpc_event, state, behavior, argument, request_deserializer)
if proceed:
while True:
response, proceed = await _take_response_from_response_iterator(
rpc_event, state, response_iterator)
if proceed:
if response is None:
_server._status(rpc_event, state, None)
break
else:
serialized_response = _server._serialize_response(
rpc_event, state, response, response_serializer)
print(response)
if serialized_response is not None:
print("Serialized Correctly")
proceed = _server._send_response(rpc_event, state,
serialized_response)
if not proceed:
break
else:
break
else:
break
_server._unary_response_in_pool = _unary_response_in_pool
_server._stream_response_in_pool = _stream_response_in_pool
if __name__ == '__main__':
server = grpc.server(AsyncioExecutor())
# Add Servicer and Start Server Here
link 改为:
https://gist.github.com/seglberg/0b4487b57b4fd425c56ad72aba9971be
正如@Michael 在评论中指出的那样,从 1.32 版开始,gRPC 现在支持 asyncio,其 Python API. If you're using an earlier version, you can still use the asyncio API via the experimental API: from grpc.experimental import aio
. An asyncio hello world example 也已添加到 gRPC 存储库中。以下代码是示例服务器的副本:
import logging
import asyncio
from grpc import aio
import helloworld_pb2
import helloworld_pb2_grpc
class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
async def serve():
server = aio.server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
listen_addr = '[::]:50051'
server.add_insecure_port(listen_addr)
logging.info("Starting server on %s", listen_addr)
await server.start()
await server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.run(serve())
有关如何实施客户端的信息,请参见my other answer。
我需要为每个 GRPC 请求调用一个 celery 任务,return 结果。 在默认的 GRPC 实现中,每个请求都在线程池的单独线程中处理。
在我的例子中,服务器应该每秒以批处理模式处理大约 400 个请求。所以由于批处理,一个请求可能需要等待 1 秒才能得到结果,这意味着线程池的大小必须大于 400 以避免阻塞。
这可以异步完成吗? 非常感谢。
class EventReporting(ss_pb2.BetaEventReportingServicer, ss_pb2.BetaDeviceMgtServicer):
def ReportEvent(self, request, context):
res = tasks.add.delay(1,2)
result = res.get() ->here i have to block
return ss_pb2.GeneralReply(message='Hello, %s!' % result.message)
如果您对 res.get
的调用可以异步完成(如果它是用 async
关键字定义的),则可以异步完成。
While grpc.server
says it requires a futures.ThreadPoolExecutor
, it will actually work with any futures.Executor
that calls the behaviors submitted to it on some thread other than the one on which they were passed。如果你传递给 grpc.server
一个你实现的 futures.Executor
只用一个线程执行四百个(或更多)并发调用 EventReporting.ReportEvent
,你的服务器应该避免那种阻塞你描述的。
在我看来是很好的简单实现异步grpc服务器,就像基于aiohttp的http一样。
import asyncio
from concurrent import futures
import functools
import inspect
import threading
from grpc import _server
def _loop_mgr(loop: asyncio.AbstractEventLoop):
asyncio.set_event_loop(loop)
loop.run_forever()
# If we reach here, the loop was stopped.
# We should gather any remaining tasks and finish them.
pending = asyncio.Task.all_tasks(loop=loop)
if pending:
loop.run_until_complete(asyncio.gather(*pending))
class AsyncioExecutor(futures.Executor):
def __init__(self, *, loop=None):
super().__init__()
self._shutdown = False
self._loop = loop or asyncio.get_event_loop()
self._thread = threading.Thread(target=_loop_mgr, args=(self._loop,),
daemon=True)
self._thread.start()
def submit(self, fn, *args, **kwargs):
if self._shutdown:
raise RuntimeError('Cannot schedule new futures after shutdown')
if not self._loop.is_running():
raise RuntimeError("Loop must be started before any function can "
"be submitted")
if inspect.iscoroutinefunction(fn):
coro = fn(*args, **kwargs)
return asyncio.run_coroutine_threadsafe(coro, self._loop)
else:
func = functools.partial(fn, *args, **kwargs)
return self._loop.run_in_executor(None, func)
def shutdown(self, wait=True):
self._loop.stop()
self._shutdown = True
if wait:
self._thread.join()
# --------------------------------------------------------------------------- #
async def _call_behavior(rpc_event, state, behavior, argument, request_deserializer):
context = _server._Context(rpc_event, state, request_deserializer)
try:
return await behavior(argument, context), True
except Exception as e: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = 'Exception calling application: {}'.format(e)
_server.logging.exception(details)
_server._abort(state, rpc_event.operation_call,
_server.cygrpc.StatusCode.unknown, _server._common.encode(details))
return None, False
async def _take_response_from_response_iterator(rpc_event, state, response_iterator):
try:
return await response_iterator.__anext__(), True
except StopAsyncIteration:
return None, True
except Exception as e: # pylint: disable=broad-except
with state.condition:
if e not in state.rpc_errors:
details = 'Exception iterating responses: {}'.format(e)
_server.logging.exception(details)
_server._abort(state, rpc_event.operation_call,
_server.cygrpc.StatusCode.unknown, _server._common.encode(details))
return None, False
async def _unary_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
argument = argument_thunk()
if argument is not None:
response, proceed = await _call_behavior(rpc_event, state, behavior,
argument, request_deserializer)
if proceed:
serialized_response = _server._serialize_response(
rpc_event, state, response, response_serializer)
if serialized_response is not None:
_server._status(rpc_event, state, serialized_response)
async def _stream_response_in_pool(rpc_event, state, behavior, argument_thunk,
request_deserializer, response_serializer):
argument = argument_thunk()
if argument is not None:
# Notice this calls the normal `_call_behavior` not the awaitable version.
response_iterator, proceed = _server._call_behavior(
rpc_event, state, behavior, argument, request_deserializer)
if proceed:
while True:
response, proceed = await _take_response_from_response_iterator(
rpc_event, state, response_iterator)
if proceed:
if response is None:
_server._status(rpc_event, state, None)
break
else:
serialized_response = _server._serialize_response(
rpc_event, state, response, response_serializer)
print(response)
if serialized_response is not None:
print("Serialized Correctly")
proceed = _server._send_response(rpc_event, state,
serialized_response)
if not proceed:
break
else:
break
else:
break
_server._unary_response_in_pool = _unary_response_in_pool
_server._stream_response_in_pool = _stream_response_in_pool
if __name__ == '__main__':
server = grpc.server(AsyncioExecutor())
# Add Servicer and Start Server Here
link 改为:
https://gist.github.com/seglberg/0b4487b57b4fd425c56ad72aba9971be
正如@Michael 在评论中指出的那样,从 1.32 版开始,gRPC 现在支持 asyncio,其 Python API. If you're using an earlier version, you can still use the asyncio API via the experimental API: from grpc.experimental import aio
. An asyncio hello world example 也已添加到 gRPC 存储库中。以下代码是示例服务器的副本:
import logging
import asyncio
from grpc import aio
import helloworld_pb2
import helloworld_pb2_grpc
class Greeter(helloworld_pb2_grpc.GreeterServicer):
async def SayHello(self, request, context):
return helloworld_pb2.HelloReply(message='Hello, %s!' % request.name)
async def serve():
server = aio.server()
helloworld_pb2_grpc.add_GreeterServicer_to_server(Greeter(), server)
listen_addr = '[::]:50051'
server.add_insecure_port(listen_addr)
logging.info("Starting server on %s", listen_addr)
await server.start()
await server.wait_for_termination()
if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
asyncio.run(serve())
有关如何实施客户端的信息,请参见my other answer。