如何使用 python 通过 grpc 响应迭代器更快地循环

how to loop faster through grpc response iterators with python

我正在使用 python 调用一个 grpc 服务,该服务响应大约一百万个迭代器对象。目前,我正在使用列表推导式从迭代器访问我需要的 1 属性:

stub = QueryStub(grpc_channel)
return [object.attribute_i_need for object in stub.ResponseMethod]

访问大约一百万个属性需要一段时间(大约 2-3 分钟)。有什么办法可以加快速度吗?有兴趣知道人们如何更快地处理这种情况。我也尝试过使用 list(stub.ResponseMethod)[*stub.ResponseMethod] 来更快地解包或检索对象,但是这些方法需要更长的时间,因为迭代器对象有很多我不需要的其他元数据和存储它们。

PS 我不一定需要将属性存储在内存中,更快地访问它们是我想要实现的目标

如果您还没有这样做,我建议您使用 for 循环遍历该对象。但有一点需要说明: 重要的是要意识到你放入循环中的所有内容都会在每次循环迭代中执行。他们优化循环的关键是最小化他们所做的事情。即使是看似很快的操作,如果重复多次,也会花费很长时间。执行一个需要 1 微秒的操作一百万次将需要 1 秒才能完成。

不要在循环内甚至在其起始条件下执行 len(list) 之类的东西。

例子

a = [i for i in range(1000000)]
length = len(a)
for i in a:
   print(i - length)

快很多
a = [i for i in range(1000000)]
for i in a:
   print(i - len(a))

您还可以使用 Loop Unrolling(https://en.wikipedia.org/wiki/Loop_unrolling) 等技术,这是一种循环转换技术,它试图以牺牲程序的二进制大小为代价来优化程序的执行速度,这是一种称为 space-time 权衡的方法。

使用 mapfilter 等函数代替显式 for 循环也可以提供一些性能改进。

根据 this documentation,我会说你需要尝试两件事:

  • 使用 asyncio API(如果尚未完成),方法如下:
async def run(stub: QueryStub) -> None:
    async for object in stub.ResponseMethod(empty_pb2.Empty()):
        print(object.attribute_i_need)

请注意,Empty() 只是因为我不知道您的 API 定义。

  • 第二个是通过以下操作尝试实验性功能 SingleThreadedUnaryStream(如果适用于您的情况):
with grpc.insecure_channel(target='localhost:50051', options=[(grpc.experimental.ChannelOptions.SingleThreadedUnaryStream, 1)]) as channel:

我试过的

我真的不知道它是否涵盖了您的用例(您可以给我更多相关信息,我会更新),但这是我尝试过的:

我的架构如下:

service TestService {
  rpc AMethod(google.protobuf.Empty) returns (stream Test) {} // stream is optional, I tried with both
}

message Test {
  repeated string message = 1;
  repeated string message2 = 2;
  repeated string message3 = 3;
  repeated string message4 = 4;
  repeated string message5 = 5;
  repeated string message6 = 6;
  repeated string message7 = 7;
  repeated string message8 = 8;
  repeated string message9 = 9;
  repeated string message10 = 10;
  repeated string message11 = 11;
}

在服务器端(使用 asyncio)我有

async def AMethod(self, request: empty_pb2.Empty, unused_context) -> AsyncIterable[Test]:
    test = Test()
    for i in range(10):
        test.message.append(randStr())
    # repeat append for every other field or not
    for i in range(1000000):
        yield test

其中 randStr 创建长度为 10000 的 运行dom 字符串(完全任意)。

在客户端(使用 SingleThreadedUnaryStream 和 asyncio)

async def run(stub: TesterStub) -> None:
    tests = stub.AMethod(empty_pb2.Empty())

    async for test in tests:
        print(test.message)

基准

注意:这可能因您的机器而异

对于只有一个 repeated field 填充的示例,我得到的平均值(运行 3 次)为 77 sec

对于所有要填写的字段,它真的很长,所以我尝试提供更小的字符串(长度为 10),但仍然需要太长时间。我认为 repeatedstream 的混合不是一个好主意。我也尝试过不使用 stream,我得到了 45 sec.

的平均值(运行 3 次)

我的结论

如果所有重复的字段都填满数据,这真的很慢,而这是 ok-ish 只有一个填满的情况。 但总的来说,我认为 asyncio 有帮助

此外,this documentation解释说Protocol Buffers are not designed to handle large messages,然而Protocol Buffers are great for handling individual messages within a large data set

我建议,如果我的架构正确,您重新考虑 API 设计,因为这似乎不是最优的。

但我可能没有正确理解架构。