如何为 ThreadPoolExecutor worker 分配序号
How to assign a sequential number to ThreadPoolExecutor worker
假设以下代码示例使用 ThreadPoolExecutor 作为执行程序上传文件列表。
def upload_segments(segment_upload_list):
def __upload(object_path_pair):
libera_resource.upload_file(*object_path_pair)
print("Segment uploaded!")
with ThreadPoolExecutor() as executor:
executor.map(__upload, segment_upload_list)
upload_segments(segment_upload_list)
在这种多线程场景中,如何为我在列表中的位置分配一个数字?我想最后显示这样的东西:
“片段 10/4310 已上传。”
我知道由于多线程的性质,这里的输出不能是连续的,但它至少会提供一个进度概览。我想计算我已经启动的线程数也可以在这里计算已经上传的段数
提前致谢
下面是如何使用我在 中建议的全局计数器的说明。通过让每个工作线程随机休眠一段时间来模拟每次上传。
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import threading
import random
count_lock = threading.Lock()
count = 0
def upload_file(segment):
global count
print(f'Uploading segment #{segment}...')
sleep(random.uniform(1, 5)) # Simulate variable-length upload.
with count_lock:
count += 1
print(f' {count} of {len(segment_upload_list)} uploaded.')
def upload_segments(segment_upload_list):
global count
count = 0
with ThreadPoolExecutor(max_workers=3) as executor:
executor.map(upload_file, segment_upload_list)
print('\nAll segments uploaded!')
segment_upload_list = list(range(1, 11))
upload_segments(segment_upload_list)
示例输出。请注意它如何立即启动 3 个上传任务线程,然后在每次完成时启动另一个。这是因为在创建 ThreadPoolExecutor
时指定了限制。如您所见,正如我在更早的 .
中所说的那样,三者中的哪一个执行该任务并不重要
Uploading segment #1...
Uploading segment #2...
Uploading segment #3...
1 of 10 uploaded.
Uploading segment #4...
2 of 10 uploaded.
Uploading segment #5...
3 of 10 uploaded.
Uploading segment #6...
4 of 10 uploaded.
Uploading segment #7...
5 of 10 uploaded.
Uploading segment #8...
6 of 10 uploaded.
Uploading segment #9...
7 of 10 uploaded.
Uploading segment #10...
8 of 10 uploaded.
9 of 10 uploaded.
10 of 10 uploaded.
All segments uploaded!
我的最终解决方案:
count_lock = threading.Lock()
count = 0
def __upload(object_path_pair):
global count
if count > 0:
print(f'Uploading segment #{count} to CDN.')
libera_resource.upload_file(*object_path_pair)
sleep(random.uniform(1, 5))
with count_lock:
if count > 0:
print(f' Segment {count} of {len(segment_upload_list)} uploaded successfully.')
count += 1
def upload_segments(segment_upload_list):
global count
count = 0
with ThreadPoolExecutor() as executor:
executor.map(__upload, segment_upload_list)
print('\n!!! ALL SEGMENTS UPLOADED !!!')
upload_segments(segment_upload_list)
假设以下代码示例使用 ThreadPoolExecutor 作为执行程序上传文件列表。
def upload_segments(segment_upload_list):
def __upload(object_path_pair):
libera_resource.upload_file(*object_path_pair)
print("Segment uploaded!")
with ThreadPoolExecutor() as executor:
executor.map(__upload, segment_upload_list)
upload_segments(segment_upload_list)
在这种多线程场景中,如何为我在列表中的位置分配一个数字?我想最后显示这样的东西:
“片段 10/4310 已上传。”
我知道由于多线程的性质,这里的输出不能是连续的,但它至少会提供一个进度概览。我想计算我已经启动的线程数也可以在这里计算已经上传的段数
提前致谢
下面是如何使用我在
from concurrent.futures import ThreadPoolExecutor
from time import sleep
import threading
import random
count_lock = threading.Lock()
count = 0
def upload_file(segment):
global count
print(f'Uploading segment #{segment}...')
sleep(random.uniform(1, 5)) # Simulate variable-length upload.
with count_lock:
count += 1
print(f' {count} of {len(segment_upload_list)} uploaded.')
def upload_segments(segment_upload_list):
global count
count = 0
with ThreadPoolExecutor(max_workers=3) as executor:
executor.map(upload_file, segment_upload_list)
print('\nAll segments uploaded!')
segment_upload_list = list(range(1, 11))
upload_segments(segment_upload_list)
示例输出。请注意它如何立即启动 3 个上传任务线程,然后在每次完成时启动另一个。这是因为在创建 ThreadPoolExecutor
时指定了限制。如您所见,正如我在更早的
Uploading segment #1...
Uploading segment #2...
Uploading segment #3...
1 of 10 uploaded.
Uploading segment #4...
2 of 10 uploaded.
Uploading segment #5...
3 of 10 uploaded.
Uploading segment #6...
4 of 10 uploaded.
Uploading segment #7...
5 of 10 uploaded.
Uploading segment #8...
6 of 10 uploaded.
Uploading segment #9...
7 of 10 uploaded.
Uploading segment #10...
8 of 10 uploaded.
9 of 10 uploaded.
10 of 10 uploaded.
All segments uploaded!
我的最终解决方案:
count_lock = threading.Lock()
count = 0
def __upload(object_path_pair):
global count
if count > 0:
print(f'Uploading segment #{count} to CDN.')
libera_resource.upload_file(*object_path_pair)
sleep(random.uniform(1, 5))
with count_lock:
if count > 0:
print(f' Segment {count} of {len(segment_upload_list)} uploaded successfully.')
count += 1
def upload_segments(segment_upload_list):
global count
count = 0
with ThreadPoolExecutor() as executor:
executor.map(__upload, segment_upload_list)
print('\n!!! ALL SEGMENTS UPLOADED !!!')
upload_segments(segment_upload_list)