Python 三重奏设置十进制工数
Python Trio set up a decimal number of workers
我正在与 trio to 运行 异步并发任务一起工作,这些任务将在不同的网站上进行一些网络抓取。我希望能够选择将任务分配给多少个并发工作人员。为此,我编写了这段代码
async def run_task():
s = trio.Session(connections=5)
Total_to_check = to_check() / int(module().workers)
line = 0
if int(Total_to_check) < 1:
Total_to_check = 1
module().workers = int(to_check())
for i in range(int(Total_to_check)):
try:
async with trio.open_nursery() as nursery:
for x in range(int(module().workers)):
nursery.start_soon(python_worker, self, s, x, line)
line += 1
except BlockingIOError as e:
print("[Fatal Error]", str(e))
continue
在此示例中,to_check()
等于提供了多少个 URL 以从中获取数据,module().workers
等于我想使用多少个并发工作人员。
因此,如果我假设我有 30 个 url,并且输入我想要 10 个并发任务,它会同时从 10 个 url 获取数据并重复该过程 3 次。
现在这一切都很好,直到 Total_to_check
(等于 url 的数量除以工人的数量)是小数。
如果我有 15 个 url,我要求 10 个工作人员,那么这段代码将只检查 10 个 url。如果我有 20 个 url 但要求 15 个工人,情况也是如此。
我可以做类似 math.ceil(Total_to_check) 的事情,但随后它会开始尝试检查不存在的网址。
我怎样才能让它正常工作,所以如果我有 10 个并发任务和 15 个 url,它会同时检查前 10 个,然后同时检查最后 5 个而不跳过 url? (或尝试检查太多)
谢谢!
好吧,CapacityLimiter 来了,你会像这样使用:
async def python_worker(self, session, workers, line, limit):
async with limit:
...
然后你可以简化你的 run_task
:
async def run_task():
limit = trio.CapacityLimiter(10)
s = trio.Session(connections=5)
line = 0
async with trio.open_nursery() as nursery:
for x in range(int(to_check())):
nursery.start_soon(python_worker, self, s, x, line, limit)
line += 1
我相信 BlockingIOError
也必须移动到 python_worker
里面,因为 nursery.start_soon()
不会阻塞,是 nursery
的 __aexit__
自动 在 async with trio.open_nursery() as nursery
块的末尾等待。
我正在与 trio to 运行 异步并发任务一起工作,这些任务将在不同的网站上进行一些网络抓取。我希望能够选择将任务分配给多少个并发工作人员。为此,我编写了这段代码
async def run_task():
s = trio.Session(connections=5)
Total_to_check = to_check() / int(module().workers)
line = 0
if int(Total_to_check) < 1:
Total_to_check = 1
module().workers = int(to_check())
for i in range(int(Total_to_check)):
try:
async with trio.open_nursery() as nursery:
for x in range(int(module().workers)):
nursery.start_soon(python_worker, self, s, x, line)
line += 1
except BlockingIOError as e:
print("[Fatal Error]", str(e))
continue
在此示例中,to_check()
等于提供了多少个 URL 以从中获取数据,module().workers
等于我想使用多少个并发工作人员。
因此,如果我假设我有 30 个 url,并且输入我想要 10 个并发任务,它会同时从 10 个 url 获取数据并重复该过程 3 次。
现在这一切都很好,直到 Total_to_check
(等于 url 的数量除以工人的数量)是小数。
如果我有 15 个 url,我要求 10 个工作人员,那么这段代码将只检查 10 个 url。如果我有 20 个 url 但要求 15 个工人,情况也是如此。
我可以做类似 math.ceil(Total_to_check) 的事情,但随后它会开始尝试检查不存在的网址。
我怎样才能让它正常工作,所以如果我有 10 个并发任务和 15 个 url,它会同时检查前 10 个,然后同时检查最后 5 个而不跳过 url? (或尝试检查太多)
谢谢!
好吧,CapacityLimiter 来了,你会像这样使用:
async def python_worker(self, session, workers, line, limit):
async with limit:
...
然后你可以简化你的 run_task
:
async def run_task():
limit = trio.CapacityLimiter(10)
s = trio.Session(connections=5)
line = 0
async with trio.open_nursery() as nursery:
for x in range(int(to_check())):
nursery.start_soon(python_worker, self, s, x, line, limit)
line += 1
我相信 BlockingIOError
也必须移动到 python_worker
里面,因为 nursery.start_soon()
不会阻塞,是 nursery
的 __aexit__
自动 在 async with trio.open_nursery() as nursery
块的末尾等待。