How to keep the original order of input when using ThreadPoolExecutor?

from concurrent.futures import ThreadPoolExecutor, as_completed

def add_one(number):
    return number + 1

def process():
    all_numbers = []
    for i in range(0, 10):
        all_numbers.append(i)

    threads = []
    all_results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for number in all_numbers:
            threads.append(executor.submit(add_one, number))

        for index, task in enumerate(as_completed(threads)):
            result = task.result()
            #print(result)
            all_results.append(result)

    for index, result in enumerate(all_results):
        print(result)

process()

If I set max_workers=1, it prints 1 through 10 in order; if I set max_workers=10, the order may be random:

5
3
10
7
1
8
6
2
4
9

When using ThreadPoolExecutor to process a list of items, as in this example, how can I keep the original order of the input?

You can use the map method of ThreadPoolExecutor, which yields results in the same order as the inputs:

from concurrent.futures import ThreadPoolExecutor, as_completed

def add_one(number):
    return number + 1

def process():
    all_numbers = []
    for i in range(0, 10):
        all_numbers.append(i)

    all_results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for i in executor.map(add_one, all_numbers):
            print(i)
            all_results.append(i)

    for index, result in enumerate(all_results):
        print(result)

process()

Updated answer, as requested in the comments:

from concurrent.futures import ThreadPoolExecutor, as_completed

def add_one(args):
    return args[0] + 1 + args[1]

def process():
    all_numbers = []
    for i in range(0, 10):
        all_numbers.append([i, 2])

    all_results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for i in executor.map(add_one, all_numbers):
            print(i)
            all_results.append(i)

    for index, result in enumerate(all_results):
        print(result)

process()

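This mixes two incompatible ideas!

When you use a thread/process/whatever pool, the work completes in arbitrary order (mostly as a result of unrelated system load). Some work may happen at the same time as other work (this is usually the benefit of such a system: parallelism). However, unless you go to the trouble of ordering the results, they will arrive in whatever order the pool finished the work.

Rather than trying to "sort" the results, consider mapping them back into some collection, such as a dictionary, so that you can read them back by key (possibly in some order).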
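The dictionary idea above could look something like the following sketch. The helper name `process_keyed` and the choice of the input index as the key are assumptions made for illustration; they are not part of the original answer.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed


def add_one(number):
    return number + 1


def process_keyed(numbers):
    """Hypothetical helper: collect results in a dict keyed by input index."""
    results = {}
    with ThreadPoolExecutor(max_workers=10) as executor:
        # Remember which input position each Future belongs to.
        future_to_index = {
            executor.submit(add_one, number): index
            for index, number in enumerate(numbers)
        }
        # Futures arrive in completion order, but the key records the origin.
        for future in as_completed(future_to_index):
            results[future_to_index[future]] = future.result()
    return results


results = process_keyed(list(range(10)))
# Read the results back by key, in input order.
for index in range(len(results)):
    print(results[index])
```

Because each result is stored under its input index, the order in which the threads finish no longer matters when reading the results back.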

At @marlon's request, here is a variation of @rorra's solution that uses itertools.repeat to reduce some of the complexity of passing arguments.

Example:

from concurrent.futures import ThreadPoolExecutor
import itertools

def add_one(number, n):
    return number + 1 + n

def process():
    all_numbers = list(range(0, 10))

    with ThreadPoolExecutor(max_workers=10) as executor:
        
        for result in executor.map(add_one, all_numbers, itertools.repeat(2)):
            print(result)

process()

Output:

3
4
5
6
7
8
9
10
11
12
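A different way to supply the constant second argument, sketched here as an alternative rather than as anything the answers above propose, is to bind it with functools.partial so that map only has to iterate over the numbers. The `process(numbers, n)` signature is an assumption made for this illustration.

```python
from concurrent.futures import ThreadPoolExecutor
from functools import partial


def add_one(number, n):
    return number + 1 + n


def process(numbers, n):
    # Bind the constant second argument once; map then supplies only `number`.
    add_one_plus_n = partial(add_one, n=n)
    with ThreadPoolExecutor(max_workers=10) as executor:
        # executor.map yields results in the order of the inputs.
        return list(executor.map(add_one_plus_n, numbers))


print(process(list(range(10)), 2))
```

Whether partial or itertools.repeat reads better is largely a matter of taste; both keep add_one's two-argument signature unchanged.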

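This may be one way to get the desired result: pass the index along with each task, then sort the results by that index.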

from concurrent.futures import ThreadPoolExecutor, as_completed


def add_one(number, index):
    return number + 1, index


def process():
    all_numbers = []
    for i in range(0, 10):
        all_numbers.append(i)

    threads = []
    all_results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for index, number in enumerate(all_numbers):
            threads.append(executor.submit(add_one, number, index))
        for task in as_completed(threads):
            result, index = task.result()
            all_results.append([result, index])
        all_results = sorted(all_results, key=lambda x: x[-1])

    for index, result in enumerate(all_results):
        print(result[0])


process()

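Just remove as_completed from your code.

executor.submit returns a Future object, and you append those to the list in submission order.

as_completed, however, yields the Future objects in the order in which they complete.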

from concurrent.futures import ThreadPoolExecutor

def add_one(number):
    return number + 1

def process():
    all_numbers = []
    for i in range(0, 10):
        all_numbers.append(i)

    threads = []
    all_results = []
    with ThreadPoolExecutor(max_workers=10) as executor:
        for number in all_numbers:
            threads.append(executor.submit(add_one, number))

        for index, task in enumerate(threads):
            result = task.result()
            #print(result)
            all_results.append(result)

    for index, result in enumerate(all_results):
        print(result)

process()
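To see the difference between the two iteration styles side by side, here is a small sketch. The `slow_add_one` function and its artificial sleep are assumptions added purely so that later inputs tend to finish first; `compare_orders` is likewise a hypothetical helper.

```python
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def slow_add_one(number):
    # Assumption for the demo: larger inputs sleep less, so they finish sooner.
    time.sleep((5 - number) * 0.05)
    return number + 1


def compare_orders(numbers):
    with ThreadPoolExecutor(max_workers=len(numbers)) as executor:
        futures = [executor.submit(slow_add_one, n) for n in numbers]
        # Completion order: whichever Future finishes first comes first.
        completion_order = [f.result() for f in as_completed(futures)]
        # Submission order: the list itself preserves the input order.
        submission_order = [f.result() for f in futures]
    return completion_order, submission_order


completed, submitted = compare_orders([0, 1, 2, 3, 4])
print("as_completed:", completed)
print("list order:  ", submitted)
```

The second list always matches the input order, while the first depends on timing and will typically come out reversed with these sleep values.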