并行打印和写锁

Parallel printing and write lock

我有一个简单的项目,我需要并行打印进度信息,例如进度条。

每个进度条都有一个位置(所在的行),终端中的写入光标会根据进度条的位置上下移动。

这在串行打印时效果很好,但在并行打印时会因为竞态条件(race condition)而失败。我尝试过使用 multiprocessing.Lock(),但无济于事。

这是我当前的代码:

from __future__ import division

import os, sys
import signal
from time import sleep
from multiprocessing import Pool, freeze_support, Lock

if os.name == 'nt':
    import colorama  # to support cursor up
    colorama.init()

_term_move_up = '\x1b[A'

write_lock = Lock()

class simple_bar(object):
    """Minimal one-line progress bar drawn on a fixed terminal row.

    Each redraw moves the cursor ``position`` lines down, overwrites that line
    with the label and a ``#`` fill, then moves the cursor back up.  Several
    bars with distinct positions can therefore share one terminal.  The
    writes are done while holding the module-global ``write_lock``.

    Args:
        iterable: sized iterable to wrap; ``len(iterable)`` is the total.
        desc: label printed in front of the bar.
        position: number of lines below the cursor at which this bar is drawn.
    """

    def __init__(self, iterable, desc='', position=0):
        # Ignore SIGINT (Ctrl+C) in the process that creates the bar.
        # signal.signal() may only be called from the main thread and raises
        # ValueError elsewhere; skip it there so bars also work in threads.
        try:
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        except ValueError:
            pass
        self.iterable = iterable
        self.total = len(iterable)
        self.n = 0
        self.position = position
        self.desc = desc
        self.display()

    def __iter__(self):
        """Yield each item, advancing the bar once the caller is done with it."""
        for obj in self.iterable:
            yield obj
            self.update()

    def update(self, n=1):
        """Advance the bar by *n* steps and redraw it."""
        self.n += n
        self.display()

    def _render(self, width):
        """Return the carriage-return-prefixed bar text, padded to *width*."""
        l_part = self.desc + ': '
        # total == 0 (empty iterable) would divide by zero; an empty job is
        # trivially complete.  Cap at 100% so extra update() calls cannot
        # push the fill past *width*.
        frac = min(self.n / self.total, 1.0) if self.total else 1.0
        bar = l_part + '#' * int(frac * (width - len(l_part)))
        return '\r' + bar + ' ' * (width - len(bar))

    def display(self, fp=None, width=79):
        """Redraw the bar on its row of *fp* (``sys.stdout`` by default)."""
        if fp is None:
            fp = sys.stdout
        line = self._render(width)
        # Hold the lock across move-down / draw / move-up *and* the flush so
        # another writer using the same lock cannot interleave its own cursor
        # movements with ours.
        with write_lock:
            fp.write('\n' * self.position)
            fp.write(line)
            fp.write(_term_move_up * self.position)
            fp.flush()

def progresser(n):
    """Drive a demo bar labelled "progresser #<n>" on row *n* for 5000 short steps."""
    bar = simple_bar(range(5000), desc="progresser #{}".format(n), position=n)
    for _ in bar:
        sleep(0.001)

if __name__ == '__main__':
    freeze_support()
    L = list(range(3))
    # NOTE(review): with the "spawn" start method (the only one on Windows)
    # each worker re-imports this module and creates its own module-level
    # ``write_lock``, so the lock is not shared between the workers.
    pool = Pool(len(L))
    try:
        pool.map(progresser, L)
    finally:
        # Shut the workers down explicitly instead of relying on garbage
        # collection / interpreter exit to finalize the pool.
        pool.close()
        pool.join()

下面是工作正常的串行版本,它给出了上面的并行版本本应产生的正确输出:

# Same code as above, except __main__

if __name__ == '__main__':
    # Create every bar up front (each draws itself once on its own row), then
    # advance them round-robin from this single process.
    # range() instead of xrange(): works on both Python 2 and 3 and matches
    # the rest of the file.
    t_list = [simple_bar(range(5000), desc="progresser #{}".format(n), position=n)
              for n in range(3)]
    for _ in range(5000):
        for t in t_list:
            t.update()

我不知道出了什么问题。我在 Windows 7 上使用 Python 2.7.12。

我正在寻找一种在多进程(multiprocessing)中安全地并行打印的方法;如果还能做到线程安全就更好了(但这不是必需的)。

/编辑:有趣的是,如果我在打印前等待一段足够长的时间,进度条就能正确打印出来:

# ...
    def display(self, fp=None, width=79):
        """Redraw this bar on its row of *fp* (``sys.stdout`` when fp is falsy).

        Diagnostic variant from the question's edit: it sleeps for one second
        inside the lock before drawing.
        """
        if not fp:
            fp = sys.stdout

        with write_lock:
            # NOTE(review): the delay does not synchronise anything between
            # processes.  Presumably it just spaces each process's writes so
            # far apart that they rarely overlap, which hides the race rather
            # than fixing it.
            sleep(1)  # delay before drawing; masks (does not fix) the race
            # Move down to this bar's row.
            fp.write('\n' * self.position)
            l_part = self.desc + ': '
            # Label followed by a '#' fill proportional to n / total.
            bar = l_part + '#' * int((self.n / self.total) * (width - len(l_part)))
            # Return to column 0 and overwrite the line, padding to *width*.
            fp.write('\r' + bar + ' ' * (width - len(bar)))
            # Move the cursor back up to where it started.
            fp.write(_term_move_up * self.position)
            fp.flush()
# ...

我不知道从中能得出什么结论。

您需要在 write_lock.release() 之前调用 fp.flush(),也就是在仍然持有锁的时候把输出刷新到终端。

不相关的评论:

  • 考虑把锁当作上下文管理器使用(with write_lock: ...,而不是手动调用 acquire()/release())——这样更易读,也更不容易出错。
  • 这两个版本都不能很好地处理中断 (Ctrl+C),您可能需要研究一下。

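问题可能出在全局锁变量上。在 Unix 上创建子进程(fork)时,子进程得到父进程内存的副本,因此会共享父进程创建的同一个锁;而 Windows 不支持 fork,子进程会重新导入模块,各自创建一个新的锁,所以锁并没有在进程之间共享。解决办法是在父进程中创建锁,再通过 Pool 的 initializer 把它传给每个子进程。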

试试这个代码

from __future__ import division
import os, sys
import signal
from time import sleep
from multiprocessing import Pool, freeze_support, Lock

if os.name == 'nt':
    import colorama  # to support cursor up
    colorama.init()

_term_move_up = '\x1b[A'



class simple_bar(object):
    """Minimal one-line progress bar drawn on a fixed terminal row.

    Each redraw moves the cursor ``position`` lines down, overwrites that line
    with the label and a ``#`` fill, then moves the cursor back up.  Several
    bars with distinct positions can therefore share one terminal.  The
    writes are done while holding the module-global ``write_lock``.

    Args:
        iterable: sized iterable to wrap; ``len(iterable)`` is the total.
        desc: label printed in front of the bar.
        position: number of lines below the cursor at which this bar is drawn.
    """

    def __init__(self, iterable, desc='', position=0):
        # Ignore SIGINT (Ctrl+C) in the process that creates the bar.
        # signal.signal() may only be called from the main thread and raises
        # ValueError elsewhere; skip it there so bars also work in threads.
        try:
            signal.signal(signal.SIGINT, signal.SIG_IGN)
        except ValueError:
            pass
        self.iterable = iterable
        self.total = len(iterable)
        self.n = 0
        self.position = position
        self.desc = desc
        self.display()

    def __iter__(self):
        """Yield each item, advancing the bar once the caller is done with it."""
        for obj in self.iterable:
            yield obj
            self.update()

    def update(self, n=1):
        """Advance the bar by *n* steps and redraw it."""
        self.n += n
        self.display()

    def _render(self, width):
        """Return the carriage-return-prefixed bar text, padded to *width*."""
        l_part = self.desc + ': '
        # total == 0 (empty iterable) would divide by zero; an empty job is
        # trivially complete.  Cap at 100% so extra update() calls cannot
        # push the fill past *width*.
        frac = min(self.n / self.total, 1.0) if self.total else 1.0
        bar = l_part + '#' * int(frac * (width - len(l_part)))
        return '\r' + bar + ' ' * (width - len(bar))

    def display(self, fp=None, width=79):
        """Redraw the bar on its row of *fp* (``sys.stdout`` by default)."""
        if fp is None:
            fp = sys.stdout
        line = self._render(width)
        # Hold the lock across move-down / draw / move-up *and* the flush so
        # another writer using the same lock cannot interleave its own cursor
        # movements with ours.
        with write_lock:
            fp.write('\n' * self.position)
            fp.write(line)
            fp.write(_term_move_up * self.position)
            fp.flush()

def progresser(n):
    """Drive a demo bar labelled "progresser #<n>" on row *n* for 5000 short steps."""
    bar = simple_bar(range(5000), desc="progresser #{}".format(n), position=n)
    for _ in bar:
        sleep(0.001)

def init_child(lock_):
    """Pool initializer: make *lock_* this process's global ``write_lock``.

    Passed as ``Pool(initializer=init_child, initargs=(lock,))`` so that
    every worker uses the one Lock created in the parent.
    """
    global write_lock
    write_lock = lock_

if __name__ == '__main__':
    # Needed when this script is frozen into a Windows executable; a no-op
    # otherwise (it was imported but never called).
    freeze_support()
    # Create the lock once, in the parent, and hand this same Lock to every
    # worker via the initializer.  A module-level Lock would be re-created
    # separately in each spawned worker and never shared.
    write_lock = Lock()
    L = list(range(3))
    pool = Pool(len(L), initializer=init_child, initargs=(write_lock,))
    try:
        pool.map(progresser, L)
    finally:
        # Shut the workers down explicitly instead of relying on garbage
        # collection / interpreter exit to finalize the pool.
        pool.close()
        pool.join()