使用顺序初始化参数的多进程池初始化
Multiprocess pool initialization with sequential initializer argument
我有如下代码:
import multiprocessing as mp
connection: module.Connection
def client_id():
for i in range(mp.cpu_count*2):
yield i
def initproc(host: str, port: int, client_id: int):
global connection
connection.connect(host, port, client_id)
def main():
host = "something"
port = 12345
mp.get_context("spawn").Pool(processes=mp.cpu_count()*2,
initializer=initproc,
initargs=(host, port, client_id())) as p:
res = p.starmap(processing_function, arg_list)
就问题而言 processing_function 和 arg_list 不相关。
问题是我收到一个错误:
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'generator' object
有没有什么方法可以在池中创建一个初始化进程,使初始化它的参数之一成为序列中的下一个数字?
P.S。在所写的代码中,可以在初始化函数之外初始化所有连接对象,但在我的特定实例中它不是。我需要将连接参数传递给初始化程序。
针对您的情况的一个简单解决方案是使用 Process.name
中包含的 child-process 的序号。您可以使用...
提取它
mp.current_process().name.split('-')[1]
如果您需要更好地控制序列的开始位置,您可以使用 multiprocessing.Value
作为工作人员从中获取其唯一编号的计数器。
import multiprocessing as mp
import time
def init_p(client_id):
with client_id.get_lock():
globals()['client_id'] = client_id.value
print(f"{mp.current_process().name},"
f" {mp.current_process().name.split('-')[1]}," # alternative
f" client_id:{globals()['client_id']}")
client_id.value += 1
if __name__ == "__main__":
ctx = mp.get_context("spawn")
client_ids = ctx.Value('i', 0)
with ctx.Pool(
processes=4,
initializer=init_p,
initargs=(client_ids,)
) as pool:
time.sleep(3)
输出:
SpawnPoolWorker-2, 2, client_id:0
SpawnPoolWorker-3, 3, client_id:1
SpawnPoolWorker-1, 1, client_id:2
SpawnPoolWorker-4, 4, client_id:3
Process finished with exit code 0
我有如下代码:
import multiprocessing as mp
connection: module.Connection
def client_id():
for i in range(mp.cpu_count*2):
yield i
def initproc(host: str, port: int, client_id: int):
global connection
connection.connect(host, port, client_id)
def main():
host = "something"
port = 12345
mp.get_context("spawn").Pool(processes=mp.cpu_count()*2,
initializer=initproc,
initargs=(host, port, client_id())) as p:
res = p.starmap(processing_function, arg_list)
就问题而言 processing_function 和 arg_list 不相关。
问题是我收到一个错误:
ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'generator' object
有没有什么方法可以在池中创建一个初始化进程,使初始化它的参数之一成为序列中的下一个数字?
P.S。在所写的代码中,可以在初始化函数之外初始化所有连接对象,但在我的特定实例中它不是。我需要将连接参数传递给初始化程序。
针对您的情况的一个简单解决方案是使用 Process.name
中包含的 child-process 的序号。您可以使用...
mp.current_process().name.split('-')[1]
如果您需要更好地控制序列的开始位置,您可以使用 multiprocessing.Value
作为工作人员从中获取其唯一编号的计数器。
import multiprocessing as mp
import time
def init_p(client_id):
with client_id.get_lock():
globals()['client_id'] = client_id.value
print(f"{mp.current_process().name},"
f" {mp.current_process().name.split('-')[1]}," # alternative
f" client_id:{globals()['client_id']}")
client_id.value += 1
if __name__ == "__main__":
ctx = mp.get_context("spawn")
client_ids = ctx.Value('i', 0)
with ctx.Pool(
processes=4,
initializer=init_p,
initargs=(client_ids,)
) as pool:
time.sleep(3)
输出:
SpawnPoolWorker-2, 2, client_id:0
SpawnPoolWorker-3, 3, client_id:1
SpawnPoolWorker-1, 1, client_id:2
SpawnPoolWorker-4, 4, client_id:3
Process finished with exit code 0