python 多处理和 socketserver 的管道错误
broken pipe error with python multiprocessing and socketserver
基本上我使用 socketserver python 库来尝试处理从中央服务器到多个树莓派 pi4 和 esp32 外围设备的通信。目前我有 socketserver 运行ning serve_forever,然后请求处理程序从 processmanager class 调用一个方法,该方法启动一个应该处理与客户端的实际通信的进程。
如果我在进程上使用 .join()
这样 processmanager 方法就不会退出,它就可以正常工作,但这不是我希望 运行 的方式。如果没有 .join()
,一旦客户端通信进程尝试将消息发送回客户端,我就会收到管道损坏错误。
这是进程管理器 class,它在主文件中定义,buildprocess
通过套接字服务器的请求处理程序调用 class:
import multiprocessing as mp
mp.allow_connection_pickling()
import queuemanager as qm
import hostmain as hmain
import camproc
import keyproc
import controlproc
# method that gets called into a process so that class and socket share memory
def callprocess(periclass, peritype, clientsocket, inqueue, genqueue):
periclass.startup(clientsocket)
class ProcessManager(qm.QueueManager):
def wipeproc(self, target):
# TODO make wipeproc integrate with the queue manager rather than directly to the class
for macid in list(self.procdict.keys()):
if target == macid:
# calls proc kill for the class
try:
self.procdict[macid]["class"].prockill()
except Exception as e:
print("exception:", e, "in wipeproc")
# waits for process to exit naturally (class threads to close)
self.procdict[macid]["process"].join()
# remove dict entry for this macid
self.procdict.pop(macid)
# called externally to create the new process and append to procdict
def buildprocess(self, peritype, macid, clientsocket):
# TODO put some logic here to handle the differences of the controller process
# generates queue object
inqueue = mp.Queue()
# creates periclass instance based on type
if peritype == hmain.cam:
periclass = camproc.CamMain(self, inqueue, self.genqueue)
elif peritype == hmain.keypad:
print("to be added to")
elif peritype == hmain.motion:
print("to be added to")
elif peritype == hmain.controller:
print("to be added to")
# init and start call for the new process
self.procdict[macid] = {"type": peritype, "inqueue": inqueue, "class": periclass, "process": None}
self.procdict[macid]["process"] = mp.Process(target=callprocess,
args=(self.procdict[macid]["class"], self.procdict[macid]["type"], clientsocket, self.procdict[macid]["inqueue"], self.genqueue))
self.procdict[macid]["process"].start()
# updating the process dictionary before class obj gets appended
# if macid in list(self.procdict.keys()):
# self.wipeproc(macid)
print(self.procdict)
print("client added")
在我看来,所有相关对象都应存储在 procdict 字典中,但正如我所提到的,除非我在 [=14] 结束之前使用 self.procdict[macid]["process"].join()
加入流程,否则它只会出现管道损坏错误=] 方法
我希望它退出该方法,但保持通信过程 运行ning 不变,我尝试了一些不同的方法来重组在过程中和没有定义的内容,但无济于事。到目前为止,我还没有能够在网上找到任何相关的解决方案,但当然我也可能遗漏了一些东西。
感谢您阅读到这里!我已经坚持了几天,所以任何帮助将不胜感激,这是我的第一个具有多处理和任何规模的套接字的项目。
#################
编辑以包含带有所有代码的 pastebin:
Without .join()
i get a broken pipe error as soon as the client communication process tries to send a message back to the client.
那是因为在请求处理程序 handle()
returns 时,socketserver
执行 shutdown
连接。 socketserver
简化了编写网络服务器的任务 意味着它会自动完成某些通常在网络请求处理过程中完成的事情。您的代码没有完全按预期使用 socketserver
。特别是,对于异步处理请求,Asynchronous Mixins are intended. With the ForkingMixIn 服务器将为每个请求生成一个新进程,与您当前使用 mp.Process
自行执行此操作的代码形成对比.所以,我认为你基本上有两个选择:
- 减少自己处理请求的代码,并使用提供的
socketserver
方法
- 保持自己的处理方式,完全不使用
socketserver
,这样它就不会碍事。
基本上我使用 socketserver python 库来尝试处理从中央服务器到多个树莓派 pi4 和 esp32 外围设备的通信。目前我有 socketserver 运行ning serve_forever,然后请求处理程序从 processmanager class 调用一个方法,该方法启动一个应该处理与客户端的实际通信的进程。
如果我在进程上使用 .join()
这样 processmanager 方法就不会退出,它就可以正常工作,但这不是我希望 运行 的方式。如果没有 .join()
,一旦客户端通信进程尝试将消息发送回客户端,我就会收到管道损坏错误。
这是进程管理器 class,它在主文件中定义,buildprocess
通过套接字服务器的请求处理程序调用 class:
import multiprocessing as mp
mp.allow_connection_pickling()
import queuemanager as qm
import hostmain as hmain
import camproc
import keyproc
import controlproc
# method that gets called into a process so that class and socket share memory
def callprocess(periclass, peritype, clientsocket, inqueue, genqueue):
periclass.startup(clientsocket)
class ProcessManager(qm.QueueManager):
def wipeproc(self, target):
# TODO make wipeproc integrate with the queue manager rather than directly to the class
for macid in list(self.procdict.keys()):
if target == macid:
# calls proc kill for the class
try:
self.procdict[macid]["class"].prockill()
except Exception as e:
print("exception:", e, "in wipeproc")
# waits for process to exit naturally (class threads to close)
self.procdict[macid]["process"].join()
# remove dict entry for this macid
self.procdict.pop(macid)
# called externally to create the new process and append to procdict
def buildprocess(self, peritype, macid, clientsocket):
# TODO put some logic here to handle the differences of the controller process
# generates queue object
inqueue = mp.Queue()
# creates periclass instance based on type
if peritype == hmain.cam:
periclass = camproc.CamMain(self, inqueue, self.genqueue)
elif peritype == hmain.keypad:
print("to be added to")
elif peritype == hmain.motion:
print("to be added to")
elif peritype == hmain.controller:
print("to be added to")
# init and start call for the new process
self.procdict[macid] = {"type": peritype, "inqueue": inqueue, "class": periclass, "process": None}
self.procdict[macid]["process"] = mp.Process(target=callprocess,
args=(self.procdict[macid]["class"], self.procdict[macid]["type"], clientsocket, self.procdict[macid]["inqueue"], self.genqueue))
self.procdict[macid]["process"].start()
# updating the process dictionary before class obj gets appended
# if macid in list(self.procdict.keys()):
# self.wipeproc(macid)
print(self.procdict)
print("client added")
在我看来,所有相关对象都应存储在 procdict 字典中,但正如我所提到的,除非我在 [=14] 结束之前使用 self.procdict[macid]["process"].join()
加入流程,否则它只会出现管道损坏错误=] 方法
我希望它退出该方法,但保持通信过程 运行ning 不变,我尝试了一些不同的方法来重组在过程中和没有定义的内容,但无济于事。到目前为止,我还没有能够在网上找到任何相关的解决方案,但当然我也可能遗漏了一些东西。
感谢您阅读到这里!我已经坚持了几天,所以任何帮助将不胜感激,这是我的第一个具有多处理和任何规模的套接字的项目。
#################
编辑以包含带有所有代码的 pastebin:
Without
.join()
i get a broken pipe error as soon as the client communication process tries to send a message back to the client.
那是因为在请求处理程序 handle()
returns 时,socketserver
执行 shutdown
连接。 socketserver
简化了编写网络服务器的任务 意味着它会自动完成某些通常在网络请求处理过程中完成的事情。您的代码没有完全按预期使用 socketserver
。特别是,对于异步处理请求,Asynchronous Mixins are intended. With the ForkingMixIn 服务器将为每个请求生成一个新进程,与您当前使用 mp.Process
自行执行此操作的代码形成对比.所以,我认为你基本上有两个选择:
- 减少自己处理请求的代码,并使用提供的
socketserver
方法 - 保持自己的处理方式,完全不使用
socketserver
,这样它就不会碍事。