Can't pass file handle and lock to a process with multiprocessing.Pool?
I'm using multiprocessing.Pool() to start a bunch of processes, where every process writes to the same file (using a lock). Each process is assigned a "task", which is just a tuple of arguments. One of the arguments is a file handle and another is a lock, but Python doesn't like me passing either of them. (I can make this work if, instead of using multiprocessing.Pool, I just call multiprocessing.Process directly.)

Example:
import multiprocessing as mp
import time
import random

def thr_work00(args):
    arg0 = args[0]
    arg1 = args[1]
    arg2 = args[2]
    arg3 = args[3]
    arg4 = args[4]
    s = random.random()/10
    time.sleep(s)
    print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
    return args

o_file = open('test.txt','w')
o_lock = mp.Lock()

tasks = [
    [0, 0,1, o_file,o_lock],
    [1, 2,3, o_file,o_lock],
    [2, 4,5, o_file,o_lock],
    [3, 6,7, o_file,o_lock],
]

with mp.Pool(2) as pool:
    results = pool.map(thr_work00, tasks)
    for res in results:
        print(res)
When I pass the file I get: TypeError: cannot serialize '_io.TextIOWrapper' object.
When I pass the lock I get: RuntimeError: Lock objects should only be shared between processes through inheritance.
How can I work around this?
Edit:
So I'm wondering whether this is acceptable (it appears to work). All I care about is that each write is itself atomic; the order of the writes doesn't matter.
import multiprocessing as mp
import time
import random
import os

# ----------------------------------------------------------------
def thr_work00(args):
    arg0 = args[0]
    arg1 = args[1]
    s = random.random()/10
    time.sleep(s)
    txt = 1004*str(arg0)
    with open('test.txt','a') as o_file:
        o_file.write(f'{txt}\n')
    print(f'\x1b[92m{arg0} \x1b[32m{s:.3f}\x1b[0m')
    return args
# ----------------------------------------------------------------

if os.path.exists('test.txt'):  # avoid FileNotFoundError on the first run
    os.remove('test.txt')

tasks = [
    [0, 0xf0],
    [1, 0xf1],
    [2, 0xf2],
    [3, 0xf3],
    [4, 0xf4],
    [5, 0xf5],
    [6, 0xf6],
    [7, 0xf7],
]

with mp.Pool(2) as pool:
    results = pool.map(thr_work00, tasks)
    for res in results:
        print(res)
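For what it's worth, whether each write is truly atomic depends on the OS: open(..., 'a') sets O_APPEND, so on POSIX systems each underlying write() is positioned at end-of-file atomically, but Python's buffered text layer may split one logical write into several system calls. A minimal sketch that sidesteps the buffering layer (POSIX semantics and reasonably small payloads assumed; the append_line helper is mine, not from the question):

import os

def append_line(path, text):
    # Hypothetical helper, not from the question. O_APPEND makes the
    # kernel seek to EOF atomically for every write; issuing the whole
    # line as a single os.write() keeps concurrent writers from
    # interleaving bytes, at least for small writes on POSIX systems.
    fd = os.open(path, os.O_WRONLY | os.O_APPEND | os.O_CREAT, 0o644)
    try:
        os.write(fd, (text + '\n').encode())
    finally:
        os.close(fd)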
For both the lock and the open file descriptor, you should share them with the workers through process inheritance rather than trying to pass them as task arguments. A child process inherits all of its parent's open file descriptors, so you can write the code like this:
import multiprocessing as mp
import time
import random

def thr_work00(args):
    global o_lock, o_file
    s = random.randint(0, 5)
    with o_lock:
        time.sleep(s)
        print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
        o_file.write(f"{args[0]} {s}\n")
        o_file.flush()
    return args

with open("test.txt", "w") as o_file:
    o_lock = mp.Lock()

    tasks = [
        [0, 0, 1],
        [1, 2, 3],
        [2, 4, 5],
        [3, 6, 7],
    ]

    with mp.Pool(2) as pool:
        results = pool.map(thr_work00, tasks)
        for res in results:
            print(res)
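One caveat: inheriting globals like this relies on the fork start method, which is the default on Linux but not on Windows or recent macOS (those use spawn, so workers don't inherit runtime state). A more portable route is the Pool's initializer hook, which hands the lock to each worker as it starts. A minimal sketch of that pattern, not from the original answer (the init_worker name is mine):

import multiprocessing as mp

def init_worker(lock):
    # Runs once in each worker process; stash the lock in a global
    # so the work function can find it.
    global o_lock
    o_lock = lock

def thr_work00(args):
    with o_lock:
        # Each worker appends its own line while holding the lock.
        with open('test.txt', 'a') as o_file:
            o_file.write(f'{args[0]}\n')
    return args

if __name__ == '__main__':
    lock = mp.Lock()
    tasks = [[0], [1], [2], [3]]
    with mp.Pool(2, initializer=init_worker, initargs=(lock,)) as pool:
        print(pool.map(thr_work00, tasks))

Passing the lock through initargs works where passing it in a task does not, because the initializer arguments travel to the workers when they are created (the inheritance path), not through the task-pickling machinery.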
Alternatively, don't write to the file in your workers at all; just perform the writes in the parent as you collect the results. This needs no lock, because you no longer have multiple processes writing to the same file...
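A sketch of what that variant might look like, using imap_unordered so each line is written as soon as its result arrives (the exact structure here is my assumption, not code from the answer):

import multiprocessing as mp

def thr_work00(args):
    # Do the work, but return the result instead of writing it.
    return (args[0], sum(args[1:]))

if __name__ == '__main__':
    tasks = [[0, 0, 1], [1, 2, 3], [2, 4, 5], [3, 6, 7]]
    with mp.Pool(2) as pool, open('test.txt', 'w') as o_file:
        # Only the parent touches the file, so no lock is required.
        for ident, total in pool.imap_unordered(thr_work00, tasks):
            o_file.write(f'{ident}: {total}\n')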
...or, if you need the writes to happen "live" rather than all at the end, use a Queue to ship them to a dedicated writer process. Here is an example that uses a queue to pass results to a dedicated writer:
import multiprocessing as mp
import time
import random

resultq = mp.Queue()

def thr_work00(args):
    global resultq
    s = random.randint(0, 5)
    print(f"\x1b[92m{args[0]} \x1b[32m{s}\x1b[0m")
    time.sleep(s)
    resultq.put((args[0], s))
    return args

def thr_writer():
    global resultq
    print('writer start')
    # The writer is the only process that touches the file, so no lock
    # is needed; a None on the queue tells it to exit.
    with open('test.txt', 'w') as fd:
        while True:
            item = resultq.get()
            if item is None:
                break
            fd.write(f'{item[0]}: {item[1]}\n')
    print('writer exit')

writer = mp.Process(target=thr_writer)
writer.start()

tasks = [
    [0, 0, 1],
    [1, 2, 3],
    [2, 4, 5],
    [3, 6, 7],
]

with mp.Pool(2) as pool:
    results = pool.map(thr_work00, tasks)
    for res in results:
        print(res)

resultq.put(None)
writer.join()