星图与 tqdm 结合?
Starmap combined with tqdm?
我在做一些并行处理,如下:
with mp.Pool(8) as tmpPool:
results = tmpPool.starmap(my_function, inputs)
其中输入如下所示:
[(1,0.2312),(5,0.52) ...]
即 int 和 float 的元组。
代码运行良好,但我似乎无法将其包裹在加载条 (tqdm) 周围,例如可以使用 imap 方法完成,如下所示:
tqdm.tqdm(mp.imap(some_function,some_inputs))
星图也可以这样做吗?
谢谢!
临时解决方案:改写imap并行化方法
starmap()
不可能,但添加 Pool.istarmap()
的补丁是可能的。它基于 imap()
的代码。您所要做的就是创建 istarmap.py
文件并导入模块以应用补丁,然后再进行常规的多处理导入。
Python <3.8
# istarmap.py for Python <3.8
import multiprocessing.pool as mpp
def istarmap(self, func, iterable, chunksize=1):
"""starmap-version of imap
"""
if self._state != mpp.RUN:
raise ValueError("Pool not running")
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0:n}".format(
chunksize))
task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
result = mpp.IMapIterator(self._cache)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mpp.starmapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)
mpp.Pool.istarmap = istarmap
Python 3.8+
# istarmap.py for Python 3.8+
import multiprocessing.pool as mpp
def istarmap(self, func, iterable, chunksize=1):
"""starmap-version of imap
"""
self._check_running()
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0:n}".format(
chunksize))
task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
result = mpp.IMapIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mpp.starmapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)
mpp.Pool.istarmap = istarmap
然后在你的脚本中:
import istarmap # import to apply patch
from multiprocessing import Pool
import tqdm
def foo(a, b):
for _ in range(int(50e6)):
pass
return a, b
if __name__ == '__main__':
with Pool(4) as pool:
iterable = [(i, 'x') for i in range(10)]
for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
total=len(iterable)):
pass
最简单的方法可能是在输入周围应用 tqdm(),而不是映射函数。例如:
inputs = zip(param1, param2, param3)
with mp.Pool(8) as pool:
results = pool.starmap(my_function, tqdm.tqdm(inputs, total=len(param1)))
正如 Darkonaut 所提到的,在撰写本文时,没有 istarmap
本机可用。如果你想避免打补丁,你可以添加一个简单的 *_star
函数作为解决方法。 (此解决方案的灵感来自 this tutorial.)
import tqdm
import multiprocessing
def my_function(arg1, arg2, arg3):
return arg1 + arg2 + arg3
def my_function_star(args):
return my_function(*args)
jobs = 4
with multiprocessing.Pool(jobs) as pool:
args = [(i, i, i) for i in range(10000)]
results = list(tqdm.tqdm(pool.imap(my_function_star, args), total=len(args))
一些注意事项:
我也很喜欢科里的回答。它更干净,尽管进度条似乎没有我的回答更新得那么顺利。请注意,使用我上面使用 chunksize=1
(默认)发布的代码,科里的答案要快几个数量级。我猜这是由于多处理序列化,因为增加 chunksize
(或具有更昂贵的 my_function
)使它们的运行时间具有可比性。
由于我的 serialization/function 成本比率非常低,所以我选择了我的申请答案。
我在做一些并行处理,如下:
with mp.Pool(8) as tmpPool:
results = tmpPool.starmap(my_function, inputs)
其中输入如下所示: [(1,0.2312),(5,0.52) ...] 即 int 和 float 的元组。
代码运行良好,但我似乎无法将其包裹在加载条 (tqdm) 周围,例如可以使用 imap 方法完成,如下所示:
tqdm.tqdm(mp.imap(some_function,some_inputs))
星图也可以这样做吗?
谢谢!
临时解决方案:改写imap并行化方法
starmap()
不可能,但添加 Pool.istarmap()
的补丁是可能的。它基于 imap()
的代码。您所要做的就是创建 istarmap.py
文件并导入模块以应用补丁,然后再进行常规的多处理导入。
Python <3.8
# istarmap.py for Python <3.8
import multiprocessing.pool as mpp
def istarmap(self, func, iterable, chunksize=1):
"""starmap-version of imap
"""
if self._state != mpp.RUN:
raise ValueError("Pool not running")
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0:n}".format(
chunksize))
task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
result = mpp.IMapIterator(self._cache)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mpp.starmapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)
mpp.Pool.istarmap = istarmap
Python 3.8+
# istarmap.py for Python 3.8+
import multiprocessing.pool as mpp
def istarmap(self, func, iterable, chunksize=1):
"""starmap-version of imap
"""
self._check_running()
if chunksize < 1:
raise ValueError(
"Chunksize must be 1+, not {0:n}".format(
chunksize))
task_batches = mpp.Pool._get_tasks(func, iterable, chunksize)
result = mpp.IMapIterator(self)
self._taskqueue.put(
(
self._guarded_task_generation(result._job,
mpp.starmapstar,
task_batches),
result._set_length
))
return (item for chunk in result for item in chunk)
mpp.Pool.istarmap = istarmap
然后在你的脚本中:
import istarmap # import to apply patch
from multiprocessing import Pool
import tqdm
def foo(a, b):
for _ in range(int(50e6)):
pass
return a, b
if __name__ == '__main__':
with Pool(4) as pool:
iterable = [(i, 'x') for i in range(10)]
for _ in tqdm.tqdm(pool.istarmap(foo, iterable),
total=len(iterable)):
pass
最简单的方法可能是在输入周围应用 tqdm(),而不是映射函数。例如:
inputs = zip(param1, param2, param3)
with mp.Pool(8) as pool:
results = pool.starmap(my_function, tqdm.tqdm(inputs, total=len(param1)))
正如 Darkonaut 所提到的,在撰写本文时,没有 istarmap
本机可用。如果你想避免打补丁,你可以添加一个简单的 *_star
函数作为解决方法。 (此解决方案的灵感来自 this tutorial.)
import tqdm
import multiprocessing
def my_function(arg1, arg2, arg3):
return arg1 + arg2 + arg3
def my_function_star(args):
return my_function(*args)
jobs = 4
with multiprocessing.Pool(jobs) as pool:
args = [(i, i, i) for i in range(10000)]
results = list(tqdm.tqdm(pool.imap(my_function_star, args), total=len(args))
一些注意事项:
我也很喜欢科里的回答。它更干净,尽管进度条似乎没有我的回答更新得那么顺利。请注意,使用我上面使用 chunksize=1
(默认)发布的代码,科里的答案要快几个数量级。我猜这是由于多处理序列化,因为增加 chunksize
(或具有更昂贵的 my_function
)使它们的运行时间具有可比性。
由于我的 serialization/function 成本比率非常低,所以我选择了我的申请答案。