How to assign a sequential number to ThreadPoolExecutor workers

Suppose the following code uploads a list of files, using a ThreadPoolExecutor as the executor.

    from concurrent.futures import ThreadPoolExecutor

    def upload_segments(segment_upload_list):
        def __upload(object_path_pair):
            # libera_resource is the asker's own storage client (not shown).
            libera_resource.upload_file(*object_path_pair)
            print("Segment uploaded!")

        with ThreadPoolExecutor() as executor:
            executor.map(__upload, segment_upload_list)

    upload_segments(segment_upload_list)

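In this multithreaded scenario, how can I give each segment a number based on its position in the list? In the end I'd like to display something like: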

"Segment 10/4310 uploaded."

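I know that, because of the nature of multithreading, the output won't be in order, but it would at least give an overview of the progress. I suppose counting the number of threads I've started could also be used here to count the segments already uploaded.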

Thanks in advance.
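One way to number segments by their position in the list, rather than by completion order, is to pass the index alongside each item: ThreadPoolExecutor.map accepts several iterables and zips them, like the built-in map. This is a minimal sketch; the upload parameter is a hypothetical stand-in for libera_resource.upload_file, and max_workers=4 is an arbitrary choice.

```python
from concurrent.futures import ThreadPoolExecutor


def upload_with_positions(items, upload, max_workers=4):
    """Upload every item, tagging each call with its 1-based position in the list.

    `upload` is a hypothetical callable standing in for the real upload function.
    Returns the progress messages in input order (Executor.map preserves it).
    """
    total = len(items)

    def worker(position, item):
        upload(item)
        # `position` belongs to this item alone, so reading it needs no lock.
        message = f"Segment {position}/{total} uploaded."
        print(message)
        return message

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # map() zips the two iterables: worker(1, items[0]), worker(2, items[1]), ...
        return list(executor.map(worker, range(1, total + 1), items))


if __name__ == "__main__":
    upload_with_positions(["a.ts", "b.ts", "c.ts"], upload=lambda item: None)
```

Because the number is the segment's place in the list, the printed lines can appear out of order (for example 7/4310 before 3/4310). A running count of finished uploads needs a shared counter instead.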

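Below is an illustration of how to use the global counter I suggested earlier. Each upload is simulated by having the worker thread sleep for a random amount of time.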

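    from concurrent.futures import ThreadPoolExecutor
    from time import sleep
    import threading
    import random


    count_lock = threading.Lock()  # guards the shared counter below
    count = 0                      # number of segments uploaded so far

    def upload_file(segment):
        global count

        print(f'Uploading segment #{segment}...')
        sleep(random.uniform(1, 5))  # Simulate variable-length upload.
        # Increment and print under the lock so each completion gets a unique number.
        # len() reads the module-level segment_upload_list defined at the bottom.
        with count_lock:
            count += 1
            print(f'  {count} of {len(segment_upload_list)} uploaded.')


    def upload_segments(segment_upload_list):
        global count

        count = 0
        with ThreadPoolExecutor(max_workers=3) as executor:
            # list() consumes the results so an exception raised in a worker
            # is re-raised here instead of being silently discarded.
            list(executor.map(upload_file, segment_upload_list))

        print('\nAll segments uploaded!')


    segment_upload_list = list(range(1, 11))
    upload_segments(segment_upload_list)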
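An alternative sketch that needs neither the global counter nor the lock: submit every upload, then count completions in the main thread with concurrent.futures.as_completed, which yields each future as soon as it finishes. upload_file here is the same kind of simulated upload, with much shorter sleeps so it runs quickly; treat it as an illustration, not a drop-in replacement.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from time import sleep
import random


def upload_file(segment):
    # Simulated upload: sleep for a short random time, then report the segment.
    sleep(random.uniform(0.01, 0.05))
    return segment


def upload_segments(segment_upload_list, max_workers=3):
    total = len(segment_upload_list)
    finished = []
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(upload_file, s) for s in segment_upload_list]
        # Only the main thread touches `count`, so a plain local variable is safe.
        for count, future in enumerate(as_completed(futures), start=1):
            segment = future.result()  # re-raises any exception from the upload
            print(f'  {count} of {total} uploaded (segment #{segment}).')
            finished.append(segment)
    print('\nAll segments uploaded!')
    return finished


if __name__ == "__main__":
    upload_segments(list(range(1, 11)))
```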

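Sample output. Notice how it immediately starts three uploads and then starts the next one each time one finishes. That's because of the max_workers=3 limit specified when the ThreadPoolExecutor was created. As you can see, and as I said earlier, it doesn't matter which of the three worker threads performs a given task: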
    Uploading segment #1...
    Uploading segment #2...
    Uploading segment #3...
      1 of 10 uploaded.
    Uploading segment #4...
      2 of 10 uploaded.
    Uploading segment #5...
      3 of 10 uploaded.
    Uploading segment #6...
      4 of 10 uploaded.
    Uploading segment #7...
      5 of 10 uploaded.
    Uploading segment #8...
      6 of 10 uploaded.
    Uploading segment #9...
      7 of 10 uploaded.
    Uploading segment #10...
      8 of 10 uploaded.
      9 of 10 uploaded.
      10 of 10 uploaded.

    All segments uploaded!
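If you do want each worker thread itself to carry a sequential number (the literal question in the title), ThreadPoolExecutor's initializer argument (Python 3.7+) runs once in each worker thread as it starts and can store a number in thread-local storage. A minimal sketch; run, init_worker and upload_file are hypothetical names, and the sleep stands in for a real upload:

```python
from concurrent.futures import ThreadPoolExecutor
import itertools
import threading
import time

# Per-thread storage: each worker thread sees its own .worker_id attribute.
_local = threading.local()


def upload_file(segment):
    time.sleep(0.01)  # stand-in for the real upload
    return segment, _local.worker_id


def run(segments, max_workers=3):
    worker_ids = itertools.count(1)  # hands out 1, 2, 3, ...
    id_lock = threading.Lock()

    def init_worker():
        # Called once in each new worker thread, before it runs any task.
        with id_lock:
            _local.worker_id = next(worker_ids)

    with ThreadPoolExecutor(max_workers=max_workers, initializer=init_worker) as executor:
        results = list(executor.map(upload_file, segments))

    for segment, worker_id in results:
        print(f'Segment #{segment} was uploaded by worker {worker_id}')
    return results


if __name__ == "__main__":
    run(list(range(1, 11)))
```

With max_workers=3, at most three distinct worker numbers appear however many segments there are, because the pool reuses its threads. That is why the worker number is not useful as a progress indicator, while the completion counter is.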

My final solution:

    from concurrent.futures import ThreadPoolExecutor
    import threading

    count_lock = threading.Lock()
    count = 0

    def __upload(object_path_pair):
        global count
        print(f'Uploading {object_path_pair} to CDN.')
        libera_resource.upload_file(*object_path_pair)
        # Increment first, then report, both under the lock, so the numbers
        # run from 1 to len(segment_upload_list) with no gaps or repeats.
        with count_lock:
            count += 1
            print(f' Segment {count} of {len(segment_upload_list)} uploaded successfully.')

    def upload_segments(segment_upload_list):
        global count

        count = 0
        with ThreadPoolExecutor() as executor:
            # list() re-raises any upload error instead of silently dropping it.
            list(executor.map(__upload, segment_upload_list))
        print('\n!!! ALL SEGMENTS UPLOADED !!!')

    upload_segments(segment_upload_list)
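The global count in the final solution only works while the counter and the functions live at module level. A sketch of the same idea with the counter wrapped in a small class and passed in explicitly; ProgressCounter and the upload parameter (standing in for libera_resource.upload_file, whose signature is assumed) are hypothetical:

```python
from concurrent.futures import ThreadPoolExecutor
import threading


class ProgressCounter:
    """Thread-safe completion counter (a hypothetical helper, not a library class)."""

    def __init__(self, total):
        self.total = total
        self._done = 0
        self._lock = threading.Lock()

    def increment(self):
        # Increment and read under one lock so each caller gets a unique number.
        with self._lock:
            self._done += 1
            return self._done


def upload_segments(segment_upload_list, upload):
    progress = ProgressCounter(len(segment_upload_list))

    def _upload(object_path_pair):
        upload(*object_path_pair)
        n = progress.increment()
        # Printing outside the lock means lines may occasionally appear out of order.
        print(f' Segment {n} of {progress.total} uploaded successfully.')
        return n

    with ThreadPoolExecutor() as executor:
        numbers = list(executor.map(_upload, segment_upload_list))
    print('\n!!! ALL SEGMENTS UPLOADED !!!')
    return numbers
```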