如何在 Python 中的两个独立进程之间来回交换数据
How to exchange data back and forth between two separate processes in Python
问题
有两个独立的进程并行运行,我希望它们之间能够来回通信。
代码说明
代码使用 Python 2.7 编写。在我的精简脚本中,我使用队列进行进程间通信。进程 p1 将数据放入队列中。进程 p2 从队列中获取数据并对数据进行处理。然后进程 p2 将修改后的数据放回队列中,最后进程 p1 从队列中取回修改后的数据。修改后的数据必须返回到进程 p1,因为该进程实际上是一个发送/接收请求的 eventlet 服务器。
代码
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for back-and-forth data exchange between processes
# common modules
import os
import sys
import time
from multiprocessing import Process
from multiprocessing import Queue
from datetime import datetime
someData = {}
class Load():
def post(self):
timestamp = str(datetime.now())
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
queue1.put(someData) # put into queue
print "#20 process 1: put in queue1 =>", someData
time.sleep(3)
while True: # queue1 checking loop, comment out the loop if use time.sleep only
if queue1.empty() == False:
timestamp = str(datetime.now())
res = queue1.get()
res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
print "#28 get from queue1 =>", res
break
else:
print "#31 queue1 empty"
time.sleep(1)
# while True: # queue2 checking loop
# if queue2.empty() == False:
# timestamp = str(datetime.now())
# res = queue2.get()
# res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
# print "#39 get from queue2 =>", res
# break
# else:
# print "#42 queue2 empty"
# time.sleep(1)
class Unload():
def get(self):
try:
if queue1.empty() == False:
data = queue1.get() # retrieve package from queue
#queue1.close()
#queue1.join_thread()
timestamp = str(datetime.now())
data = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp}
print "#54 process 2: get from queue1 =>", data
self.doSomething(data) # call method
else:
print "#57 queue1 empty"
pass
except:
print "#60 queue1 error"
pass
def doSomething(self, data):
time.sleep(3)
timestamp = str(datetime.now())
someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
self.someData = someData
print "#68 process 2: do something =>", someData
self.put()
def put(self):
time.sleep(3)
timestamp = str(datetime.now())
self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
print "#75 process 2: put back in queue1 =>", self.someData
res = self.someData
queue1.put(res)
#print "#78 process 2: put back in queue2 =>", self.someData
#res = self.someData
#queue2.put(res)
#queue2.close()
#queue2.join_thread()
# main
if __name__ == '__main__':
    # Load/Unload read queue1 as a module-level name, so it is created here and
    # inherited by the child processes (this relies on the fork start method).
    queue1 = Queue()
    #queue2 = Queue()  # needed only for the two-queue variant
    # Hand Process the bound method itself. Writing target=Load().post() *calls* post()
    # right here in the parent and passes its return value (None) as the target, so no
    # child ever runs the method. No args are passed: post()/get() take no parameters.
    p1 = Process(target=Load().post) # process p1
    p1.daemon = True
    p1.start()
    p2 = Process(target=Unload().get) # process p2
    p2.start()
    p2.join()
    # Wait for p1 as well: it is daemonic, so it would be terminated as soon as the
    # parent exits, before it has a chance to read the reply.
    p1.join()
问题
我已经检查了其他相关资源,但它们都涉及单向通信。以下是资源列表。
- in-python-how-do-you-get-data-back-from-a-particular-process-using-multiprocess
- multiprocessing module supports locks
如何让 process1 等待并从 process2 检索修改后的数据?我是否应该考虑另一种进程间通信方法,例如管道、zeroMQ?
尝试 1:在进程 1 中使用没有 while 循环的 time.sleep()
仅使用 time.sleep 时,数据被放回了队列,但始终没有到达进程 1 这个最终目的地。前面的步骤都正常,只差最后一步。结果如下。
#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:40:30.234466', 'class': 'Load()', 'method': 'post()'}
#54 process 2: get from queue1 => {'process': 'p2', 'timestamp': '2020-02-23 11:40:33.239113', 'class': 'Unload()', 'method': 'get()'}
#68 process 2: do something => {'process': 'p2', 'timestamp': '2020-02-23 11:40:36.242500', 'class': 'Unload()', 'method': 'doSomething()'}
#75 process 2: put back in queue1 => {'process': 'p2', 'timestamp': '2020-02-23 11:40:39.245856', 'class': 'Unload()', 'method': 'put()'}
尝试 2:在进程 1 中使用 while 循环
通过 while 循环检查队列,数据进入队列但之后立即被捕获,它们永远不会到达进程 2。结果如下。
#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:46:14.606356', 'class': 'Load()', 'method': 'post()'}
#28 get from queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:46:17.610202', 'class': 'Load()', 'method': 'post()'}
#57 queue1 empty
尝试 3:使用两个队列
使用两个队列:queue1 从 process1 到 process2,queue2 从 process2 到 process1。数据进入了 queue1,但没有通过 queue2 返回,神秘地消失了。结果如下。
#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:53:39.745177', 'class': 'Load()', 'method': 'post()'}
#42 queue2 empty
----- 更新 20200224:尝试 4、5 和 6 -------------------------- --------------------------------------
尝试 4:使用两个队列 manager.Queue()
使用 manager.Queue() 创建两个队列:queue1 从 process1 到 process2,queue2 从 process2 到 process1。数据进入了 queue1,但没有通过 queue2 返回,又一次神秘地消失了。代码和结果如下。
尝试4的代码:
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for serialized interprocess data exchange
# common modules
import os
import sys
import time
import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Manager
from datetime import datetime
someData = {}
manager = multiprocessing.Manager()
queue1 = manager.Queue()
queue2 = manager.Queue()
class Load():
def post(self):
timestamp = str(datetime.now())
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
queue1.put(someData) # put into queue
print "#20 process 1: put in queue1 =>", someData
time.sleep(3)
# while True: # queue1 checking loop
# if queue1.empty() == False:
# timestamp = str(datetime.now())
# res = queue1.get()
# res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
# print "#28 get from queue1 =>", res
# break
# else:
# print "#31 queue1 empty"
# time.sleep(1)
while True: # queue2 checking loop
if queue2.empty() == False:
timestamp = str(datetime.now())
res = queue2.get()
res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
print "#39 get from queue2 =>", res
break
else:
print "#42 queue2 empty"
time.sleep(1)
class Unload():
def get(self):
try:
if queue1.empty() == False:
data = queue1.get() # retrieve package from queue
#queue1.close()
#queue1.join_thread()
timestamp = str(datetime.now())
data = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp}
print "#54 process 2: get from queue1 =>", data
self.doSomething(data) # call method
else:
print "#57 queue1 empty"
pass
except:
print "#60 queue1 error"
pass
def doSomething(self, data):
time.sleep(3)
timestamp = str(datetime.now())
someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
self.someData = someData
print "#68 process 2: do something =>", someData
self.put()
def put(self):
time.sleep(3)
timestamp = str(datetime.now())
self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
res = self.someData
#print "#75 process 2: put back in queue1 =>", self.someData
#queue1.put(res)
print "#78 process 2: put back in queue2 =>", self.someData
queue2.put(res)
#queue2.close()
#queue2.join_thread()
# main
if __name__ == '__main__':
    # Load/Unload read queue1/queue2 as module-level names; the proxies created here
    # are inherited by the child processes (this relies on the fork start method).
    manager = multiprocessing.Manager()
    queue1 = manager.Queue()
    queue2 = manager.Queue()
    # Hand Process the bound method itself. Writing target=Load().post() *calls* post()
    # right here in the parent and passes its return value (None) as the target, so no
    # child ever runs the method. No args are passed: post()/get() take no parameters.
    p1 = Process(target=Load().post)
    p1.daemon = True
    p1.start()
    p2 = Process(target=Unload().get)
    p2.start()
    p2.join()
    # Wait for p1 as well: it is daemonic, so it would be terminated as soon as the
    # parent exits, before it has a chance to read the reply from queue2.
    p1.join()
尝试4的结果:
#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-24 13:06:17.687762', 'class': 'Load()', 'method': 'post()'}
#42 queue2 empty
尝试 5:使用一个队列 manager.Queue()
只使用一个由 manager.Queue() 创建的队列:数据经 queue1 从 process1 到 process2,再经 queue1 从 process2 返回 process1。数据进入 queue1 后立即又被取走,始终到不了进程 2。代码和结果如下。
尝试5的代码:
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for serialized interprocess data exchange
# common modules
import os
import sys
import time
import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Manager
from datetime import datetime
someData = {}
manager = multiprocessing.Manager()
queue1 = manager.Queue()
#queue2 = manager.Queue()
class Load():
def post(self):
timestamp = str(datetime.now())
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
queue1.put(someData) # put into queue
print "#25 process 1: put in queue1 =>", someData
time.sleep(3)
while True: # queue1 checking loop
if queue1.empty() == False:
timestamp = str(datetime.now())
res = queue1.get()
res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
print "#33 get from queue1 =>", res
break
else:
print "#36 queue1 empty"
time.sleep(1)
# while True: # queue2 checking loop
# if queue2.empty() == False:
# timestamp = str(datetime.now())
# res = queue2.get()
# res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
# print "#44 get from queue2 =>", res
# break
# else:
# print "#47 queue2 empty"
# time.sleep(1)
class Unload():
def get(self):
try:
if queue1.empty() == False:
data = queue1.get() # retrieve package from queue
#queue1.close()
#queue1.join_thread()
timestamp = str(datetime.now())
data = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp}
print "#59 process 2: get from queue1 =>", data
self.doSomething(data) # call method
else:
print "#62 queue1 empty"
pass
except:
print "#65 queue1 error"
pass
def doSomething(self, data):
time.sleep(3)
timestamp = str(datetime.now())
someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
self.someData = someData
print "#73 process 2: do something =>", someData
self.put()
def put(self):
time.sleep(3)
timestamp = str(datetime.now())
self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
res = self.someData
print "#81 process 2: put back in queue1 =>", self.someData
queue1.put(res)
#print "#83 process 2: put back in queue2 =>", self.someData
#queue2.put(res)
#queue2.close()
#queue2.join_thread()
# main
if __name__ == '__main__':
    # Load/Unload read queue1 as a module-level name; the proxy created here is
    # inherited by the child processes (this relies on the fork start method).
    manager = multiprocessing.Manager()
    queue1 = manager.Queue()
    # Hand Process the bound method itself. Writing target=Load().post() *calls* post()
    # right here in the parent and passes its return value (None) as the target, so no
    # child ever runs the method. No args are passed: post()/get() take no parameters.
    p1 = Process(target=Load().post) # process p1
    p1.daemon = True
    p1.start()
    p2 = Process(target=Unload().get) # process p2
    p2.start()
    p2.join()
    # Wait for p1 as well: it is daemonic, so it would be terminated as soon as the
    # parent exits, before it has a chance to read the reply.
    p1.join()
尝试5的结果:
#25 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-24 14:08:13.975886', 'class': 'Load()', 'method': 'post()'}
#33 get from queue1 => {'process': 'p1', 'timestamp': '2020-02-24 14:08:16.980382', 'class': 'Load()', 'method': 'post()'}
#62 queue1 empty
尝试 6:使用队列超时
按照建议,我尝试改用带超时的队列读取。做法仍然是:queue1 从 process1 到 process2,queue2 从 process2 到 process1。数据进入了 queue1,但没有通过 queue2 返回,又一次神秘地消失了。代码和结果如下。
尝试6的代码:
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for serialized interprocess data exchange
# common modules
import os
import sys
import time
import uuid
import Queue
#from Queue import Empty
import multiprocessing
from multiprocessing import Process
#from multiprocessing import Queue
from datetime import datetime
someData = {}
class Load():
def post(self):
timestamp = str(datetime.now())
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
queue1.put(someData) # put into queue
print "#24 process 1: put in queue1 =>", someData
time.sleep(3)
# while True: # queue1 checking loop
# if queue1.empty() == False:
# timestamp = str(datetime.now())
# res = queue1.get()
# res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
# print "#33 get from queue1 =>", res
# break
# else:
# print "#36 queue1 empty"
# time.sleep(1)
while True: # queue2 checking loop
try:
someData = queue2.get(True,1)
timestamp = str(datetime.now())
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
print "#43 process 1: got from queue2 =>", someData
break
except Queue.Empty:
print "#46 process1: queue2 empty"
continue
class Unload():
def get(self):
while True: # queue2 checking loop
try:
someData = queue1.get(True,1)
timestamp = str(datetime.now())
someData = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp}
print "#56 process2: got from queue1 =>", someData
break
except Queue.Empty:
print "#59 process2: queue1 empty"
continue
self.doSomething(someData) # call method
def doSomething(self, data):
time.sleep(3)
timestamp = str(datetime.now())
someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
self.someData = someData
print "#68 process2: do something =>", someData
self.put(someData)
def put(self,data):
time.sleep(3)
timestamp = str(datetime.now())
self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
someData = self.someData
#print "#81 process 2: put back in queue1 =>", self.someData
#queue1.put(res)
print "#78 process2: put back in queue2 =>", someData
queue2.put(someData)
# main
if __name__ == '__main__':
    # Load/Unload read queue1/queue2 as module-level names, so they are created here
    # and inherited by the child processes (this relies on the fork start method).
    queue1 = multiprocessing.Queue()
    queue2 = multiprocessing.Queue()
    # Hand Process the bound method itself. Writing target=Load().post() *calls* post()
    # right here in the parent and passes its return value (None) as the target, so no
    # child ever runs the method. No args are passed: post()/get() take no parameters.
    p1 = Process(target=Load().post)
    p1.daemon = True
    p1.start()
    p2 = Process(target=Unload().get)
    p2.start()
    p2.join()
    # Wait for p1 as well: it is daemonic, so it would be terminated as soon as the
    # parent exits, before it has a chance to read the reply from queue2.
    p1.join()
尝试6的结果:
#24 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-24 18:14:46.435661', 'class': 'Load()', 'method': 'post()'}
#46 process1: queue2 empty
注意:当我不使用类时,建议的方法是有效的。代码如下:
import uuid
import multiprocessing
from multiprocessing import Process
import Queue
def load(que_in, que_out):
    """Send one request on ``que_in`` and print every result that shows up on ``que_out``.

    The function never returns: it keeps polling ``que_out`` with a one-second
    timeout.
    """
    request = {"id": uuid.uuid4(), "workload": "do_stuff", }
    que_in.put(request)
    sent_line = "load: sent request {}: {}".format(request["id"], request["workload"])
    print(sent_line)
    while True:
        try:
            reply = que_out.get(True, 1)
        except Queue.Empty:
            pass  # nothing yet, poll again
        else:
            print("load: got result {}: {}".format(reply["id"], reply["result"]))
def unload(que_in, que_out):
    """Serve requests forever: read from ``que_in``, answer on ``que_out``.

    Each request dict must carry "id" and "workload"; the answer carries the
    same "id" and a "result" of ``workload + " processed"``.
    """
    while True:
        try:
            job = que_in.get(True, 1)
        except Queue.Empty:
            continue  # nothing to do yet, poll again
        job_id = job["id"]
        print("unload: got request {}: {}".format(job_id, job["workload"]))
        reply = {"id": job_id, "result": job["workload"] + " processed"}
        que_out.put(reply)
        print("unload: sent result {}: {}".format(reply["id"], reply["result"]))
# main
if __name__ == '__main__':
    # One queue per direction: requests flow through que_in, results through que_out.
    que_in, que_out = multiprocessing.Queue(), multiprocessing.Queue()
    channels = (que_in, que_out)

    p1 = Process(target=load, args=channels)  # process p1
    p1.daemon = True
    p1.start()

    p2 = Process(target=unload, args=channels)  # process p2
    p2.start()
    p2.join()
----- 更新 20200225:尝试 7 ------------------------------ ----------------------------------------------
尝试 7:在不同的类中使用一个带超时的共享队列(可行)
在这次尝试中,我在不同类的方法之间使用了一个共享队列,并按建议改用带超时的 get()。数据通过 shared_queue 从 process1 传到 process2,再从 process2 返回 process1。这次数据传输是正确的。代码和结果如下。
尝试7的代码:
import uuid
import multiprocessing
from multiprocessing import Process
import Queue
class Input():
    """Requesting side of a request/response exchange over one shared queue."""

    def load(self, shared_queue):
        """Send one request on ``shared_queue`` and block until its result arrives.

        Requests and results travel on the same queue, so this process can
        dequeue its own request before the worker has seen it. Such an item
        (no "result" key) is put back instead of crashing with KeyError.
        Using one queue per direction avoids the problem altogether.
        """
        import time  # local import: this script's import block does not provide time

        request = {"id": uuid.uuid4(), "workload": "do_stuff", }
        shared_queue.put(request)
        print("load: sent request {}: {}".format(request["id"], request["workload"]))
        while True:
            try:
                result = shared_queue.get(True, 1)
            except Queue.Empty:
                continue
            if "result" not in result:
                # We raced ahead of the worker and took our own request back.
                shared_queue.put(result)
                time.sleep(0.05)  # give the worker a chance to win the next get()
                continue
            print("load: got result {}: {}".format(result["id"], result["result"]))
            break
class Output():
    """Worker side of a request/response exchange over one shared queue."""

    def unload(self, shared_queue):
        """Serve requests from ``shared_queue`` forever, answering on the same queue.

        Because results are written to the queue this method also reads from,
        it can dequeue its own result before the requester collects it. Such an
        item (no "workload" key) is put back instead of crashing with KeyError.
        Using one queue per direction avoids the problem altogether.
        """
        import time  # local import: this script's import block does not provide time

        def processed(request):
            return {"id": request["id"], "result": request["workload"] + " processed", }

        while True:
            try:
                request = shared_queue.get(True, 1)
            except Queue.Empty:
                continue
            if "workload" not in request:
                # Not a request: most likely our own result, still uncollected.
                shared_queue.put(request)
                time.sleep(0.05)  # give the requester a chance to win the next get()
                continue
            print("unload: got request {}: {}".format(request["id"], request["workload"]))
            result = processed(request)
            shared_queue.put(result)
            print("unload: sent result {}: {}".format(result["id"], result["result"]))
# main
if __name__ == '__main__':
    # A single queue carries both the request and the result.
    shared_queue = multiprocessing.Queue()
    up, down = Input(), Output()

    p1 = Process(target=up.load, args=(shared_queue,))  # process p1
    p1.daemon = True
    p2 = Process(target=down.unload, args=(shared_queue,))  # process p2

    p1.start()
    p2.start()
    # NOTE(review): unload() loops forever, so the second join never returns.
    for proc in (p1, p2):
        proc.join()
尝试7的结果:
load: sent request a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff
unload: got request a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff
unload: sent result a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff processed
load: got result a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff processed
您必须使用 Manager-wrapped 队列来跨进程传播更改,否则每个进程都有其单独的队列对象并且看不到其他进程。管理器为所有子进程创建队列的共享实例。
因此,`queue1 = Queue()` 要改为 `queue1 = manager.Queue()`,并在文件顶部加上 `from multiprocessing import Manager`。如果你想使用你的双队列方法,显然必须用同样的方式包装第二个队列。
相关资源:
我想你只是错过了队列超时的用法
try:
result = que_out.get(True, 1)
except queue.Empty:
continue
这个简化的示例可能对您有所帮助:
import uuid
from multiprocessing import Process
from multiprocessing import Queue
import queue
def load(que_in, que_out):
    """Send one request on ``que_in`` and print every result that shows up on ``que_out``.

    The function never returns: it keeps polling ``que_out`` with a one-second
    timeout.
    """
    request = {"id": uuid.uuid4(), "workload": "do_stuff", }
    que_in.put(request)
    sent_line = "load: sent request {}: {}".format(request["id"], request["workload"])
    print(sent_line)
    while True:
        try:
            reply = que_out.get(True, 1)
        except queue.Empty:
            pass  # nothing yet, poll again
        else:
            print("load: got result {}: {}".format(reply["id"], reply["result"]))
def unload(que_in, que_out):
    """Serve requests forever: read from ``que_in``, answer on ``que_out``.

    Each request dict must carry "id" and "workload"; the answer carries the
    same "id" and a "result" of ``workload + " processed"``.
    """
    while True:
        try:
            job = que_in.get(True, 1)
        except queue.Empty:
            continue  # nothing to do yet, poll again
        job_id = job["id"]
        print("unload: got request {}: {}".format(job_id, job["workload"]))
        reply = {"id": job_id, "result": job["workload"] + " processed"}
        que_out.put(reply)
        print("unload: sent result {}: {}".format(reply["id"], reply["result"]))
# main
if __name__ == '__main__':
    # One queue per direction: requests flow through que_in, results through que_out.
    que_in, que_out = Queue(), Queue()
    channels = (que_in, que_out)

    p1 = Process(target=load, args=channels)  # process p1
    p1.daemon = True
    p1.start()

    p2 = Process(target=unload, args=channels)  # process p2
    p2.start()
    p2.join()
输出
load: sent request d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff
unload: got request d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff
unload: sent result d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff processed
load: got result d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff processed
解决方案:使用一个共享队列
在遵循建议并做了一些调整、使不同类的方法被正确地作为进程目标之后,我解决了这个问题。两个独立进程之间的数据来回流动现在是正确的。对我来说,一个重要的注意事项是要特别留意两个独立进程之间交换的 someData 包,它必须是被来回传递的同一个包。因此,我用标识符条目 `"id": uuid.uuid4()` 来检查每次传递的包是否相同。
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for back and forth communication between two separate processes using a shared queue
# common modules
import os
import sys
import time
import uuid
import Queue
import multiprocessing
from multiprocessing import Process
from datetime import datetime
someData = {}
class Load():
    """Process p1 side: send one record through the shared queue and wait for it to return."""

    def post(self, sharedQueue):
        """Put a record on ``sharedQueue`` and block until an item can be read back.

        The record carries "timestamp", "id" (a fresh UUID) and "workload"; it
        is also kept on ``self.someData``. Progress is printed.

        NOTE(review): request and reply share one queue, so the item read back
        is presumably the one returned by the other process only because of
        the sleeps below -- confirm, or use one queue per direction.
        """
        someData = {"timestamp": str(datetime.now()), "id": uuid.uuid4(), "workload": "do_stuff",}
        self.someData = someData
        sharedQueue.put(someData) # put into the shared queue
        # Three placeholders for three values: with only two, the workload was silently dropped.
        print("#25 p1 load: sent someData {}: {} {}".format(someData["id"], someData["timestamp"], someData["workload"]))
        time.sleep(1) # for the time flow
        while True: # sharedQueue checking loop
            time.sleep(1) # for the time flow
            try:
                someData = sharedQueue.get(True,1)
            except Queue.Empty:
                print("#37 p1: sharedQueue empty")
                continue
            # Stamp after the blocking get so the timestamp reflects the time of receipt.
            someData["timestamp"] = str(datetime.now())
            print("#37 p1 load: got back someData {}: {} {}".format(someData["id"], someData["timestamp"], someData["workload"]))
            break
class Unload():
    """Process p2 side: receive a record from the shared queue, process it, send it back."""

    def get(self, sharedQueue):
        """Block until ``sharedQueue`` yields a record, then pass it to doSomething().

        The queue is remembered on the instance so that put() answers on the
        same queue it was given, instead of depending on a module-level name.
        """
        self._queue = sharedQueue
        while True: # sharedQueue checking loop
            try:
                someData = sharedQueue.get(True,1)
            except Queue.Empty:
                print("#53 p2: sharedQueue empty")
                continue
            break
        self.someData = someData
        someData["timestamp"] = str(datetime.now())
        # Three placeholders for three values: with only two, the workload was silently dropped.
        print("#50 p2 unload: got someData {}: {} {}".format(someData["id"], someData["timestamp"], someData["workload"]))
        time.sleep(1) # for the time flow
        self.doSomething(someData) # pass the data to the method

    def doSomething(self, someData): # execute some code here
        """Refresh the record's timestamp, report it, and send it back through put()."""
        someData["timestamp"] = str(datetime.now())
        print("#62 p2 unload: doSomething {}: {} {}".format(someData["id"], someData["timestamp"], someData["workload"]))
        self.put(someData)
        time.sleep(1) # for the time flow

    def put(self,someData):
        """Refresh the record's timestamp and put it back on the shared queue."""
        target = getattr(self, "_queue", None)
        if target is None:
            # put() called without a prior get(): fall back to the module-level queue.
            target = sharedQueue
        someData["timestamp"] = str(datetime.now())
        target.put(someData)
        print("#71 p2 unload: put someData {}: {} {}".format(someData["id"], someData["timestamp"], someData["workload"]))
        time.sleep(1) # for the time flow
# main
if __name__ == '__main__':
    # A single queue carries the record in both directions.
    sharedQueue = multiprocessing.Queue()
    trx, rcx = Load(), Unload()

    p1 = Process(target=trx.post, args=(sharedQueue,))  # process p1
    p1.daemon = True
    p1.start()

    p2 = Process(target=rcx.get, args=(sharedQueue,))  # process p2
    p2.start()

    # Wait for both sides to finish before the parent exits.
    for proc in (p1, p2):
        proc.join()
问题
有两个独立的进程并行运行,我希望它们之间能够来回通信。
代码说明
代码使用 Python 2.7 编写。在我的精简脚本中,我使用队列进行进程间通信。进程 p1 将数据放入队列中。进程 p2 从队列中获取数据并对数据进行处理。然后进程 p2 将修改后的数据放回队列中,最后进程 p1 从队列中取回修改后的数据。修改后的数据必须返回到进程 p1,因为该进程实际上是一个发送/接收请求的 eventlet 服务器。
代码
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for back-and-forth data exchange between processes
# common modules
import os
import sys
import time
from multiprocessing import Process
from multiprocessing import Queue
from datetime import datetime
someData = {}
class Load():
def post(self):
timestamp = str(datetime.now())
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
queue1.put(someData) # put into queue
print "#20 process 1: put in queue1 =>", someData
time.sleep(3)
while True: # queue1 checking loop, comment out the loop if use time.sleep only
if queue1.empty() == False:
timestamp = str(datetime.now())
res = queue1.get()
res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
print "#28 get from queue1 =>", res
break
else:
print "#31 queue1 empty"
time.sleep(1)
# while True: # queue2 checking loop
# if queue2.empty() == False:
# timestamp = str(datetime.now())
# res = queue2.get()
# res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
# print "#39 get from queue2 =>", res
# break
# else:
# print "#42 queue2 empty"
# time.sleep(1)
class Unload():
def get(self):
try:
if queue1.empty() == False:
data = queue1.get() # retrieve package from queue
#queue1.close()
#queue1.join_thread()
timestamp = str(datetime.now())
data = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp}
print "#54 process 2: get from queue1 =>", data
self.doSomething(data) # call method
else:
print "#57 queue1 empty"
pass
except:
print "#60 queue1 error"
pass
def doSomething(self, data):
time.sleep(3)
timestamp = str(datetime.now())
someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
self.someData = someData
print "#68 process 2: do something =>", someData
self.put()
def put(self):
time.sleep(3)
timestamp = str(datetime.now())
self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
print "#75 process 2: put back in queue1 =>", self.someData
res = self.someData
queue1.put(res)
#print "#78 process 2: put back in queue2 =>", self.someData
#res = self.someData
#queue2.put(res)
#queue2.close()
#queue2.join_thread()
# main
if __name__ == '__main__':
    # Load/Unload read queue1 as a module-level name, so it is created here and
    # inherited by the child processes (this relies on the fork start method).
    queue1 = Queue()
    #queue2 = Queue()  # needed only for the two-queue variant
    # Hand Process the bound method itself. Writing target=Load().post() *calls* post()
    # right here in the parent and passes its return value (None) as the target, so no
    # child ever runs the method. No args are passed: post()/get() take no parameters.
    p1 = Process(target=Load().post) # process p1
    p1.daemon = True
    p1.start()
    p2 = Process(target=Unload().get) # process p2
    p2.start()
    p2.join()
    # Wait for p1 as well: it is daemonic, so it would be terminated as soon as the
    # parent exits, before it has a chance to read the reply.
    p1.join()
问题 我已经检查了其他相关资源,但它们都涉及单向通信。以下是资源列表。
- in-python-how-do-you-get-data-back-from-a-particular-process-using-multiprocess
- multiprocessing module supports locks
如何让 process1 等待并从 process2 检索修改后的数据?我是否应该考虑另一种进程间通信方法,例如管道、zeroMQ?
尝试 1:在进程 1 中使用没有 while 循环的 time.sleep() 只有 time.sleep 数据返回队列,但永远不会到达进程 1 中的最终目的地。到目前为止一切顺利,但缺少最后一步。结果如下。
#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:40:30.234466', 'class': 'Load()', 'method': 'post()'}
#54 process 2: get from queue1 => {'process': 'p2', 'timestamp': '2020-02-23 11:40:33.239113', 'class': 'Unload()', 'method': 'get()'}
#68 process 2: do something => {'process': 'p2', 'timestamp': '2020-02-23 11:40:36.242500', 'class': 'Unload()', 'method': 'doSomething()'}
#75 process 2: put back in queue1 => {'process': 'p2', 'timestamp': '2020-02-23 11:40:39.245856', 'class': 'Unload()', 'method': 'put()'}
尝试 2:在进程 1 中使用 while 循环 通过 while 循环检查队列,数据进入队列但之后立即被捕获,它们永远不会到达进程 2。结果如下。
#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:46:14.606356', 'class': 'Load()', 'method': 'post()'}
#28 get from queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:46:17.610202', 'class': 'Load()', 'method': 'post()'}
#57 queue1 empty
尝试 3:使用两个队列 使用两个队列:queue1 从 process1 到 process2,queue2 从 process2 到 process1。数据进入 queue1 但不 return 进入 queue2,它们神秘地消失了。结果如下。
#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-23 11:53:39.745177', 'class': 'Load()', 'method': 'post()'}
#42 queue2 empty
----- 更新 20200224:尝试 4、5 和 6 -------------------------- --------------------------------------
尝试 4:使用两个队列 manager.Queue()
使用 manager.Queue() 创建两个队列:queue1 从 process1 到 process2,queue2 从 process2 到 process1。数据进入了 queue1,但没有通过 queue2 返回,又一次神秘地消失了。代码和结果如下。
尝试4的代码:
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for serialized interprocess data exchange
# common modules
import os
import sys
import time
import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Manager
from datetime import datetime
someData = {}
manager = multiprocessing.Manager()
queue1 = manager.Queue()
queue2 = manager.Queue()
class Load():
def post(self):
timestamp = str(datetime.now())
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
queue1.put(someData) # put into queue
print "#20 process 1: put in queue1 =>", someData
time.sleep(3)
# while True: # queue1 checking loop
# if queue1.empty() == False:
# timestamp = str(datetime.now())
# res = queue1.get()
# res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
# print "#28 get from queue1 =>", res
# break
# else:
# print "#31 queue1 empty"
# time.sleep(1)
while True: # queue2 checking loop
if queue2.empty() == False:
timestamp = str(datetime.now())
res = queue2.get()
res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
print "#39 get from queue2 =>", res
break
else:
print "#42 queue2 empty"
time.sleep(1)
class Unload():
def get(self):
try:
if queue1.empty() == False:
data = queue1.get() # retrieve package from queue
#queue1.close()
#queue1.join_thread()
timestamp = str(datetime.now())
data = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp}
print "#54 process 2: get from queue1 =>", data
self.doSomething(data) # call method
else:
print "#57 queue1 empty"
pass
except:
print "#60 queue1 error"
pass
def doSomething(self, data):
time.sleep(3)
timestamp = str(datetime.now())
someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
self.someData = someData
print "#68 process 2: do something =>", someData
self.put()
def put(self):
time.sleep(3)
timestamp = str(datetime.now())
self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
res = self.someData
#print "#75 process 2: put back in queue1 =>", self.someData
#queue1.put(res)
print "#78 process 2: put back in queue2 =>", self.someData
queue2.put(res)
#queue2.close()
#queue2.join_thread()
# main
if __name__ == '__main__':
    # Load/Unload read queue1/queue2 as module-level names; the proxies created here
    # are inherited by the child processes (this relies on the fork start method).
    manager = multiprocessing.Manager()
    queue1 = manager.Queue()
    queue2 = manager.Queue()
    # Hand Process the bound method itself. Writing target=Load().post() *calls* post()
    # right here in the parent and passes its return value (None) as the target, so no
    # child ever runs the method. No args are passed: post()/get() take no parameters.
    p1 = Process(target=Load().post)
    p1.daemon = True
    p1.start()
    p2 = Process(target=Unload().get)
    p2.start()
    p2.join()
    # Wait for p1 as well: it is daemonic, so it would be terminated as soon as the
    # parent exits, before it has a chance to read the reply from queue2.
    p1.join()
尝试4的结果:
#20 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-24 13:06:17.687762', 'class': 'Load()', 'method': 'post()'}
#42 queue2 empty
尝试 5:使用一个队列 manager.Queue() 使用带有 manager.Queue() 的一个队列:queue1 从 process1 到 process2,queue1 从 process2 返回到 process1。数据进入 queue1 但之后立即被捕获,它们永远不会到达进程 2。代码结果如下。
尝试5的代码:
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for serialized interprocess data exchange
# common modules
import os
import sys
import time
import multiprocessing
from multiprocessing import Process
from multiprocessing import Queue
from multiprocessing import Manager
from datetime import datetime
someData = {}
manager = multiprocessing.Manager()
queue1 = manager.Queue()
#queue2 = manager.Queue()
class Load():
def post(self):
timestamp = str(datetime.now())
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
queue1.put(someData) # put into queue
print "#25 process 1: put in queue1 =>", someData
time.sleep(3)
while True: # queue1 checking loop
if queue1.empty() == False:
timestamp = str(datetime.now())
res = queue1.get()
res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
print "#33 get from queue1 =>", res
break
else:
print "#36 queue1 empty"
time.sleep(1)
# while True: # queue2 checking loop
# if queue2.empty() == False:
# timestamp = str(datetime.now())
# res = queue2.get()
# res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
# print "#44 get from queue2 =>", res
# break
# else:
# print "#47 queue2 empty"
# time.sleep(1)
class Unload():
def get(self):
try:
if queue1.empty() == False:
data = queue1.get() # retrieve package from queue
#queue1.close()
#queue1.join_thread()
timestamp = str(datetime.now())
data = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp}
print "#59 process 2: get from queue1 =>", data
self.doSomething(data) # call method
else:
print "#62 queue1 empty"
pass
except:
print "#65 queue1 error"
pass
def doSomething(self, data):
time.sleep(3)
timestamp = str(datetime.now())
someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
self.someData = someData
print "#73 process 2: do something =>", someData
self.put()
def put(self):
time.sleep(3)
timestamp = str(datetime.now())
self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
res = self.someData
print "#81 process 2: put back in queue1 =>", self.someData
queue1.put(res)
#print "#83 process 2: put back in queue2 =>", self.someData
#queue2.put(res)
#queue2.close()
#queue2.join_thread()
# main
if __name__ == '__main__':
    # Load/Unload read queue1 as a module-level name; the proxy created here is
    # inherited by the child processes (this relies on the fork start method).
    manager = multiprocessing.Manager()
    queue1 = manager.Queue()
    # Hand Process the bound method itself. Writing target=Load().post() *calls* post()
    # right here in the parent and passes its return value (None) as the target, so no
    # child ever runs the method. No args are passed: post()/get() take no parameters.
    p1 = Process(target=Load().post) # process p1
    p1.daemon = True
    p1.start()
    p2 = Process(target=Unload().get) # process p2
    p2.start()
    p2.join()
    # Wait for p1 as well: it is daemonic, so it would be terminated as soon as the
    # parent exits, before it has a chance to read the reply.
    p1.join()
尝试5的结果:
#25 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-24 14:08:13.975886', 'class': 'Load()', 'method': 'post()'}
#33 get from queue1 => {'process': 'p1', 'timestamp': '2020-02-24 14:08:16.980382', 'class': 'Load()', 'method': 'post()'}
#62 queue1 empty
尝试 6:使用队列超时
按照建议,我尝试改用带超时的队列读取。做法仍然是:queue1 从 process1 到 process2,queue2 从 process2 到 process1。数据进入了 queue1,但没有通过 queue2 返回,又一次神秘地消失了。代码和结果如下。
尝试6的代码:
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for serialized interprocess data exchange
# common modules
import os
import sys
import time
import uuid
import Queue
#from Queue import Empty
import multiprocessing
from multiprocessing import Process
#from multiprocessing import Queue
from datetime import datetime
someData = {}
# Attempt 6, sender side: post() puts one dict on queue1 and then polls queue2
# for an answer.
# NOTE(review): queue1 and queue2 are resolved as module-level names; post()
# itself takes no queue parameters.
class Load():
def post(self):
timestamp = str(datetime.now())
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
queue1.put(someData) # put into queue
print "#24 process 1: put in queue1 =>", someData
time.sleep(3)
# earlier single-queue variant, left disabled:
# while True: # queue1 checking loop
# if queue1.empty() == False:
# timestamp = str(datetime.now())
# res = queue1.get()
# res = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
# print "#33 get from queue1 =>", res
# break
# else:
# print "#36 queue1 empty"
# time.sleep(1)
while True: # queue2 checking loop
try:
# blocking get with a 1 s timeout; Queue.Empty is raised when it expires
someData = queue2.get(True,1)
timestamp = str(datetime.now())
# the item just received is replaced by a freshly built dict
someData = {"process":"p1","class":"Load()","method":"post()","timestamp":timestamp}
print "#43 process 1: got from queue2 =>", someData
break
except Queue.Empty:
print "#46 process1: queue2 empty"
continue
# Attempt 6, receiver side: get() polls queue1, then doSomething() and put()
# build new dicts and the last one is placed on queue2.
# NOTE(review): queue1 and queue2 are resolved as module-level names; none of
# the methods receives a queue as a parameter.
class Unload():
def get(self):
while True: # queue1 checking loop
try:
# blocking get with a 1 s timeout; Queue.Empty is raised when it expires
someData = queue1.get(True,1)
timestamp = str(datetime.now())
# the item just received is replaced by a freshly built dict
someData = {"process":"p2","class":"Unload()","method":"get()","timestamp":timestamp}
print "#56 process2: got from queue1 =>", someData
break
except Queue.Empty:
print "#59 process2: queue1 empty"
continue
self.doSomething(someData) # call method
# NOTE(review): `data` is not read; a new dict is built instead.
def doSomething(self, data):
time.sleep(3)
timestamp = str(datetime.now())
someData = {"process":"p2","class":"Unload()","method":"doSomething()","timestamp":timestamp}
self.someData = someData
print "#68 process2: do something =>", someData
self.put(someData)
# NOTE(review): `data` is not read here either; the dict sent on queue2 is
# built from scratch below.
def put(self,data):
time.sleep(3)
timestamp = str(datetime.now())
self.someData = {"process":"p2","class":"Unload()","method":"put()","timestamp":timestamp}
someData = self.someData
#print "#81 process 2: put back in queue1 =>", self.someData
#queue1.put(res)
print "#78 process2: put back in queue2 =>", someData
queue2.put(someData)
# main
# Attempt 6 entry point: two multiprocessing queues, queue1 for p1 -> p2 and
# queue2 for p2 -> p1 (per the surrounding description).
if __name__ == '__main__':
queue1 = multiprocessing.Queue()
queue2 = multiprocessing.Queue()
global p1, p2
#p1 = Process(target=Load().post(), args=(queue1,)) # process p1
# NOTE(review): `Load().post()` is a call expression, so post() executes here
# in the parent process and only its return value is handed to Process as
# `target`. post() polls queue2 until something arrives, and nothing has been
# started yet that could put anything there. Passing the bound method uncalled
# (`Load().post`) would run it in the child instead; the methods would then
# also have to accept the queues given in `args`. The same applies to
# `Unload().get()` below.
p1 = Process(target=Load().post(), args=(queue1,queue2,))
p1.daemon = True
p1.start()
#p2 = Process(target=Unload().get(), args=(queue1,)) # process p2
p2 = Process(target=Unload().get(), args=(queue1,queue2,))
p2.start()
p2.join() # only p2 is joined; p1 was marked as a daemon above
尝试6的结果:
#24 process 1: put in queue1 => {'process': 'p1', 'timestamp': '2020-02-24 18:14:46.435661', 'class': 'Load()', 'method': 'post()'}
#46 process1: queue2 empty
注意:当我不使用类、只使用普通函数时,建议的方法是有效的。代码如下:
import uuid
import multiprocessing
from multiprocessing import Process
import Queue
def load(que_in, que_out):
    """Send one request on ``que_in``, then print every reply read from ``que_out``.

    The request is a dict with a fresh UUID under ``"id"`` and the string
    ``"do_stuff"`` under ``"workload"``. Replies are expected to be dicts with
    ``"id"`` and ``"result"`` keys. The function never returns: after each
    reply (or 1 s timeout) it goes back to waiting.
    """
    job_id = uuid.uuid4()
    outgoing = {"id": job_id, "workload": "do_stuff"}
    que_in.put(outgoing)
    print("load: sent request {}: {}".format(job_id, outgoing["workload"]))

    while True:
        try:
            reply = que_out.get(block=True, timeout=1)
        except Queue.Empty:
            # nothing arrived within the timeout; wait again
            pass
        else:
            print("load: got result {}: {}".format(reply["id"], reply["result"]))
def unload(que_in, que_out):
    """Serve requests from ``que_in`` forever, answering each one on ``que_out``.

    Each request must be a dict with ``"id"`` and ``"workload"`` keys. The
    answer carries the same ``"id"`` and the workload string with
    ``" processed"`` appended under ``"result"``. The function never returns.
    """
    while True:
        try:
            job = que_in.get(block=True, timeout=1)
        except Queue.Empty:
            # nothing to do yet; poll again
            continue

        job_id = job["id"]
        workload = job["workload"]
        print("unload: got request {}: {}".format(job_id, workload))

        answer = {"id": job_id, "result": workload + " processed"}
        que_out.put(answer)
        print("unload: sent result {}: {}".format(answer["id"], answer["result"]))
# main
# Entry point of the class-free variant: que_in carries requests from load to
# unload, que_out carries results back.
if __name__ == '__main__':
que_in = multiprocessing.Queue()
que_out = multiprocessing.Queue()
# the functions are passed uncalled, so they run inside the child processes
p1 = Process(target=load, args=(que_in, que_out)) # process p1
p1.daemon = True
p1.start()
p2 = Process(target=unload, args=(que_in, que_out)) # process p2
p2.start()
# NOTE(review): unload loops without a break, so this join is not expected to
# return on its own - confirm that running until interrupted is intended.
p2.join()
----- 更新 20200225:尝试 7 ------------------------------ ----------------------------------------------
尝试 7:在不同的类中使用一个带超时的共享队列(可以工作)
在这次尝试中,我在不同类的方法之间使用了一个共享队列,并正确地设置了超时。数据通过 shared_queue 从进程 1 传到进程 2,再从进程 2 返回到进程 1。这一次数据传输是正确的。代码和结果如下。
尝试7的代码:
import uuid
import multiprocessing
from multiprocessing import Process
import Queue
class Input():
    """Client side of the shared-queue demo: sends one request, waits for its reply."""

    def load(self, shared_queue):
        """Put one request on ``shared_queue`` and return once a reply was printed.

        The request is a dict with a fresh UUID under ``"id"`` and
        ``"do_stuff"`` under ``"workload"``. A reply is any dict that has a
        ``"result"`` key.

        Requests and replies travel on the same queue, so this method can
        dequeue its own request before the worker has taken it. Such messages
        (no ``"result"`` key) are put back instead of being treated as a
        reply, which previously raised ``KeyError: 'result'``.
        """
        import time  # local import: the surrounding script does not import time

        request = {"id": uuid.uuid4(), "workload": "do_stuff", }
        shared_queue.put(request)
        print("load: sent request {}: {}".format(request["id"], request["workload"]))
        while True:
            try:
                result = shared_queue.get(True, 1)
            except Queue.Empty:
                continue
            if "result" not in result:
                # Not a reply (most likely our own request): hand it back and
                # pause briefly so the worker gets a chance to pick it up.
                shared_queue.put(result)
                time.sleep(0.01)
                continue
            print("load: got result {}: {}".format(result["id"], result["result"]))
            break
class Output():
    """Worker side of the shared-queue demo: answers requests on the same queue."""

    def unload(self, shared_queue):
        """Serve requests from ``shared_queue`` forever, replying on the same queue.

        A request is a dict with ``"id"`` and ``"workload"`` keys. The reply
        carries the same ``"id"`` and the workload with ``" processed"``
        appended under ``"result"``. The method never returns.

        Because replies are put on the queue this method also reads from, it
        can dequeue a reply that the client has not collected yet. Such
        messages (no ``"workload"`` key) are put back instead of being
        processed, which previously raised ``KeyError: 'workload'``.
        """
        import time  # local import: the surrounding script does not import time

        def processed(request):
            return {"id": request["id"], "result": request["workload"] + " processed", }

        while True:
            try:
                request = shared_queue.get(True, 1)
            except Queue.Empty:
                continue
            if "workload" not in request:
                # Not a request (most likely an uncollected reply): hand it
                # back and pause briefly so the client can pick it up.
                shared_queue.put(request)
                time.sleep(0.01)
                continue
            print("unload: got request {}: {}".format(request["id"], request["workload"]))
            result = processed(request)
            shared_queue.put(result)
            print("unload: sent result {}: {}".format(result["id"], result["result"]))
# main
# Attempt 7 entry point: a single multiprocessing.Queue is shared by both
# workers for requests and replies.
if __name__ == '__main__':
shared_queue = multiprocessing.Queue()
up = Input()
down = Output()
# bound methods are passed uncalled, so load/unload run inside the children
p1 = Process(target=up.load, args=(shared_queue,)) # process p1
p1.daemon = True
p1.start()
p2 = Process(target=down.unload, args=(shared_queue,)) # process p2
p2.start()
p1.join()
# NOTE(review): Output.unload loops without a break, so this join is not
# expected to return on its own - confirm that this is intended.
p2.join()
尝试7的结果:
load: sent request a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff
unload: got request a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff
unload: sent result a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff processed
load: got result a461357a-b39a-43c4-89a8-a77486a5bf45: do_stuff processed
您必须使用由 Manager 包装的队列,才能把更改传播到各个进程;否则每个进程都有自己独立的队列对象,看不到其他进程放入的数据。Manager 会为所有子进程创建一个共享的队列实例。
因此,queue1 = Queue() 应改为 queue1 = manager.Queue(),并在文件顶部加上 from multiprocessing import Manager。如果您想使用双队列的方法,显然也必须用同样的方式包装第二个队列。
相关资源:
我想您只是漏掉了队列超时的用法:
try:
result = que_out.get(True, 1)
except queue.Empty:
continue
这个简化的示例可能对您有所帮助:
import uuid
from multiprocessing import Process
from multiprocessing import Queue
import queue
def load(que_in, que_out):
    """Submit a single request, then report replies as they arrive.

    A dict holding a new UUID (``"id"``) and the workload ``"do_stuff"`` is
    put on ``que_in``. After that the function polls ``que_out`` with a
    one-second timeout and prints the ``"id"`` and ``"result"`` of every item
    it receives. It does not return.
    """
    ticket = {"id": uuid.uuid4(), "workload": "do_stuff"}
    que_in.put(ticket)
    sent_line = "load: sent request {}: {}".format(ticket["id"], ticket["workload"])
    print(sent_line)

    while True:
        try:
            outcome = que_out.get(block=True, timeout=1)
        except queue.Empty:
            # timeout expired with nothing to read; keep waiting
            continue
        got_line = "load: got result {}: {}".format(outcome["id"], outcome["result"])
        print(got_line)
def unload(que_in, que_out):
    """Process requests from ``que_in`` in an endless loop and answer on ``que_out``.

    Every request is a dict with ``"id"`` and ``"workload"``. The answer
    repeats the ``"id"`` and stores the workload followed by ``" processed"``
    under ``"result"``. The function does not return.
    """
    suffix = " processed"
    while True:
        try:
            incoming = que_in.get(block=True, timeout=1)
        except queue.Empty:
            # timeout expired with nothing to read; keep waiting
            continue

        ident, work = incoming["id"], incoming["workload"]
        print("unload: got request {}: {}".format(ident, work))

        outgoing = {"id": ident, "result": work + suffix}
        que_out.put(outgoing)
        print("unload: sent result {}: {}".format(outgoing["id"], outgoing["result"]))
# main
# Entry point of the simplified example: que_in carries requests from load to
# unload, que_out carries results back.
if __name__ == '__main__':
que_in = Queue()
que_out = Queue()
# the functions are passed uncalled, so they run inside the child processes
p1 = Process(target=load, args=(que_in, que_out)) # process p1
p1.daemon = True
p1.start()
p2 = Process(target=unload, args=(que_in, que_out)) # process p2
p2.start()
# NOTE(review): unload loops without a break, so this join is not expected to
# return on its own - confirm that running until interrupted is intended.
p2.join()
输出
load: sent request d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff
unload: got request d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff
unload: sent result d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff processed
load: got result d9894e41-3e8a-4474-9563-1a99797bc722: do_stuff processed
解决方案:使用一个共享队列
在遵循建议并做了一些调整、使不同类的方法都能正确访问队列之后,我解决了这个问题。现在两个独立进程之间的数据往返传输是正确的。对我来说,需要特别注意的一点是:两个独立进程之间交换的 someData 数据包必须始终是被来回传递的同一个包。因此,我加入了标识符条目 "id": uuid.uuid4(),用来检查每次传递时的数据包是否相同。
#!/usr/bin/python2.7 python2.7
# -*- coding: utf-8 -*-
# script for back and forth communication between two separate processes using a shared queue
# common modules
import os
import sys
import time
import uuid
import Queue
import multiprocessing
from multiprocessing import Process
from datetime import datetime
someData = {}
class Load():
    """Process p1: posts one package on the shared queue and waits for it to return."""

    def post(self, sharedQueue):
        """Send one package through ``sharedQueue`` and print it when it comes back.

        The package is a dict with ``"timestamp"``, ``"id"`` (a fresh UUID)
        and ``"workload"``. It is also stored on ``self.someData``. After
        sending, the method polls the same queue with a 1 s timeout until an
        item arrives, refreshes that item's ``"timestamp"`` and prints it.

        NOTE(review): requests and replies use the same queue and the same
        keys, so this method cannot tell its own request from the reply. The
        sleeps before the first ``get`` presumably exist to let the peer take
        the request first - confirm before changing the timing.
        """
        timestamp = str(datetime.now())  # for timing checking
        someData = {"timestamp": timestamp, "id": uuid.uuid4(), "workload": "do_stuff"}
        self.someData = someData
        sharedQueue.put(someData)  # put into the shared queue
        # three placeholders: the workload was passed but never shown before
        print("#25 p1 load: sent someData {}: {}: {}".format(
            someData["id"], someData["timestamp"], someData["workload"]))
        time.sleep(1)  # for the time flow
        while True:  # sharedQueue checking loop
            time.sleep(1)  # for the time flow
            try:
                someData = sharedQueue.get(True, 1)
            except Queue.Empty:
                print("#37 p1: sharedQueue empty")
                continue
            # stamp after the blocking get so the time reflects actual receipt
            someData["timestamp"] = str(datetime.now())
            print("#37 p1 load: got back someData {}: {}: {}".format(
                someData["id"], someData["timestamp"], someData["workload"]))
            break
class Unload():
    """Process p2: takes a package from the shared queue, handles it, sends it back."""

    # Queue handed to get(); put() replies on it. Stays None when put() is
    # used without a preceding get().
    _queue = None

    def get(self, sharedQueue):
        """Wait for one package on ``sharedQueue`` and pass it to doSomething().

        Polls with a 1 s timeout until an item arrives, stores it on
        ``self.someData``, refreshes its ``"timestamp"`` and prints it. The
        queue is remembered so that put() sends the package back on the same
        queue instead of relying on a module-level name.
        """
        self._queue = sharedQueue
        while True:  # sharedQueue checking loop
            try:
                someData = sharedQueue.get(True, 1)
            except Queue.Empty:
                print("#53 p2: sharedQueue empty")
            else:
                break
        self.someData = someData
        someData["timestamp"] = str(datetime.now())
        # three placeholders: the workload was passed but never shown before
        print("#50 p2 unload: got someData {}: {}: {}".format(
            someData["id"], someData["timestamp"], someData["workload"]))
        time.sleep(1)  # for the time flow
        self.doSomething(someData)  # pass the data to the method

    def doSomething(self, someData):  # execute some code here
        """Refresh the package's timestamp, print it and hand it to put()."""
        someData["timestamp"] = str(datetime.now())
        print("#62 p2 unload: doSomething {}: {}: {}".format(
            someData["id"], someData["timestamp"], someData["workload"]))
        self.put(someData)
        time.sleep(1)  # for the time flow

    def put(self, someData):
        """Refresh the package's timestamp and put it back on the queue.

        Uses the queue remembered by get(). When get() has not been called on
        this instance, falls back to the module-level ``sharedQueue`` as the
        previous version always did.
        """
        someData["timestamp"] = str(datetime.now())
        target = self._queue
        if target is None:
            target = sharedQueue  # legacy fallback: module-level queue
        target.put(someData)
        print("#71 p2 unload: put someData {}: {}: {}".format(
            someData["id"], someData["timestamp"], someData["workload"]))
        time.sleep(1)  # for the time flow
# main
# Entry point of the final script: one multiprocessing.Queue is shared by the
# sender (Load.post, process p1) and the receiver (Unload.get, process p2).
if __name__ == '__main__':
# module-level name; Unload.put() also refers to `sharedQueue` directly
sharedQueue = multiprocessing.Queue()
trx = Load()
rcx = Unload()
# bound methods are passed uncalled, so post/get run inside the children
p1 = Process(target=trx.post, args=(sharedQueue,)) # process p1
p1.daemon = True
p1.start()
p2 = Process(target=rcx.get, args=(sharedQueue,)) # process p2
p2.start()
# wait for both workers to finish
p1.join()
p2.join()