Can't pass file handle and lock to a process with multiprocessing.Pool?

I'm using multiprocessing.Pool() to launch a bunch of processes, where every process writes to the same file (using a lock).

Each process is assigned a "task", which is just a tuple of arguments.

One of the arguments is a file handle, and another one is a lock.

But Python complains as soon as I try to pass either the file handle or the lock.

(I can do this if, instead of multiprocessing.Pool, I just call multiprocessing.Process.)

Example:

import multiprocessing as mp
import time
import random

def thr_work00(args):
  # unpack the task: an id, two numbers, the shared file handle and lock
  arg0, arg1, arg2, arg3, arg4 = args
  s = random.random()/10
  time.sleep(s)
  print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
  return args

o_file = open('test.txt','w')
o_lock = mp.Lock()

tasks = [
  [0, 0,1, o_file,o_lock],
  [1, 2,3, o_file,o_lock],
  [2, 4,5, o_file,o_lock],
  [3, 6,7, o_file,o_lock],
]

with mp.Pool(2) as pool:
  results = pool.map(thr_work00, tasks)
  for res in results:
    print(res)

When I pass the file I get: TypeError: cannot serialize '_io.TextIOWrapper' object

When I pass the lock I get: RuntimeError: Lock objects should only be shared between processes through inheritance
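As far as I can tell, the problem is that Pool.map has to pickle every task in order to ship it to a worker process, and open file objects aren't picklable; a minimal sketch that reproduces the first error:

import pickle

f = open('test.txt', 'w')
pickle.dumps(f)  # raises TypeError (the exact message varies by Python version)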

How do I get around this?


Edit:

So I'm wondering if the following is OK instead (it seems to work). The only thing I care about is that each write is itself atomic; the order of the writes doesn't matter.

import multiprocessing as mp
import time
import random
import os

# ----------------------------------------------------------------
def thr_work00(args):
  arg0, arg1 = args
  s = random.random()/10
  time.sleep(s)
  txt = 1004*str(arg0)
  # each worker opens the file itself, in append mode, and does one write
  with open('test.txt','a') as o_file:
    o_file.write(f'{txt}\n')
  print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
  return args

# ----------------------------------------------------------------
# start from a clean file (it may not exist on the first run)
if os.path.exists('test.txt'):
  os.remove('test.txt')

tasks = [
  [0, 0xf0],
  [1, 0xf1],
  [2, 0xf2],
  [3, 0xf3],
  [4, 0xf4],
  [5, 0xf5],
  [6, 0xf6],
  [7, 0xf7],
]

with mp.Pool(2) as pool:
  results = pool.map(thr_work00, tasks)
  for res in results:
    print(res)

For both the lock and the open file descriptor, you should share them through process inheritance rather than trying to pass them as arguments. A child process inherits all of its parent's open file descriptors, so you can write the code like this (note that this relies on the fork start method, the default on Linux, so that the pool workers inherit the module-level globals):

import multiprocessing as mp
import time
import random


def thr_work00(args):
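    # o_lock and o_file are globals inherited from the parent process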
    global o_lock, o_file

    s = random.randint(0, 5)
    with o_lock:
        time.sleep(s)
        print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
        o_file.write(f"{args[0]} {s}\n")
        o_file.flush()
    return args


with open("test.txt", "w") as o_file:
    o_lock = mp.Lock()

    tasks = [
        [0, 0, 1],
        [1, 2, 3],
        [2, 4, 5],
        [3, 6, 7],
    ]

    with mp.Pool(2) as pool:
        results = pool.map(thr_work00, tasks)
        for res in results:
            print(res)
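If you would rather not rely on module-level globals, another common idiom is the pool's initializer/initargs hook, which hands the lock to each worker once at startup (and also works under the spawn start method). A minimal sketch, with a simplified worker:

import multiprocessing as mp


def init_worker(lock):
    # runs once in each pool worker; stash the lock in a worker-global
    global o_lock
    o_lock = lock


def thr_work00(args):
    with o_lock:
        # append mode plus the lock keeps concurrent writes from interleaving
        with open('test.txt', 'a') as o_file:
            o_file.write(f'{args[0]}\n')
    return args


if __name__ == '__main__':
    lock = mp.Lock()
    with mp.Pool(2, initializer=init_worker, initargs=(lock,)) as pool:
        print(pool.map(thr_work00, [[0], [1], [2], [3]]))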

Alternatively, don't write to the file in your workers at all; just do the writes in the main process as you collect the results. Then no lock is needed, because only a single process ever writes to the file.
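A minimal sketch of that variant (the worker body here is illustrative, not your original code):

import multiprocessing as mp
import random
import time


def thr_work00(args):
    # compute only; no file I/O in the worker
    s = random.randint(0, 5)
    time.sleep(s)
    return args[0], s


if __name__ == '__main__':
    tasks = [[0, 0, 1], [1, 2, 3], [2, 4, 5], [3, 6, 7]]

    with mp.Pool(2) as pool, open('test.txt', 'w') as o_file:
        # only the parent touches the file, so no lock is needed;
        # imap_unordered yields results as soon as workers finish
        for arg0, s in pool.imap_unordered(thr_work00, tasks):
            o_file.write(f'{arg0} {s}\n')
            print(arg0, s)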

Or, if you need the writes to happen "live" rather than only at the end, use a Queue to ship them to a dedicated writer process. Here's an example that uses a queue to pass results to a dedicated writer:

import multiprocessing as mp
import time
import random

resultq = mp.Queue()


def thr_work00(args):
    global resultq
    s = random.randint(0, 5)
    print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
    time.sleep(s)
    resultq.put((args[0], s))
    return args


def thr_writer():
    global resultq
    print('writer start')
    with open('test.txt', 'w') as fd:
        while True:
            item = resultq.get()
            if item is None:  # None is the shutdown sentinel from the parent
                break
            fd.write(f'{item[0]}: {item[1]}\n')
    print('writer exit')


with open("test.txt", "w") as o_file:
    o_lock = mp.Lock()

    writer = mp.Process(target=thr_writer)
    writer.start()

    tasks = [
        [0, 0, 1],
        [1, 2, 3],
        [2, 4, 5],
        [3, 6, 7],
    ]

    with mp.Pool(2) as pool:
        results = pool.map(thr_work00, tasks)
        for res in results:
            print(res)

    resultq.put(None)
    writer.join()
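Note that this, like the inheritance approach above, relies on the fork start method so that the pool workers inherit resultq; under spawn (Windows, and the default on recent macOS) you would pass the queue to the workers through the pool's initializer/initargs instead, the same way as the lock earlier.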