并行打印和写锁
Parallel printing and write lock
我有一个简单的项目,我需要并行打印进度信息,例如进度条。
每个栏都有一个位置,终端中的书写光标会根据栏的位置上下移动。
这在串行打印时效果很好,但由于竞速问题而在并行打印时失败。我尝试使用 multiprocessing.Lock()
但无济于事。
这是我当前的代码:
from __future__ import division
import os, sys
import signal
from time import sleep
from multiprocessing import Pool, freeze_support, Lock
if os.name == 'nt':
import colorama # to support cursor up
colorama.init()
_term_move_up = '\x1b[A'
write_lock = Lock()
class simple_bar(object):
def __init__(self, iterable, desc='', position=0):
signal.signal(signal.SIGINT, signal.SIG_IGN) # handles keyboardinterrupt
self.iterable = iterable
self.total = len(iterable)
self.n = 0
self.position = position
self.desc = desc
self.display()
def __iter__(self):
for obj in self.iterable:
yield obj
self.update()
def update(self, n=1):
self.n += n
self.display()
def display(self, fp=None, width=79):
if not fp:
fp = sys.stdout
with write_lock:
fp.write('\n' * self.position)
l_part = self.desc + ': '
bar = l_part + '#' * int((self.n / self.total) * (width - len(l_part)))
fp.write('\r' + bar + ' ' * (width - len(bar)))
fp.write(_term_move_up * self.position)
fp.flush()
def progresser(n):
text = "progresser #{}".format(n)
for i in simple_bar(range(5000), desc=text, position=n):
sleep(0.001)
if __name__ == '__main__':
freeze_support()
L = list(range(3))
Pool(len(L)).map(progresser, L)
工作正常的串行替代方案,这给出了应该由上面的并行版本产生的正确输出:
# Same code as above, except __main__
if __name__ == '__main__':
t_list = [simple_bar(range(5000), desc="progresser #{}".format(n), position=n) for n in xrange(3)]
for i in range(5000):
for t in t_list:
t.update()
我不知道出了什么问题。我在 Windows 7.
上使用 Python 2.7.12
我正在寻找一种在多处理中安全地并行打印的方法,理想情况下但可选地线程安全。
/编辑:有趣的是,如果我在打印前等待(但足够大),那么条形图就可以打印出来:
# ...
def display(self, fp=None, width=79):
if not fp:
fp = sys.stdout
with write_lock:
sleep(1) # this fixes the issue by adding a delay
fp.write('\n' * self.position)
l_part = self.desc + ': '
bar = l_part + '#' * int((self.n / self.total) * (width - len(l_part)))
fp.write('\r' + bar + ' ' * (width - len(bar)))
fp.write(_term_move_up * self.position)
fp.flush()
# ...
我不知道这意味着什么结论。
您需要在write_lock.release()
之前添加fp.flush()
。
不相关的评论:
- 考虑使用锁作为上下文管理器(
with write_lock...
而不是手动 acquire()
和 release()
)——这更容易遵循并且不易出错。
- 这两个版本都不能很好地处理中断 (Ctrl+C),您可能需要研究一下。
这可能是全局锁变量的问题。当您在 unix 中创建子进程时,您拥有父进程内存的副本。 windows 好像不是这样
试试这个代码
from __future__ import division
import os, sys
import signal
from time import sleep
from multiprocessing import Pool, freeze_support, Lock
if os.name == 'nt':
import colorama # to support cursor up
colorama.init()
_term_move_up = '\x1b[A'
class simple_bar(object):
def __init__(self, iterable, desc='', position=0):
signal.signal(signal.SIGINT, signal.SIG_IGN) # handles keyboardinterrupt
self.iterable = iterable
self.total = len(iterable)
self.n = 0
self.position = position
self.desc = desc
self.display()
def __iter__(self):
for obj in self.iterable:
yield obj
self.update()
def update(self, n=1):
self.n += n
self.display()
def display(self, fp=None, width=79):
if not fp:
fp = sys.stdout
with write_lock:
fp.write('\n' * self.position)
l_part = self.desc + ': '
bar = l_part + '#' * int((self.n / self.total) * (width - len(l_part)))
fp.write('\r' + bar + ' ' * (width - len(bar)))
fp.write(_term_move_up * self.position)
fp.flush()
def progresser(n):
text = "progresser #{}".format(n)
for i in simple_bar(range(5000), desc=text, position=n):
sleep(0.001)
def init_child(lock_):
global write_lock
write_lock = lock_
if __name__ == '__main__':
write_lock = Lock()
L = list(range(3))
pool = Pool(len(L), initializer=init_child, initargs=(write_lock,))
pool.map(progresser, L)
我有一个简单的项目,我需要并行打印进度信息,例如进度条。
每个栏都有一个位置,终端中的书写光标会根据栏的位置上下移动。
这在串行打印时效果很好,但由于竞速问题而在并行打印时失败。我尝试使用 multiprocessing.Lock()
但无济于事。
这是我当前的代码:
from __future__ import division
import os, sys
import signal
from time import sleep
from multiprocessing import Pool, freeze_support, Lock
if os.name == 'nt':
import colorama # to support cursor up
colorama.init()
_term_move_up = '\x1b[A'
write_lock = Lock()
class simple_bar(object):
def __init__(self, iterable, desc='', position=0):
signal.signal(signal.SIGINT, signal.SIG_IGN) # handles keyboardinterrupt
self.iterable = iterable
self.total = len(iterable)
self.n = 0
self.position = position
self.desc = desc
self.display()
def __iter__(self):
for obj in self.iterable:
yield obj
self.update()
def update(self, n=1):
self.n += n
self.display()
def display(self, fp=None, width=79):
if not fp:
fp = sys.stdout
with write_lock:
fp.write('\n' * self.position)
l_part = self.desc + ': '
bar = l_part + '#' * int((self.n / self.total) * (width - len(l_part)))
fp.write('\r' + bar + ' ' * (width - len(bar)))
fp.write(_term_move_up * self.position)
fp.flush()
def progresser(n):
text = "progresser #{}".format(n)
for i in simple_bar(range(5000), desc=text, position=n):
sleep(0.001)
if __name__ == '__main__':
freeze_support()
L = list(range(3))
Pool(len(L)).map(progresser, L)
工作正常的串行替代方案,这给出了应该由上面的并行版本产生的正确输出:
# Same code as above, except __main__
if __name__ == '__main__':
t_list = [simple_bar(range(5000), desc="progresser #{}".format(n), position=n) for n in xrange(3)]
for i in range(5000):
for t in t_list:
t.update()
我不知道出了什么问题。我在 Windows 7.
上使用 Python 2.7.12我正在寻找一种在多处理中安全地并行打印的方法,理想情况下但可选地线程安全。
/编辑:有趣的是,如果我在打印前等待(但足够大),那么条形图就可以打印出来:
# ...
def display(self, fp=None, width=79):
if not fp:
fp = sys.stdout
with write_lock:
sleep(1) # this fixes the issue by adding a delay
fp.write('\n' * self.position)
l_part = self.desc + ': '
bar = l_part + '#' * int((self.n / self.total) * (width - len(l_part)))
fp.write('\r' + bar + ' ' * (width - len(bar)))
fp.write(_term_move_up * self.position)
fp.flush()
# ...
我不知道这意味着什么结论。
您需要在write_lock.release()
之前添加fp.flush()
。
不相关的评论:
- 考虑使用锁作为上下文管理器(
with write_lock...
而不是手动acquire()
和release()
)——这更容易遵循并且不易出错。 - 这两个版本都不能很好地处理中断 (Ctrl+C),您可能需要研究一下。
这可能是全局锁变量的问题。当您在 unix 中创建子进程时,您拥有父进程内存的副本。 windows 好像不是这样
试试这个代码
from __future__ import division
import os, sys
import signal
from time import sleep
from multiprocessing import Pool, freeze_support, Lock
if os.name == 'nt':
import colorama # to support cursor up
colorama.init()
_term_move_up = '\x1b[A'
class simple_bar(object):
def __init__(self, iterable, desc='', position=0):
signal.signal(signal.SIGINT, signal.SIG_IGN) # handles keyboardinterrupt
self.iterable = iterable
self.total = len(iterable)
self.n = 0
self.position = position
self.desc = desc
self.display()
def __iter__(self):
for obj in self.iterable:
yield obj
self.update()
def update(self, n=1):
self.n += n
self.display()
def display(self, fp=None, width=79):
if not fp:
fp = sys.stdout
with write_lock:
fp.write('\n' * self.position)
l_part = self.desc + ': '
bar = l_part + '#' * int((self.n / self.total) * (width - len(l_part)))
fp.write('\r' + bar + ' ' * (width - len(bar)))
fp.write(_term_move_up * self.position)
fp.flush()
def progresser(n):
text = "progresser #{}".format(n)
for i in simple_bar(range(5000), desc=text, position=n):
sleep(0.001)
def init_child(lock_):
global write_lock
write_lock = lock_
if __name__ == '__main__':
write_lock = Lock()
L = list(range(3))
pool = Pool(len(L), initializer=init_child, initargs=(write_lock,))
pool.map(progresser, L)