Python 三重奏设置十进制工数

Python Trio set up a decimal number of workers

我正在与 trio to 运行 异步并发任务一起工作,这些任务将在不同的网站上进行一些网络抓取。我希望能够选择将任务分配给多少个并发工作人员。为此,我编写了这段代码

async def run_task():
    s = trio.Session(connections=5)
    Total_to_check = to_check() / int(module().workers)
    line = 0
    if int(Total_to_check) < 1:
        Total_to_check = 1
        module().workers = int(to_check())
    for i in range(int(Total_to_check)):
        try:
            async with trio.open_nursery() as nursery:
                for x in range(int(module().workers)):                  
                        nursery.start_soon(python_worker, self, s, x, line)
                        line += 1
                            
    
        except BlockingIOError as e:
            print("[Fatal Error]", str(e))
            continue            

在此示例中,to_check() 等于提供了多少个 URL 以从中获取数据,module().workers 等于我想使用多少个并发工作人员。

因此,如果我假设我有 30 个 url,并且输入我想要 10 个并发任务,它会同时从 10 个 url 获取数据并重复该过程 3 次。

现在这一切都很好,直到 Total_to_check(等于 url 的数量除以工人的数量)是小数。 如果我有 15 个 url,我要求 10 个工作人员,那么这段代码将只检查 10 个 url。如果我有 20 个 url 但要求 15 个工人,情况也是如此。 我可以做类似 math.ceil(Total_to_check) 的事情,但随后它会开始尝试检查不存在的网址。

我怎样才能让它正常工作,所以如果我有 10 个并发任务和 15 个 url,它会同时检查前 10 个,然后同时检查最后 5 个而不跳过 url? (或尝试检查太多)

谢谢!

好吧,CapacityLimiter 来了,你会像这样使用:

async def python_worker(self, session, workers, line, limit):
    async with limit:
        ...

然后你可以简化你的 run_task:

async def run_task():
    limit = trio.CapacityLimiter(10)
    s = trio.Session(connections=5)
    line = 0
    async with trio.open_nursery() as nursery:
        for x in range(int(to_check())):
            nursery.start_soon(python_worker, self, s, x, line, limit)
            line += 1      

我相信 BlockingIOError 也必须移动到 python_worker 里面,因为 nursery.start_soon() 不会阻塞,是 nursery__aexit__ 自动async with trio.open_nursery() as nursery 块的末尾等待。