Python:使用 tqdm 跟踪多个并行子任务的进度
Python: tracking progress with tqdm across parallel sub-tasks
为了监控我正在编写的代码的运行进度,我想用一个进度条来跟踪在不同线程(示例中实际上是 multiprocessing 进程)中并行执行的多个任务。
在运行开始时,我就已经知道任务(以及工作进程)的数量。
演示代码(一个无法正常工作的玩具示例):
from multiprocessing import Pool
from tqdm import tqdm
def work(i, t):
    """Advance the progress tracker ``t`` one million times, then return ``i``.

    ``t`` only needs an ``update()`` method; it is called with no arguments
    once per simulated unit of work.
    """
    remaining = 10**6
    while remaining:
        t.update()
        remaining -= 1
    return i
def wrapped_work(params):
    """Adapt ``work`` to ``Pool.map``, which passes each item as ONE argument.

    ``params`` is an ``(i, t)`` tuple; it is spread into ``work``'s positional
    parameters and ``work``'s result is returned.  (The previous version
    dropped the result, so ``map`` collected only ``None`` values.)
    """
    return work(*params)
def main(n=1):
    """Sum the values 1..n computed in an 8-process pool, with a tqdm bar.

    NOTE(review): this is the question's non-working toy example.  ``t`` is a
    tqdm instance created in the parent process; ``Pool.map`` pickles its
    arguments to send them to workers, so workers would at best get a copy
    and their ``update()`` calls are not expected to reach the parent's bar.
    That cross-process progress reporting is the problem being asked about.
    """
    # another loop:
    with Pool(processes=8) as p:
        with tqdm(total=n * 10**6) as t:
            # Pool.map calls its function with a single argument per item, so
            # the (i, t) tuples must go through wrapped_work; calling work
            # directly raised TypeError for the missing ``t`` parameter.
            return sum(p.map(wrapped_work, ((i, t) for i in range(1, n + 1))))
if __name__ == "__main__":
    # Script entry point: run the demo with n=5.
    main(n=5)
我尝试用 Pool 来实现,但没有成功。
非常感谢您的帮助。
参考这篇帖子(this post)中的做法:
from multiprocessing import Pool, Process, Value
from ctypes import c_bool, c_long
from tqdm.auto import tqdm
class TqdmMultiprocessing:
    """Run a function on a multiprocessing Pool while a separate listener
    process draws a single tqdm progress bar from a shared counter.

    Workers report progress by incrementing ``static_func.counter.value``;
    ``worker_init`` attaches the shared counter to ``static_func`` in every
    pool worker.  The listener polls the counter and forwards the delta to
    the bar.
    """

    # Upper bound on the pool size, regardless of what the caller requests.
    max_processes = 64

    def __init__(self, static_func, processes=64):
        """Create the shared counter and the worker pool.

        static_func: module-level function the workers will run; it gets the
            shared counter as its ``counter`` attribute.
        processes: requested pool size, capped at ``max_processes``.
        """
        # NOTE(review): lock=False yields a raw shared c_long.  ``+= 1`` from
        # several workers is an unsynchronised read-modify-write, so updates
        # can be lost; the listener therefore tops the bar up at the end.
        self.counter = Value(c_long, lock=False)
        self.pool = Pool(
            processes=min(processes, self.max_processes),
            initializer=self.worker_init,
            initargs=(static_func, self.counter),
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut the pool down and wait for its workers to exit.

        On a clean exit the pool is closed (pending work may finish); if the
        ``with`` body raised, workers are stopped at once.  Exceptions are
        never suppressed.
        """
        if exc_type is None:
            self.pool.close()
        else:
            self.pool.terminate()
        # Without join() the worker processes are never reaped.
        self.pool.join()

    def tqdm(self, static_func, iterable, **kwargs):
        """Map ``static_func`` over ``iterable`` while showing a progress bar.

        static_func: function handed to ``Pool.map``; it should advance
            ``static_func.counter.value`` as it works.
        iterable: inputs for ``Pool.map``.
        kwargs: forwarded to ``tqdm`` in the listener (e.g. ``total``).
        Returns the list produced by ``Pool.map``.
        """
        done_value = Value(c_bool)
        proc = Process(target=self.listener, args=(self.counter, done_value, kwargs))
        proc.start()
        try:
            return self.pool.map(static_func, iterable)
        finally:
            # Stop the listener even if map() raised; otherwise it would poll
            # forever and the (non-daemonic) process would block shutdown.
            done_value.value = True
            proc.join()
            # Reset so the next call's bar starts from zero.
            self.counter.value = 0

    @staticmethod
    def listener(counter: Value, is_done: Value, kwargs, poll_interval=0.1):
        """Drive a tqdm bar from ``counter`` until ``is_done`` becomes true.

        Runs in its own process.  ``poll_interval`` (seconds) is the pause
        between counter reads.
        """
        import time

        with tqdm(**kwargs) as tqdm_bar:
            old_counter = 0
            while not is_done.value:
                new_counter = counter.value
                tqdm_bar.update(new_counter - old_counter)
                old_counter = new_counter
                # Sleep instead of spinning: an unpaused loop pins a CPU core.
                time.sleep(poll_interval)
            if tqdm_bar.total is not None:
                # Fill the bar completely; increments lost on the
                # unsynchronised counter would otherwise leave it short.
                tqdm_bar.update(tqdm_bar.total - old_counter)
            else:
                # No total was given: just apply the final counter reading.
                tqdm_bar.update(counter.value - old_counter)

    @staticmethod
    def worker_init(static_func, counter: Value):
        """Pool initializer: expose the shared counter as ``static_func.counter``."""
        static_func.counter = counter
def work(i):
    """Bump the shared progress counter one million times and return ``i``.

    The counter is read from ``work.counter``, which the pool initializer
    (``TqdmMultiprocessing.worker_init``) attaches to this function.
    NOTE(review): with a raw ``Value(c_long, lock=False)`` counter the
    ``+= 1`` below is an unsynchronised read-modify-write, so concurrent
    workers may lose increments.
    """
    shared = work.counter
    remaining = 10**6
    while remaining:
        shared.value += 1
        remaining -= 1
    return i
def main(n=1):
    """Run ``work`` over ``range(n)`` twice on 3 processes, with a bar each pass."""
    with TqdmMultiprocessing(work, processes=3) as p:
        expected_steps = n * 10 ** 6
        for _pass in range(2):
            p.tqdm(work, range(n), total=expected_steps)
if __name__ == "__main__":
    # Script entry point: run the demo with n=5.
    main(n=5)
为了监控我正在编写的代码的运行进度,我想用一个进度条来跟踪在不同线程(示例中实际上是 multiprocessing 进程)中并行执行的多个任务。在运行开始时,我就已经知道任务(以及工作进程)的数量。
演示代码(一个无法正常工作的玩具示例):
from multiprocessing import Pool
from tqdm import tqdm
def work(i, t):
    """Advance the progress tracker ``t`` one million times, then return ``i``.

    ``t`` only needs an ``update()`` method; it is called with no arguments
    once per simulated unit of work.
    """
    remaining = 10**6
    while remaining:
        t.update()
        remaining -= 1
    return i
def wrapped_work(params):
    """Adapt ``work`` to ``Pool.map``, which passes each item as ONE argument.

    ``params`` is an ``(i, t)`` tuple; it is spread into ``work``'s positional
    parameters and ``work``'s result is returned.  (The previous version
    dropped the result, so ``map`` collected only ``None`` values.)
    """
    return work(*params)
def main(n=1):
    """Sum the values 1..n computed in an 8-process pool, with a tqdm bar.

    NOTE(review): this is the question's non-working toy example.  ``t`` is a
    tqdm instance created in the parent process; ``Pool.map`` pickles its
    arguments to send them to workers, so workers would at best get a copy
    and their ``update()`` calls are not expected to reach the parent's bar.
    That cross-process progress reporting is the problem being asked about.
    """
    # another loop:
    with Pool(processes=8) as p:
        with tqdm(total=n * 10**6) as t:
            # Pool.map calls its function with a single argument per item, so
            # the (i, t) tuples must go through wrapped_work; calling work
            # directly raised TypeError for the missing ``t`` parameter.
            return sum(p.map(wrapped_work, ((i, t) for i in range(1, n + 1))))
if __name__ == "__main__":
    # Script entry point: run the demo with n=5.
    main(n=5)
我尝试用 Pool 来实现。
参考这篇帖子(this post)中的做法:
from multiprocessing import Pool, Process, Value
from ctypes import c_bool, c_long
from tqdm.auto import tqdm
class TqdmMultiprocessing:
    """Run a function on a multiprocessing Pool while a separate listener
    process draws a single tqdm progress bar from a shared counter.

    Workers report progress by incrementing ``static_func.counter.value``;
    ``worker_init`` attaches the shared counter to ``static_func`` in every
    pool worker.  The listener polls the counter and forwards the delta to
    the bar.
    """

    # Upper bound on the pool size, regardless of what the caller requests.
    max_processes = 64

    def __init__(self, static_func, processes=64):
        """Create the shared counter and the worker pool.

        static_func: module-level function the workers will run; it gets the
            shared counter as its ``counter`` attribute.
        processes: requested pool size, capped at ``max_processes``.
        """
        # NOTE(review): lock=False yields a raw shared c_long.  ``+= 1`` from
        # several workers is an unsynchronised read-modify-write, so updates
        # can be lost; the listener therefore tops the bar up at the end.
        self.counter = Value(c_long, lock=False)
        self.pool = Pool(
            processes=min(processes, self.max_processes),
            initializer=self.worker_init,
            initargs=(static_func, self.counter),
        )

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Shut the pool down and wait for its workers to exit.

        On a clean exit the pool is closed (pending work may finish); if the
        ``with`` body raised, workers are stopped at once.  Exceptions are
        never suppressed.
        """
        if exc_type is None:
            self.pool.close()
        else:
            self.pool.terminate()
        # Without join() the worker processes are never reaped.
        self.pool.join()

    def tqdm(self, static_func, iterable, **kwargs):
        """Map ``static_func`` over ``iterable`` while showing a progress bar.

        static_func: function handed to ``Pool.map``; it should advance
            ``static_func.counter.value`` as it works.
        iterable: inputs for ``Pool.map``.
        kwargs: forwarded to ``tqdm`` in the listener (e.g. ``total``).
        Returns the list produced by ``Pool.map``.
        """
        done_value = Value(c_bool)
        proc = Process(target=self.listener, args=(self.counter, done_value, kwargs))
        proc.start()
        try:
            return self.pool.map(static_func, iterable)
        finally:
            # Stop the listener even if map() raised; otherwise it would poll
            # forever and the (non-daemonic) process would block shutdown.
            done_value.value = True
            proc.join()
            # Reset so the next call's bar starts from zero.
            self.counter.value = 0

    @staticmethod
    def listener(counter: Value, is_done: Value, kwargs, poll_interval=0.1):
        """Drive a tqdm bar from ``counter`` until ``is_done`` becomes true.

        Runs in its own process.  ``poll_interval`` (seconds) is the pause
        between counter reads.
        """
        import time

        with tqdm(**kwargs) as tqdm_bar:
            old_counter = 0
            while not is_done.value:
                new_counter = counter.value
                tqdm_bar.update(new_counter - old_counter)
                old_counter = new_counter
                # Sleep instead of spinning: an unpaused loop pins a CPU core.
                time.sleep(poll_interval)
            if tqdm_bar.total is not None:
                # Fill the bar completely; increments lost on the
                # unsynchronised counter would otherwise leave it short.
                tqdm_bar.update(tqdm_bar.total - old_counter)
            else:
                # No total was given: just apply the final counter reading.
                tqdm_bar.update(counter.value - old_counter)

    @staticmethod
    def worker_init(static_func, counter: Value):
        """Pool initializer: expose the shared counter as ``static_func.counter``."""
        static_func.counter = counter
def work(i):
    """Bump the shared progress counter one million times and return ``i``.

    The counter is read from ``work.counter``, which the pool initializer
    (``TqdmMultiprocessing.worker_init``) attaches to this function.
    NOTE(review): with a raw ``Value(c_long, lock=False)`` counter the
    ``+= 1`` below is an unsynchronised read-modify-write, so concurrent
    workers may lose increments.
    """
    shared = work.counter
    remaining = 10**6
    while remaining:
        shared.value += 1
        remaining -= 1
    return i
def main(n=1):
    """Run ``work`` over ``range(n)`` twice on 3 processes, with a bar each pass."""
    with TqdmMultiprocessing(work, processes=3) as p:
        expected_steps = n * 10 ** 6
        for _pass in range(2):
            p.tqdm(work, range(n), total=expected_steps)
if __name__ == "__main__":
    # Script entry point: run the demo with n=5.
    main(n=5)