使用ThreadPoolExecutor时如何保持原来的输入顺序?
How to keep the original order of input when using ThreadPoolExecutor?
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(number, n):
return number + 1 + n
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append(i)
threads = []
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for number in all_numbers:
threads.append(executor.submit(add_one, number))
for index, task in enumerate(as_completed(threads)):
result = task.result()
#print(result)
all_results.append(result)
for index, result in enumerate(all_results):
print(result)
process()
如果我设置max_works=1,它会从1到10依次打印出来;如果我设置 max_workers = 10,顺序可能是随机的:
5
3
10
7
1
8
6
2
4
9
如本例中使用 ThreadPoolExecutor 处理项目列表时,如何保持输入的原始顺序?
可以使用ThreadExecutor的map方法:
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(number):
return number + 1
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append(i)
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for i in executor.map(add_one, all_numbers):
print(i)
all_results.append(i)
for index, result in enumerate(all_results):
print(result)
process()
根据评论要求更新的答案:
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(args):
return args[0] + 1 + args[1]
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append([i, 2])
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for i in executor.map(add_one, all_numbers):
print(i)
all_results.append(i)
for index, result in enumerate(all_results):
print(result)
process()
这混合了两个不相容的想法!
当您使用 thread/process/whatever 池时,工作将以任意顺序完成(主要是不相关系统负载的结果)。有些工作可能与其他工作同时发生(这通常是这种系统的好处;并行化)。但是,除非您不厌其烦地对结果进行排序,否则它们将按照池进行工作的任何顺序排列。
与其尝试“排序”结果,不如考虑将它们映射回某个集合,例如字典,这样您就可以按键读回它们(可能有一些顺序)。
根据@marlon 的要求,这是@rorra 解决方案的变体,它使用itertools.repeat
来降低一些参数传递的复杂性。
示例:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import itertools
def add_one(number, n):
return number + 1 + n
def process():
all_numbers = list(range(0, 10))
with ThreadPoolExecutor(max_workers=10) as executor:
for result in executor.map(add_one, all_numbers, itertools.repeat(2)):
print(result)
process()
输出:
3
4
5
6
7
8
9
10
11
12
这可能是获得所需结果的方法之一
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(number, index):
return number + 1, index
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append(i)
threads = []
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for index, number in enumerate(all_numbers):
threads.append(executor.submit(add_one, number, index))
for task in as_completed(threads):
result, index = task.result()
all_results.append([result, index])
all_results = sorted(all_results, key=lambda x: x[-1])
for index, result in enumerate(all_results):
print(result[0])
process()
只需从您的代码中删除 as_completed
!
executor.submit
将 return Future
对象,然后按顺序将其附加到列表中。
但是 as_completed
将按完整顺序生成 Future
个对象。
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(number, n):
return number + 1 + n
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append(i)
threads = []
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for number in all_numbers:
threads.append(executor.submit(add_one, number))
for index, task in enumerate(threads):
result = task.result()
#print(result)
all_results.append(result)
for index, result in enumerate(all_results):
print(result)
process()
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(number, n):
return number + 1 + n
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append(i)
threads = []
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for number in all_numbers:
threads.append(executor.submit(add_one, number))
for index, task in enumerate(as_completed(threads)):
result = task.result()
#print(result)
all_results.append(result)
for index, result in enumerate(all_results):
print(result)
process()
如果我设置max_works=1,它会从1到10依次打印出来;如果我设置 max_workers = 10,顺序可能是随机的:
5
3
10
7
1
8
6
2
4
9
如本例中使用 ThreadPoolExecutor 处理项目列表时,如何保持输入的原始顺序?
可以使用ThreadExecutor的map方法:
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(number):
return number + 1
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append(i)
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for i in executor.map(add_one, all_numbers):
print(i)
all_results.append(i)
for index, result in enumerate(all_results):
print(result)
process()
根据评论要求更新的答案:
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(args):
return args[0] + 1 + args[1]
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append([i, 2])
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for i in executor.map(add_one, all_numbers):
print(i)
all_results.append(i)
for index, result in enumerate(all_results):
print(result)
process()
这混合了两个不相容的想法!
当您使用 thread/process/whatever 池时,工作将以任意顺序完成(主要是不相关系统负载的结果)。有些工作可能与其他工作同时发生(这通常是这种系统的好处;并行化)。但是,除非您不厌其烦地对结果进行排序,否则它们将按照池进行工作的任何顺序排列。
与其尝试“排序”结果,不如考虑将它们映射回某个集合,例如字典,这样您就可以按键读回它们(可能有一些顺序)。
根据@marlon 的要求,这是@rorra 解决方案的变体,它使用itertools.repeat
来降低一些参数传递的复杂性。
示例:
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import itertools
def add_one(number, n):
return number + 1 + n
def process():
all_numbers = list(range(0, 10))
with ThreadPoolExecutor(max_workers=10) as executor:
for result in executor.map(add_one, all_numbers, itertools.repeat(2)):
print(result)
process()
输出:
3
4
5
6
7
8
9
10
11
12
这可能是获得所需结果的方法之一
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(number, index):
return number + 1, index
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append(i)
threads = []
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for index, number in enumerate(all_numbers):
threads.append(executor.submit(add_one, number, index))
for task in as_completed(threads):
result, index = task.result()
all_results.append([result, index])
all_results = sorted(all_results, key=lambda x: x[-1])
for index, result in enumerate(all_results):
print(result[0])
process()
只需从您的代码中删除 as_completed
!
executor.submit
将 return Future
对象,然后按顺序将其附加到列表中。
但是 as_completed
将按完整顺序生成 Future
个对象。
from concurrent.futures import ThreadPoolExecutor, as_completed
def add_one(number, n):
return number + 1 + n
def process():
all_numbers = []
for i in range(0, 10):
all_numbers.append(i)
threads = []
all_results = []
with ThreadPoolExecutor(max_workers=10) as executor:
for number in all_numbers:
threads.append(executor.submit(add_one, number))
for index, task in enumerate(threads):
result = task.result()
#print(result)
all_results.append(result)
for index, result in enumerate(all_results):
print(result)
process()