zeromq 和 python 多处理,打开的文件太多

zeromq and python multiprocessing, too many open files

我有一个基于代理的模型,其中多个代理由一个中央进程启动并通过另一个中央进程进行通信。每个代理和通信进程都通过 zmq 进行通信。但是,当我启动超过 100 个代理时 standard_out 发送:

Invalid argument (src/stream_engine.cpp:143) Too many open files (src/ipc_listener.cpp:292)

和MacOs提示问题报告:

Python quit unexpectedly while using the libzmq.5.dylib plug-in.

在我看来,问题是打开了太多上下文。但是如何通过多处理避免这种情况?

我在下面附上部分代码:

class Agent(Database, Logger, Trade, Messaging, multiprocessing.Process):
    def __init__(self, idn, group, _addresses, trade_logging):
        multiprocessing.Process.__init__(self)
        ....

    def run(self):
        self.context = zmq.Context()
        self.commands = self.context.socket(zmq.SUB)
        self.commands.connect(self._addresses['command_addresse'])
        self.commands.setsockopt(zmq.SUBSCRIBE, "all")
        self.commands.setsockopt(zmq.SUBSCRIBE, self.name)
        self.commands.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out = self.context.socket(zmq.PUSH)
        self.out.connect(self._addresses['frontend'])
        time.sleep(0.1)
        self.database_connection = self.context.socket(zmq.PUSH)
        self.database_connection.connect(self._addresses['database'])
        time.sleep(0.1)
        self.logger_connection = self.context.socket(zmq.PUSH)
        self.logger_connection.connect(self._addresses['logger'])

        self.messages_in = self.context.socket(zmq.DEALER)
        self.messages_in.setsockopt(zmq.IDENTITY, self.name)
        self.messages_in.connect(self._addresses['backend'])

        self.shout = self.context.socket(zmq.SUB)
        self.shout.connect(self._addresses['group_backend'])
        self.shout.setsockopt(zmq.SUBSCRIBE, "all")
        self.shout.setsockopt(zmq.SUBSCRIBE, self.name)
        self.shout.setsockopt(zmq.SUBSCRIBE, group_address(self.group))

        self.out.send_multipart(['!', '!', 'register_agent', self.name])

        while True:
            try:
                self.commands.recv()  # catches the group adress.
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s,self.commands.recv() to catch own adress ~1888' % (self.name))
                break
            command = self.commands.recv()
            if command == "!":
                subcommand = self.commands.recv()
                if subcommand == 'die':
                    self.__signal_finished()
                    break
            try:
                self._methods[command]()
            except KeyError:
                if command not in self._methods:
                    raise SystemExit('The method - ' + command + ' - called in the agent_list is not declared (' + self.name)
                else:
                    raise
            except KeyboardInterrupt:
                print('KeyboardInterrupt: %s, Current command: %s ~1984' % (self.name, command))
                break

            if command[0] != '_':
                self.__reject_polled_but_not_accepted_offers()
                self.__signal_finished()
        #self.context.destroy()

整个代码在http://www.github.com/DavoudTaghawiNejad/abce

很可能不是上下文太多,而是套接字太多。查看您的回购协议,我看到您(正确地)使用 IPC 作为您的交通工具; IPC 使用文件描述符作为 "address" 在不同进程之间来回传递数据。如果我没看错的话,每个进程最多打开 7 个套接字,所以加起来很快。我敢打赌,如果您在代码中间进行一些调试,您会发现在创建最后一个上下文时它不会失败,但是当最后一个套接字将打开的文件限制推到边缘时。

我的理解是,打开 FD 的典型用户限制在 1000 左右,因此在大约 100 个代理中,您正在为您的套接字推送 700 个打开 FD。其余的可能只是典型的。将您的限制增加到 10,000 应该没有问题,更高取决于您的情况。否则你将不得不重写每个进程使用更少的套接字以获得更高的进程限制。

这与 zeromq 和 python 无关。它是底层操作系统,只允许同时打开文件的特定阈值。此限制包括普通文件,但也包括套接字连接。

您可以使用 ulimit -n 查看您的当前限制,它可能默认为 1024。机器 运行 服务器或有其他原因(如多处理)通常需要将此限制设置得更高或只是 unlimited。 – More info about ulimit.

此外,还有一个 global limit,不过我还没有什么需要调整的。

一般来说,您应该问问自己,是否真的需要那么多代理。通常,X / 2X 个工作进程就足够了,其中 X 对应于您的 CPU 个数。

您应该像这个问题一样增加进程允许打开的文件数: Python: Which command increases the number of open files on Windows?

每个进程的默认值为 512

import win32file
print win32file._getmaxstdio() #512

win32file._setmaxstdio(1024)
print win32file._getmaxstdio() #1024