python 多处理和 socketserver 的管道错误

broken pipe error with python multiprocessing and socketserver

基本上我使用 socketserver python 库来尝试处理从中央服务器到多个树莓派 pi4 和 esp32 外围设备的通信。目前我有 socketserver 运行ning serve_forever,然后请求处理程序从 processmanager class 调用一个方法,该方法启动一个应该处理与客户端的实际通信的进程。

如果我在进程上使用 .join() 这样 processmanager 方法就不会退出,它就可以正常工作,但这不是我希望 运行 的方式。如果没有 .join(),一旦客户端通信进程尝试将消息发送回客户端,我就会收到管道损坏错误。

这是进程管理器 class,它在主文件中定义,buildprocess 通过套接字服务器的请求处理程序调用 class:

import multiprocessing as mp
mp.allow_connection_pickling()

import queuemanager as qm
import hostmain as hmain


import camproc
import keyproc

import controlproc


# method that gets called into a process so that class and socket share memory
def callprocess(periclass, peritype, clientsocket, inqueue, genqueue):

    periclass.startup(clientsocket)


class ProcessManager(qm.QueueManager):

    def wipeproc(self, target):

# TODO make wipeproc integrate with the queue manager rather than directly to the class
        for macid in list(self.procdict.keys()):
            if target == macid:
                # calls proc kill for the class
                try:
                    self.procdict[macid]["class"].prockill()
                except Exception as e:
                    print("exception:", e, "in wipeproc")
                    
                # waits for process to exit naturally (class threads to close)
                self.procdict[macid]["process"].join()
                # remove dict entry for this macid
                self.procdict.pop(macid)
    



    # called externally to create the new process and append to procdict
    def buildprocess(self, peritype, macid, clientsocket):
        # TODO put some logic here to handle the differences of the controller process
        
        # generates queue object
        inqueue = mp.Queue()

        # creates periclass instance based on type
        if peritype == hmain.cam:
            periclass = camproc.CamMain(self, inqueue, self.genqueue)
        elif peritype == hmain.keypad:
            print("to be added to")
        elif peritype == hmain.motion:
            print("to be added to")
        elif peritype == hmain.controller:
            print("to be added to")
        
        # init and start call for the new process
        self.procdict[macid] = {"type": peritype, "inqueue": inqueue, "class": periclass, "process": None}
 
        self.procdict[macid]["process"] = mp.Process(target=callprocess, 
        args=(self.procdict[macid]["class"], self.procdict[macid]["type"], clientsocket, self.procdict[macid]["inqueue"], self.genqueue))

        self.procdict[macid]["process"].start()
        
        # updating the process dictionary before class obj gets appended
#        if macid in list(self.procdict.keys()):
#            self.wipeproc(macid)
        
            

        print(self.procdict)
        print("client added")

在我看来,所有相关对象都应存储在 procdict 字典中,但正如我所提到的,除非我在 [=14] 结束之前使用 self.procdict[macid]["process"].join() 加入流程,否则它只会出现管道损坏错误=] 方法

我希望它退出该方法,但保持通信过程 运行ning 不变,我尝试了一些不同的方法来重组在过程中和没有定义的内容,但无济于事。到目前为止,我还没有能够在网上找到任何相关的解决方案,但当然我也可能遗漏了一些东西。

感谢您阅读到这里!我已经坚持了几天,所以任何帮助将不胜感激,这是我的第一个具有多处理和任何规模的套接字的项目。

#################

编辑以包含带有所有代码的 pastebin:

https://pastebin.com/u/kadytoast/1/PPWfyCFT

Without .join() i get a broken pipe error as soon as the client communication process tries to send a message back to the client.

那是因为在请求处理程序 handle() returns 时,socketserver 执行 shutdown 连接。 socketserver 简化了编写网络服务器的任务 意味着它会自动完成某些通常在网络请求处理过程中完成的事情。您的代码没有完全按预期使用 socketserver。特别是,对于异步处理请求,Asynchronous Mixins are intended. With the ForkingMixIn 服务器将为每个请求生成一个新进程,与您当前使用 mp.Process 自行执行此操作的代码形成对比.所以,我认为你基本上有两个选择:

  • 减少自己处理请求的代码,并使用提供的 socketserver 方法
  • 保持自己的处理方式,完全不使用 socketserver,这样它就不会碍事。