多处理中的有序状态打印

Ordered status printing in multiprocessing

我有一个程序,我必须在我的程序执行的每个成功步骤后打印给用户,我已经尝试使用 Lock 但它确实减慢了我的程序。

基本上我想做的是向用户(有序)打印每个成功的操作,就像我有一些代码对某些页面执行 post 并且我打印 X 以有序方式完成的操作

显示我正在尝试做的事情的示例:(它似乎有效)但它确实减慢了任务速度:

lock = Lock()

def run(u):
    lock.acquire()
    buff = []
    e = requests.post('#personal_url', data=mydata)
    buff.append(e.text)
    buff.append('----------------')
    f = requests.get('#personal_urlx')
    buff.append(u + ' --> ' f.text)
    print('\n'.join(buff))
    lock.release()

p = Pool(4)
p.map(run, uu)
p.close()
p.join()

非常感谢任何帮助,谢谢。

已更新

阅读您的评论后,我对代码进行了一些更改。可能不是一个很好的方法,但基本上,我分叉了另一个进程,它将在某个时间间隔轮询共享字典并更新控制台输出。请注意,这将在更新时清除整个控制台。希望这是预期的行为。

代码:

from multiprocessing import Lock, Process, Pool, Manager
import time

def run(user,logs):
    logs[user] += ['Message 1 for user: ' + user]
    time.sleep(2) #some work done per user
    logs[user] += ['Message 2 for user: ' + user]
    return 1

manager = Manager()
logs = manager.dict()

users = ['Tom', 'Bob', 'Dinesh', 'Ravi']

for user in users:
    logs[user] = [] #initialize empty list for each user

logs_list = [logs for i in range(len(users))] 

def poll(logs):
    while True:
        print("3c") #clear the console
        for user in logs.keys():
            print('Logs for user:', user)
            print('\n'.join(logs[user]))
            print('----------------')
        time.sleep(0.1)

poller_process = Process(target=poll, args=(logs,))

poller_process.start()
p = Pool(4)
p.starmap(run, zip(users,logs_list))
p.close()
p.join()
poller_process.join()

------
Output #logs under each user are refreshed constantly
------
Logs for user: Tom
Message 1 for user: Tom
Message 2 for user: Tom
----------------
Logs for user: Bob
Message 1 for user: Bob
Message 2 for user: Bob
----------------
Logs for user: Dinesh
Message 1 for user: Dinesh
Message 2 for user: Dinesh
----------------
Logs for user: Ravi
Message 1 for user: Ravi
Message 2 for user: Ravi
----------------

这可能不是一个非常优雅的方法,但它确实有效。您可以尝试根据共享字典中的 'user' 键聚合每个进程的结果。然后,您可以在 pool.join() 之后遍历字典并按顺序打印所有结果。这将消除对锁的需要。

代码看起来像这样:

from multiprocessing import Lock, Process, Pool, Manager
import time

def run(user,logs):
    logs[user] += ['Message 1 for user: ' + user]
    time.sleep(1) #some work done per user
    logs[user] += ['Message 2 for user: ' + user]
    return 1

manager = Manager()
logs = manager.dict()

users = ['Tom', 'Bob', 'Dinesh', 'Ravi']

for user in users:
    logs[user] = [] #initialize empty list for each user

logs_list = [logs for i in range(len(users))] 

p = Pool(4)
p.starmap(run, zip(users,logs_list))
p.close()
p.join()

for user in logs.keys():
    print(logs[user])


------
Output:  
------
['Message 1 for user: Tom', 'Message 2 for user: Tom']
['Message 1 for user: Bob', 'Message 2 for user: Bob']
['Message 1 for user: Dinesh', 'Message 2 for user: Dinesh']
['Message 1 for user: Ravi', 'Message 2 for user: Ravi']

可能减慢您的程序的是您的锁定策略。锁应该只用于保护所谓的关键代码部分,这些代码包含共享资源,如果保护不当,这些共享资源可能包含无效状态。

所以我的建议是,如果您唯一关心的是在您的 stdout 上有有效的输出(意味着您的打印不会被打断并且打印出整行),请尝试通过编写一种扩展打印函数来保护您的 stdout,并且只在那里使用你的锁。像这样:

def ext_print(str, lock):
    lock.acquire()
    print(str)
    lock.release()

从您当前的代码中,请删除对锁的操作并仅在 ext_print 函数内使用锁定。

def run(u):
    buff = []
    e = requests.post('#personal_url', data=mydata)
    buff.append(e.text)
    buff.append('----------------')
    f = requests.get('#personal_urlx')
    buff.append(u + ' --> ' f.text)
    ext_print('\n'.join(buff), lock)

使用这种方法,您应该在 stdout 上获得干净的输出。请注意,使用这种方法,输出可能会延迟写入标准输出,这意味着有两个线程 t1t2,您可能会得到 t2 的输出尽管 t1t2 之前完成了数据处理,但在 t1 的输出之前开始晚于 t1。因此,这种方法将提高性能和通过多线程实现的性能增益,但它不能保证输出反映完成的 getset 操作的顺序完全相同。

我认为真正按照与每个已完成操作相同的顺序编写输出的唯一方法是采用如下解决方案:

def run(u):
    buff = []

    lock.acquire()
    e = requests.post('#personal_url', data=mydata)
    print(e.text)
    print('----------------')
    lock.release()

    lock.acquire()
    f = requests.get('#personal_urlx')
    print(u + ' --> ' f.text)
    print('----------------')
    lock.release()

如您所料,这台机器的性能可能会更差。