Python tqdm process_map: 添加进程之间共享的列表?
Python tqdm process_map: Append list shared between processes?
我想分享一个列表以附加来自 tqdm
的 process_map
并行线程的输出。 (我想使用 process_map
的原因是漂亮的进度指示器和 max_workers=
选项。)
我曾尝试使用 from multiprocessing import Manager
创建共享列表,但我在这里做错了:我的代码打印了一个空的 shared_list
,但它应该打印一个包含 20 个数字的列表,正确的顺序并不重要。
任何帮助将不胜感激,提前致谢!
import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager
shared_list = []
def worker(i):
global shared_list
time.sleep(1)
shared_list.append(i)
if __name__ == '__main__':
manager = Manager()
shared_list = manager.list()
process_map(worker, range(20), max_workers=5)
print(shared_list)
您没有指定您 运行 在哪个平台下(您应该在使用 multiprocessing
标记问题时使用您的平台标记您的问题),但看起来您是 运行 在使用 spawn
创建新进程的平台下(例如 Windows)。这意味着当启动一个新进程时,会创建一个空地址 space,启动一个新的 Python 解释器并从顶部重新执行源代码。
因此,尽管您在 if __name__ == '__main__':
开始的块中分配给 shared_list
托管列表,但创建的池中的每个进程都将执行 shared_list = []
破坏您的初始分配.
您可以将 shared_list
作为第一个参数传递给辅助函数:
import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager
from functools import partial
def worker(shared_list, i):
time.sleep(1)
shared_list.append(i)
if __name__ == '__main__':
manager = Manager()
shared_list = manager.list()
process_map(partial(worker, shared_list), range(20), max_workers=5)
print(shared_list)
If process_map
以与 ProcessPoolExecutor
相同的方式支持 initializer 和 initargs 参数 class 有(好像没有),那么你可以这样做:
import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager
def init_pool(the_list):
global shared_list
shared_list = the_list
def worker(i):
time.sleep(1)
shared_list.append(i)
if __name__ == '__main__':
manager = Manager()
shared_list = manager.list()
process_map(worker, range(20), max_workers=5, initializer=init_pool, initargs=(shared_list,))
print(shared_list)
评论
这与您的原始问题本身无关,但对于此类问题,您可能需要考虑使用而不是您的工作人员函数的托管列表(巧合地命名为 worker
) 追加元素并且追加元素的顺序是不确定的,因为您无法控制池进程的调度,multiprocessing.Array
实例初始化如下:
shared_list = Array('i', [0] * 20, lock=False)
然后你的 worker 函数变成:
def worker(i):
time.sleep(1)
shared_list[i] = i
此处数组存储在共享内存中,甚至不需要锁定访问,因为 worker
的每次调用都在访问数组的不同索引。访问共享内存数组的元素比访问托管列表的元素快多。唯一的问题是对共享内存的引用不能作为参数传递,我们看到 process_map
不支持 initializer 和 initargs争论。因此,您将不得不使用较低级别的方法。例如:
import time
from multiprocessing import Pool, Array
from tqdm import tqdm
def init_pool(the_list):
global shared_list
shared_list = the_list
def worker(i):
time.sleep(1)
shared_list[i] = i
if __name__ == '__main__':
# Preallocate 20 slots for the array in shared memory
# And we don't require a lock if each worker invocation is accessing a different Array index:
args = range(20)
shared_list = Array('i', [0] * len(args), lock=False)
with tqdm(total=len(args)) as pbar:
pool = Pool(5, initializer=init_pool, initargs=(shared_list,))
for result in pool.imap_unordered(worker, args):
pbar.update(1)
# print out elements one at a time:
for elem in shared_list:
print(elem)
# print out all elements at once (must first convert to a regular list):
print(list(shared_list))
评论2
我会避免使用 process_map
。该函数基于ProcessPoolExecutor.map
方法的map
方法,需要return结果对应iterable[=104=的元素的顺序] 正在通过,不是 的完成顺序。想象一下,如果由于某种原因正在处理第一个提交的任务的进程(在我们的例子中 i
值为 0)需要很长时间来处理并最终成为完成的最后一个任务,会发生什么情况。在第一个提交的任务完成之前,您会看到 tqdm
进度条在很长一段时间内什么都不做。但是当这种情况发生时,我们知道所有其他提交的任务都已经完成,因此进度条会立即从 0 跳到 100%。如下修改函数 worker
以查看实际效果:
def worker(shared_list, i):
if i == 0:
time.sleep(5)
else:
time.sleep(.25)
shared_list.append(i)
我上面提供的使用 Pool.imap_unordered
的代码版本允许 returns 结果乱序并且默认 chunksize 值为 1,它将按完成顺序排列。进度条会更流畅地进行。
评论3
tqdm
中似乎也存在错误。下面的程序演示了这次如何将低级 tqdm
调用与 concurrent.futures
模块一起使用。不幸的是,它的 ProcessPoolExecutor
class(用于多处理)和 ThreadPoolExecutor
class(用于多线程)没有等同于 imap_unordered
方法。您必须使用 submit
方法(其 multiprocessing.pool.Pool
模拟方法是 apply_async
方法),return 是一个 Future
实例,您可以在该实例上调用 result
方法来阻止完成并 return 提交任务的结果)。您将 submit
一堆任务并将 returned Future
实例存储在列表中,然后使用 as_completed
函数调用从 returned列出下一个已完成的 Future
实例。
本演示使用线程并创建一个大小为20 的线程池并提交20 个任务,因此所有任务应同时启动。 worker1
的休眠时间设置为不同,因此 i
参数的值越小,休眠时间越长。该程序创建池并提交任务 4 次。第一次,只打印 return 值。第二次使用 tqdm
进度条。结果如您所料。第三次worker2
配合tqdm
进度条使用。不同之处在于,对于 i != 0
的所有值,睡眠时间都是一个常数(.25 秒),因此对于 i
值 1、2、... 19,任务应大致在同时。因此,您会希望看到进度条在很短的时间内跳到 95%,然后等待 i == 0
任务完成。然而,您观察到的情况恰恰相反。进度条转到 5% 并在那里停留了很长时间,然后跳到 100%。第四种情况是使用 worker2
和我自己的“进度条”,它的行为与您预期的一样。
这是 tqdm
4.61.1 在 Python 3.8.5 下。我已经在 Windows 和 Linux 下进行了测试。有人对此行为有解释吗?
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import sys
class MyProgressBar:
def __init__(self, n_tasks):
self._task_count = n_tasks
self._completed = 0
self.update()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(file=sys.stderr)
return False
def update(self, count=0):
self._completed += count
print(f'\r{self._completed} of {self._task_count} task(s) completed.', end='', flush=True)
def worker1(i):
if i == 0:
time.sleep(8)
else:
time.sleep(5 - i/5)
return i
def worker2(i):
if i == 0:
time.sleep(8)
else:
time.sleep(.25)
return i
if __name__ == '__main__':
args = range(20)
with ThreadPoolExecutor(max_workers=20) as pool:
futures = [pool.submit(worker1, arg) for arg in args]
for future in as_completed(futures):
print(future.result())
with ThreadPoolExecutor(max_workers=20) as pool:
with tqdm(total=len(args)) as pbar:
futures = [pool.submit(worker1, arg) for arg in args]
for future in as_completed(futures):
future.result()
pbar.update(1)
with ThreadPoolExecutor(max_workers=20) as pool:
with tqdm(total=len(args)) as pbar:
futures = [pool.submit(worker2, arg) for arg in args]
for future in as_completed(futures):
future.result()
pbar.update(1)
# Works with my progress "bar":
with ThreadPoolExecutor(max_workers=20) as pool:
with MyProgressBar(len(args)) as pbar:
futures = [pool.submit(worker2, arg) for arg in args]
for future in as_completed(futures):
future.result()
pbar.update(1)
我想分享一个列表以附加来自 tqdm
的 process_map
并行线程的输出。 (我想使用 process_map
的原因是漂亮的进度指示器和 max_workers=
选项。)
我曾尝试使用 from multiprocessing import Manager
创建共享列表,但我在这里做错了:我的代码打印了一个空的 shared_list
,但它应该打印一个包含 20 个数字的列表,正确的顺序并不重要。
任何帮助将不胜感激,提前致谢!
import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager
shared_list = []
def worker(i):
global shared_list
time.sleep(1)
shared_list.append(i)
if __name__ == '__main__':
manager = Manager()
shared_list = manager.list()
process_map(worker, range(20), max_workers=5)
print(shared_list)
您没有指定您 运行 在哪个平台下(您应该在使用 multiprocessing
标记问题时使用您的平台标记您的问题),但看起来您是 运行 在使用 spawn
创建新进程的平台下(例如 Windows)。这意味着当启动一个新进程时,会创建一个空地址 space,启动一个新的 Python 解释器并从顶部重新执行源代码。
因此,尽管您在 if __name__ == '__main__':
开始的块中分配给 shared_list
托管列表,但创建的池中的每个进程都将执行 shared_list = []
破坏您的初始分配.
您可以将 shared_list
作为第一个参数传递给辅助函数:
import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager
from functools import partial
def worker(shared_list, i):
time.sleep(1)
shared_list.append(i)
if __name__ == '__main__':
manager = Manager()
shared_list = manager.list()
process_map(partial(worker, shared_list), range(20), max_workers=5)
print(shared_list)
If process_map
以与 ProcessPoolExecutor
相同的方式支持 initializer 和 initargs 参数 class 有(好像没有),那么你可以这样做:
import time
from tqdm.contrib.concurrent import process_map
from multiprocessing import Manager
def init_pool(the_list):
global shared_list
shared_list = the_list
def worker(i):
time.sleep(1)
shared_list.append(i)
if __name__ == '__main__':
manager = Manager()
shared_list = manager.list()
process_map(worker, range(20), max_workers=5, initializer=init_pool, initargs=(shared_list,))
print(shared_list)
评论
这与您的原始问题本身无关,但对于此类问题,您可能需要考虑使用而不是您的工作人员函数的托管列表(巧合地命名为 worker
) 追加元素并且追加元素的顺序是不确定的,因为您无法控制池进程的调度,multiprocessing.Array
实例初始化如下:
shared_list = Array('i', [0] * 20, lock=False)
然后你的 worker 函数变成:
def worker(i):
time.sleep(1)
shared_list[i] = i
此处数组存储在共享内存中,甚至不需要锁定访问,因为 worker
的每次调用都在访问数组的不同索引。访问共享内存数组的元素比访问托管列表的元素快多。唯一的问题是对共享内存的引用不能作为参数传递,我们看到 process_map
不支持 initializer 和 initargs争论。因此,您将不得不使用较低级别的方法。例如:
import time
from multiprocessing import Pool, Array
from tqdm import tqdm
def init_pool(the_list):
global shared_list
shared_list = the_list
def worker(i):
time.sleep(1)
shared_list[i] = i
if __name__ == '__main__':
# Preallocate 20 slots for the array in shared memory
# And we don't require a lock if each worker invocation is accessing a different Array index:
args = range(20)
shared_list = Array('i', [0] * len(args), lock=False)
with tqdm(total=len(args)) as pbar:
pool = Pool(5, initializer=init_pool, initargs=(shared_list,))
for result in pool.imap_unordered(worker, args):
pbar.update(1)
# print out elements one at a time:
for elem in shared_list:
print(elem)
# print out all elements at once (must first convert to a regular list):
print(list(shared_list))
评论2
我会避免使用 process_map
。该函数基于ProcessPoolExecutor.map
方法的map
方法,需要return结果对应iterable[=104=的元素的顺序] 正在通过,不是 的完成顺序。想象一下,如果由于某种原因正在处理第一个提交的任务的进程(在我们的例子中 i
值为 0)需要很长时间来处理并最终成为完成的最后一个任务,会发生什么情况。在第一个提交的任务完成之前,您会看到 tqdm
进度条在很长一段时间内什么都不做。但是当这种情况发生时,我们知道所有其他提交的任务都已经完成,因此进度条会立即从 0 跳到 100%。如下修改函数 worker
以查看实际效果:
def worker(shared_list, i):
if i == 0:
time.sleep(5)
else:
time.sleep(.25)
shared_list.append(i)
我上面提供的使用 Pool.imap_unordered
的代码版本允许 returns 结果乱序并且默认 chunksize 值为 1,它将按完成顺序排列。进度条会更流畅地进行。
评论3
tqdm
中似乎也存在错误。下面的程序演示了这次如何将低级 tqdm
调用与 concurrent.futures
模块一起使用。不幸的是,它的 ProcessPoolExecutor
class(用于多处理)和 ThreadPoolExecutor
class(用于多线程)没有等同于 imap_unordered
方法。您必须使用 submit
方法(其 multiprocessing.pool.Pool
模拟方法是 apply_async
方法),return 是一个 Future
实例,您可以在该实例上调用 result
方法来阻止完成并 return 提交任务的结果)。您将 submit
一堆任务并将 returned Future
实例存储在列表中,然后使用 as_completed
函数调用从 returned列出下一个已完成的 Future
实例。
本演示使用线程并创建一个大小为20 的线程池并提交20 个任务,因此所有任务应同时启动。 worker1
的休眠时间设置为不同,因此 i
参数的值越小,休眠时间越长。该程序创建池并提交任务 4 次。第一次,只打印 return 值。第二次使用 tqdm
进度条。结果如您所料。第三次worker2
配合tqdm
进度条使用。不同之处在于,对于 i != 0
的所有值,睡眠时间都是一个常数(.25 秒),因此对于 i
值 1、2、... 19,任务应大致在同时。因此,您会希望看到进度条在很短的时间内跳到 95%,然后等待 i == 0
任务完成。然而,您观察到的情况恰恰相反。进度条转到 5% 并在那里停留了很长时间,然后跳到 100%。第四种情况是使用 worker2
和我自己的“进度条”,它的行为与您预期的一样。
这是 tqdm
4.61.1 在 Python 3.8.5 下。我已经在 Windows 和 Linux 下进行了测试。有人对此行为有解释吗?
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import sys
class MyProgressBar:
def __init__(self, n_tasks):
self._task_count = n_tasks
self._completed = 0
self.update()
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
print(file=sys.stderr)
return False
def update(self, count=0):
self._completed += count
print(f'\r{self._completed} of {self._task_count} task(s) completed.', end='', flush=True)
def worker1(i):
if i == 0:
time.sleep(8)
else:
time.sleep(5 - i/5)
return i
def worker2(i):
if i == 0:
time.sleep(8)
else:
time.sleep(.25)
return i
if __name__ == '__main__':
args = range(20)
with ThreadPoolExecutor(max_workers=20) as pool:
futures = [pool.submit(worker1, arg) for arg in args]
for future in as_completed(futures):
print(future.result())
with ThreadPoolExecutor(max_workers=20) as pool:
with tqdm(total=len(args)) as pbar:
futures = [pool.submit(worker1, arg) for arg in args]
for future in as_completed(futures):
future.result()
pbar.update(1)
with ThreadPoolExecutor(max_workers=20) as pool:
with tqdm(total=len(args)) as pbar:
futures = [pool.submit(worker2, arg) for arg in args]
for future in as_completed(futures):
future.result()
pbar.update(1)
# Works with my progress "bar":
with ThreadPoolExecutor(max_workers=20) as pool:
with MyProgressBar(len(args)) as pbar:
futures = [pool.submit(worker2, arg) for arg in args]
for future in as_completed(futures):
future.result()
pbar.update(1)