如何在 python 线程池执行中为每 10 个完成列表添加时间延迟?

How to add time delay for every 10 lists of completion in python threadpool execution?

我有一个长度为 100 的列表。我 运行 它与线程池并发。我可以在执行函数中添加时间延迟,但我希望有一个代码在 10 个线程成功执行时自动休眠 X 秒。

import time
from concurrent.futures import ThreadPoolExecutor

user_list = [1,2,3,4,5,6,7,8,9,10,11,12,13,...,100]

def parse(user):
    return str(user) + "parsed!"

with ThreadPoolExecutor(max_workers=10) as exe:
   start = time.time()
   result = exe.map(parse,user_list)
   output = list(result)
   end = time.time()
   print('taken time' end-start)

我想在每 10 个成功的线程执行之间添加一个时间延迟。我希望我的问题很清楚,并且可以通过一些调度来解决

直接的方法是一次最多提交 10 个作业,然后在每个块之间休眠:

import itertools
import time
from concurrent.futures import ThreadPoolExecutor


# See 
def chunker(n, iterable):
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk


def parse(user):
    return f"{user} parsed!"


def main():
    user_list = list(range(100))
    with ThreadPoolExecutor(max_workers=10) as exe:
        for chunk in chunker(10, user_list):
            start = time.time()
            result = exe.map(parse, chunk)
            output = list(result)
            end = time.time()
            print(output, "taken time", end - start)
            time.sleep(1)


if __name__ == "__main__":
    main()

这会打印出例如

['0 parsed!', '1 parsed!', '2 parsed!', '3 parsed!', '4 parsed!', '5 parsed!', '6 parsed!', '7 parsed!', '8 parsed!', '9 parsed!'] taken time 0.0006809234619140625
['10 parsed!', '11 parsed!', '12 parsed!', '13 parsed!', '14 parsed!', '15 parsed!', '16 parsed!', '17 parsed!', '18 parsed!', '19 parsed!'] taken time 0.0008037090301513672
['20 parsed!', '21 parsed!', '22 parsed!', '23 parsed!', '24 parsed!', '25 parsed!', '26 parsed!', '27 parsed!', '28 parsed!', '29 parsed!'] taken time 0.0008540153503417969
...

编辑 tqdm 进度

要通过这种方法使用 tqdm 以便它在每个 parse 步骤中得到更新,您需要类似下面的内容(与上面相同的位替换为 ...) .

tqdm 不会更新屏幕,除非自上次更新以来已经过了足够的时间,因此随机休眠表示工作已完成。)

def parse(user, prog):
    time.sleep(random.uniform(.1, 1.3))  # Do work here...
    prog.update()  # Step the progress bar.
    return f"{user} parsed!"


def main():
    # ...
    with ThreadPoolExecutor(max_workers=10) as exe, tqdm.tqdm(total=len(user_list)) as prog:
        for chunk in chunker(10, user_list):
            # ...
            result = exe.map(parse, chunk, [prog] * len(chunk))
            # ...

我创建了一个全局变量,用于计算 parse() 函数具有 运行 的次数,每当它被 10(no_of_times_run % 10 == 0) 完全整除时,它就会休眠一定时间时间。

附上代码供您参考。

import time
from concurrent.futures import ThreadPoolExecutor

user_list = [1,2,3,4,5,6,7,8,9,10,11,12,13,...,100]

no_of_times_run = 0

def parse(user):
    global no_of_times_run

    no_of_times_run += 1
    if no_of_times_run % 10 == 0:
        time.sleep(1) # Every 10th time it sleeps for a certain time.

    return str(user) + "parsed!"

with ThreadPoolExecutor(max_workers=10) as exe:
   start = time.time()
   result = exe.map(parse,user_list)
   output = list(result)
   end = time.time()
   print('taken time', end-start)