How to add a time delay after every 10 completed items in Python thread pool execution?
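I have a list of length 100 and I process it concurrently with a thread pool. I can add a time delay inside the executed function, but I would like code that automatically sleeps for X seconds whenever 10 threads have executed successfully.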
import time
from concurrent.futures import ThreadPoolExecutor

user_list = [1,2,3,4,5,6,7,8,9,10,11,12,13,...,100]

def parse(user):
    return str(user) + "parsed!"

with ThreadPoolExecutor(max_workers=10) as exe:
    start = time.time()
    result = exe.map(parse, user_list)
    output = list(result)
    end = time.time()
    print('taken time', end - start)
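I want to add a time delay after every 10 successful thread executions. I hope my question is clear and that it can be solved with some kind of scheduling.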
The straightforward approach is to submit at most 10 jobs at a time, then sleep between chunks:
import itertools
import time
from concurrent.futures import ThreadPoolExecutor

# Split an iterable into tuples of at most n items.
def chunker(n, iterable):
    it = iter(iterable)
    while True:
        chunk = tuple(itertools.islice(it, n))
        if not chunk:
            return
        yield chunk

def parse(user):
    return f"{user} parsed!"

def main():
    user_list = list(range(100))
    with ThreadPoolExecutor(max_workers=10) as exe:
        for chunk in chunker(10, user_list):
            start = time.time()
            result = exe.map(parse, chunk)
            output = list(result)  # Blocks until the whole chunk is done.
            end = time.time()
            print(output, "taken time", end - start)
            time.sleep(1)  # Pause before submitting the next chunk.

if __name__ == "__main__":
    main()
This prints, for example:
['0 parsed!', '1 parsed!', '2 parsed!', '3 parsed!', '4 parsed!', '5 parsed!', '6 parsed!', '7 parsed!', '8 parsed!', '9 parsed!'] taken time 0.0006809234619140625
['10 parsed!', '11 parsed!', '12 parsed!', '13 parsed!', '14 parsed!', '15 parsed!', '16 parsed!', '17 parsed!', '18 parsed!', '19 parsed!'] taken time 0.0008037090301513672
['20 parsed!', '21 parsed!', '22 parsed!', '23 parsed!', '24 parsed!', '25 parsed!', '26 parsed!', '27 parsed!', '28 parsed!', '29 parsed!'] taken time 0.0008540153503417969
...
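The chunked pattern above can also be wrapped in a small reusable helper. The following is only a minimal sketch and not part of the original answer: `run_in_batches`, `batch_size`, and `delay` are hypothetical names chosen for illustration, and skipping the sleep after the final chunk is an assumption about the desired behavior.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def parse(user):
    return f"{user} parsed!"


def run_in_batches(func, items, batch_size=10, delay=1.0):
    """Run func over items in batches, pausing between batches.

    Hypothetical helper: the name and defaults are illustrative only.
    """
    items = list(items)
    results = []
    with ThreadPoolExecutor(max_workers=batch_size) as exe:
        for start in range(0, len(items), batch_size):
            batch = items[start:start + batch_size]
            # Consuming the map iterator blocks until every job
            # in this batch has finished.
            results.extend(exe.map(func, batch))
            # Sleep only if another batch follows.
            if start + batch_size < len(items):
                time.sleep(delay)
    return results


if __name__ == "__main__":
    output = run_in_batches(parse, range(100), batch_size=10, delay=1.0)
    print(len(output), output[:3])
```

Results come back in input order because `exe.map` preserves ordering within each batch and the batches are processed sequentially.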
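Edit: tqdm progress
To use tqdm with this approach so that it is updated at each parse step, you need something like the following (the parts identical to the code above are replaced with ...).
(tqdm will not update the screen unless enough time has passed since the last update, so the random sleep stands in for real work being done.)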
import random
import tqdm  # Third-party package.

def parse(user, prog):
    time.sleep(random.uniform(.1, 1.3))  # Do work here...
    prog.update()  # Step the progress bar.
    return f"{user} parsed!"

def main():
    # ...
    with ThreadPoolExecutor(max_workers=10) as exe, tqdm.tqdm(total=len(user_list)) as prog:
        for chunk in chunker(10, user_list):
            # ...
            result = exe.map(parse, chunk, [prog] * len(chunk))
            # ...
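I created a global variable that counts how many times the parse() function has run. Whenever the count is evenly divisible by 10 (no_of_times_run % 10 == 0), the function sleeps for a certain amount of time.
The code is attached for your reference.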
import time
from concurrent.futures import ThreadPoolExecutor

user_list = [1,2,3,4,5,6,7,8,9,10,11,12,13,...,100]
no_of_times_run = 0

def parse(user):
    global no_of_times_run
    no_of_times_run += 1
    if no_of_times_run % 10 == 0:
        time.sleep(1)  # Every 10th time it sleeps for a certain time.
    return str(user) + "parsed!"

with ThreadPoolExecutor(max_workers=10) as exe:
    start = time.time()
    result = exe.map(parse, user_list)
    output = list(result)
    end = time.time()
    print('taken time', end - start)