Python 多处理,从子进程读取输入
Python multiprocessing, read input from child process
我有一个多处理系统,有一个主进程和两个子进程。
其中一个子进程(例如,C1)通过队列向另一个子进程 (C2) 发送消息。 C2 分析来自 C1 的消息,当某些情况发生时,它需要用户的一些输入。
所以,情况是这样的:
main.py
from child_process_01 import Child_Process_01
from child_process_02 import Child_Process_02
set_start_method("spawn")
c1_c2_q = Queue(maxsize=1)
c1 = mp.Process(target=Child_Process_01, args=(c1_c2_q,))
c1.daemon = True
c1.start()
c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q,))
c2.daemon = True
c2.start()
Child_Process_01
message = produced somehow
c1_c2_q.put((message))
Child_Process_02
## infinite loop, keep checking messages and ask for user input under certain conditions
while True:
try:
message = c1_c2_q.get_nowait()
except:
message = None
## when message arrives
if message:
if message = something:
usr_input = input('In this situation, I need user input')
代码无法按原样运行,因为多处理模块关闭了它创建的所有进程的标准输入,正如我在此处的许多答案中发现的那样。
一个好的建议似乎是在主进程中重新定义标准并发送给子进程,如解释的那样here or here,所以我尝试了一下:
main.py
newstdin = os.fdopen(os.dup(sys.stdin.fileno()))
c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q, newstdin))
c2.daemon = True
c2.start()
Child_Process_02
def Child_Process_02(c1_c2_q, newstdin):
sys.stdin = newstdin
while True:
try:
message = c1_c2_q.get_nowait()
except:
message = None
## when message arrives
if message:
if message = something:
usr_input = input('In this situation, I need user input')
但这也不起作用,因为我无法通过队列传递在主进程中创建的 newstdin 对象,我得到错误:
TypeError: cannot pickle '_io.TextIOWrapper' object
此外,对类似问题的一些评论通常不鼓励从子进程读取输入的做法,但我无法想象如何以不同的方式做到这一点。有什么建议吗?
首先,您应该修改 Child_Process_02,以便通过使用阻塞调用 c1_c2_q.get()
,在收到消息之前不会 return(并且您可以摆脱围绕此调用的 try/except
脚手架)。
其次,这是可选的,因为您从 Child_Process_01 传递到 Child_Process_02 的消息有一个生产者和一个消费者,您可以通过调用 multiprocessing.Pipe(duplex=False)
替换 multiprocessing.Queue
来提高效率,这将 return 两个单向 multiprocessing.connection.Connection
实例,其中第二个实例适合发送任意对象,例如字符串,其中第一个用于接收对象(顺便说一下,multiprocessing.Queue
是在 Pipe
之上实现的)。但是全双工 Pipe
其中每个连接都可以用于发送和接收,这绝对是您想要的以下程序,其中主进程启动的线程正在“侦听”请求 input
通过在循环中调用 multiprocessing.connection.Connection.recv()
来调用。此调用编辑的消息 return 是用于调用 input
的提示字符串。由 input
函数编辑的值 return 然后将 sent
返回连接:
from multiprocessing import Process, Pipe
from threading import Thread
def inputter(input_conn):
""" get requests to do input calls """
while True:
input_msg = input_conn.recv()
value = input(input_msg)
input_conn.send(value) # send inputted value:
def worker(msg_conn, input_conn):
while True:
message = msg_conn.recv()
if message is None:
break
if message == 'do input':
# send inputter our prompt message:
input_conn.send('Enter x: ')
# get back the result of the input:
x = (int)(input_conn.recv())
print('The value entered was', x)
else:
print('Got message:', message)
if __name__ == '__main__':
import time
# create the connections for sending messages from one process to another:
recv_conn, send_conn = Pipe(duplex=False)
# create the connections for doing the input requests:
input_conn1, input_conn2 = Pipe(duplex=True) # each connection is bi-drectional
# start the inputter thread with one of the inputter duplex connections:
t = Thread(target=inputter, args=(input_conn1,), daemon=True)
t.start()
# start a child process with the message connection in lieu of a Queue
# and the other inputter connection:
p = Process(target=worker, args=(recv_conn, input_conn2))
p.start()
# send messages to worker process:
send_conn.send('a')
send_conn.send('do input')
send_conn.send('b')
# signal the child process to terminate:
send_conn.send(None)
p.join()
打印:
Got message: a
Enter x: 8
The value entered was 8
Got message: b
注意
需要注意的是,multiprocessing.Queue
为底层 multiprocessing.Pipe
启动了一个馈线线程,以防止“推杆”在 maxsize 调用之前过早阻塞到 put
,其中 maxsize 是用于实例化队列的值。但这也是 multiprocessing.Queue
的性能不如 multiprocessing.Pipe
的原因。但这也意味着在连接上重复调用 send
而另一端没有相应的 recv
调用最终会阻塞。但是鉴于您在队列中指定了 maxsize=1,这几乎不是问题,因为在相同情况下您会阻塞队列。
我有一个多处理系统,有一个主进程和两个子进程。
其中一个子进程(例如,C1)通过队列向另一个子进程 (C2) 发送消息。 C2 分析来自 C1 的消息,当某些情况发生时,它需要用户的一些输入。
所以,情况是这样的:
main.py
from child_process_01 import Child_Process_01
from child_process_02 import Child_Process_02
set_start_method("spawn")
c1_c2_q = Queue(maxsize=1)
c1 = mp.Process(target=Child_Process_01, args=(c1_c2_q,))
c1.daemon = True
c1.start()
c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q,))
c2.daemon = True
c2.start()
Child_Process_01
message = produced somehow
c1_c2_q.put((message))
Child_Process_02
## infinite loop, keep checking messages and ask for user input under certain conditions
while True:
try:
message = c1_c2_q.get_nowait()
except:
message = None
## when message arrives
if message:
if message = something:
usr_input = input('In this situation, I need user input')
代码无法按原样运行,因为多处理模块关闭了它创建的所有进程的标准输入,正如我在此处的许多答案中发现的那样。 一个好的建议似乎是在主进程中重新定义标准并发送给子进程,如解释的那样here or here,所以我尝试了一下:
main.py
newstdin = os.fdopen(os.dup(sys.stdin.fileno()))
c2 = mp.Process(target=Child_Process_02, args=(c1_c2_q, newstdin))
c2.daemon = True
c2.start()
Child_Process_02
def Child_Process_02(c1_c2_q, newstdin):
sys.stdin = newstdin
while True:
try:
message = c1_c2_q.get_nowait()
except:
message = None
## when message arrives
if message:
if message = something:
usr_input = input('In this situation, I need user input')
但这也不起作用,因为我无法通过队列传递在主进程中创建的 newstdin 对象,我得到错误:
TypeError: cannot pickle '_io.TextIOWrapper' object
此外,对类似问题的一些评论通常不鼓励从子进程读取输入的做法,但我无法想象如何以不同的方式做到这一点。有什么建议吗?
首先,您应该修改 Child_Process_02,以便通过使用阻塞调用 c1_c2_q.get()
,在收到消息之前不会 return(并且您可以摆脱围绕此调用的 try/except
脚手架)。
其次,这是可选的,因为您从 Child_Process_01 传递到 Child_Process_02 的消息有一个生产者和一个消费者,您可以通过调用 multiprocessing.Pipe(duplex=False)
替换 multiprocessing.Queue
来提高效率,这将 return 两个单向 multiprocessing.connection.Connection
实例,其中第二个实例适合发送任意对象,例如字符串,其中第一个用于接收对象(顺便说一下,multiprocessing.Queue
是在 Pipe
之上实现的)。但是全双工 Pipe
其中每个连接都可以用于发送和接收,这绝对是您想要的以下程序,其中主进程启动的线程正在“侦听”请求 input
通过在循环中调用 multiprocessing.connection.Connection.recv()
来调用。此调用编辑的消息 return 是用于调用 input
的提示字符串。由 input
函数编辑的值 return 然后将 sent
返回连接:
from multiprocessing import Process, Pipe
from threading import Thread
def inputter(input_conn):
""" get requests to do input calls """
while True:
input_msg = input_conn.recv()
value = input(input_msg)
input_conn.send(value) # send inputted value:
def worker(msg_conn, input_conn):
while True:
message = msg_conn.recv()
if message is None:
break
if message == 'do input':
# send inputter our prompt message:
input_conn.send('Enter x: ')
# get back the result of the input:
x = (int)(input_conn.recv())
print('The value entered was', x)
else:
print('Got message:', message)
if __name__ == '__main__':
import time
# create the connections for sending messages from one process to another:
recv_conn, send_conn = Pipe(duplex=False)
# create the connections for doing the input requests:
input_conn1, input_conn2 = Pipe(duplex=True) # each connection is bi-drectional
# start the inputter thread with one of the inputter duplex connections:
t = Thread(target=inputter, args=(input_conn1,), daemon=True)
t.start()
# start a child process with the message connection in lieu of a Queue
# and the other inputter connection:
p = Process(target=worker, args=(recv_conn, input_conn2))
p.start()
# send messages to worker process:
send_conn.send('a')
send_conn.send('do input')
send_conn.send('b')
# signal the child process to terminate:
send_conn.send(None)
p.join()
打印:
Got message: a
Enter x: 8
The value entered was 8
Got message: b
注意
需要注意的是,multiprocessing.Queue
为底层 multiprocessing.Pipe
启动了一个馈线线程,以防止“推杆”在 maxsize 调用之前过早阻塞到 put
,其中 maxsize 是用于实例化队列的值。但这也是 multiprocessing.Queue
的性能不如 multiprocessing.Pipe
的原因。但这也意味着在连接上重复调用 send
而另一端没有相应的 recv
调用最终会阻塞。但是鉴于您在队列中指定了 maxsize=1,这几乎不是问题,因为在相同情况下您会阻塞队列。