zmq PUBHandler 无法在 __init__() 方法中使用

zmq PUBHandler cannot be used in the __init__() method

我有一个 MyLogger 类,它使用 PUBHandler 向日志服务器发送消息。

在 LogWorker.__init__() 方法中实例化 MyLogger 时会引发异常(如版本 1);但是,如果在 LogWorker.log_worker() 方法中实例化 MyLogger,则一切正常(版本 2)。

如有任何建议,我们将不胜感激。

import logging
from multiprocessing import Process
import os
import random
import sys
import time

import zmq
from zmq.log.handlers import PUBHandler


class MyLogger(object):
    """Set up the root logger with a single log handler.

    Unless a handler is supplied, one is built by wrapping a ZeroMQ PUB
    socket, connected to ``tcp://127.0.0.1:<port>``, in a ``PUBHandler``.
    """

    def __init__(self, port, handler=None):
        """Remember *port*, pick the handler and configure the root logger.

        port    -- TCP port on 127.0.0.1 used when the default socket
                   handler has to be built.
        handler -- optional handler object; when it is omitted (or falsy)
                   ``_construct_sock_handler()`` provides one.
        """
        self.port = port
        if handler:
            self.handler = handler
        else:
            self.handler = self._construct_sock_handler()

        root = logging.getLogger()
        root.setLevel(logging.INFO)
        # Attach the handler only when the root logger has none at all, so
        # repeated instantiation does not stack up handlers.
        if len(root.handlers) == 0:
            root.addHandler(self.handler)
        self.logger = root

    def _construct_sock_handler(self):
        """Return a PUBHandler around a PUB socket connected to self.port."""
        zmq_context = zmq.Context()
        publisher = zmq_context.socket(zmq.PUB)
        publisher.connect("tcp://127.0.0.1:%i" % self.port)
        # Short pause after connect(); presumably gives the connection time
        # to be established before the first record is sent.
        time.sleep(0.1)
        return PUBHandler(publisher)

    def get_logger(self):
        """Return the configured (root) logger."""
        return self.logger


def sub_logger(port, level=logging.DEBUG):
    """Receive log records over ZeroMQ and re-emit them via ``logging``.

    Binds a SUB socket on ``tcp://127.0.0.1:<port>``, subscribes to every
    topic and then loops forever: each two-frame message is read as
    ``(level name, text)`` and passed to the module-level ``logging``
    function of that name (``logging.info``, ``logging.debug``, ...).

    Args:
        port: TCP port on 127.0.0.1 to bind the SUB socket to.
        level: threshold handed to ``logging.basicConfig()``.

    This function does not return; it runs until an exception (for
    example ``KeyboardInterrupt``) propagates out of the receive loop.
    """
    ctx = zmq.Context()
    sub = ctx.socket(zmq.SUB)
    sub.bind('tcp://127.0.0.1:%i' % port)
    # The subscription prefix must be bytes; the empty prefix matches
    # every topic.
    sub.setsockopt(zmq.SUBSCRIBE, b"")
    logging.basicConfig(level=level)

    while True:
        # Frames arrive as bytes. Separate names are used so the `level`
        # argument is not rebound by the loop.
        topic, payload = sub.recv_multipart()
        message = payload.decode('utf-8', 'replace')
        if message.endswith('\n'):
            # trim trailing newline, which will get appended again
            message = message[:-1]
        # The first frame is expected to be a level name such as b"INFO".
        log = getattr(logging, topic.decode('ascii').lower())
        log(message)


class LogWorker(object):
    """Worker that periodically logs a greeting through ``MyLogger``.

    The ZeroMQ-backed logger is created inside ``log_worker()``, never in
    ``__init__()``. A worker is typically constructed in the parent
    process and ``log_worker`` is then run as a ``multiprocessing`` target
    in a child; zmq contexts and sockets must not be carried across that
    fork, so they are only created once the child is running.
    """

    def __init__(self, port=None):
        """Store the optional *port*; no sockets or loggers are created here.

        port -- port used by ``log_worker()`` when none is passed to it.
        """
        self.port = port
        self.logger = None

    def log_worker(self, port=None):
        """Create the logger in the current process and log once a second.

        port -- port handed to ``MyLogger``; defaults to the port given to
                the constructor.

        Raises ValueError when no port was supplied in either place.
        Otherwise this method loops forever.
        """
        if port is None:
            port = self.port
        if port is None:
            raise ValueError(
                "no port given: pass one to LogWorker() or log_worker()")

        # Built here so that the zmq context belongs to the process that
        # actually does the logging.
        self.logger = MyLogger(port).get_logger()
        print("starting logger at %i with level=%s" % (os.getpid(), logging.DEBUG))

        while True:
            level = logging.INFO
            self.logger.log(level, "Hello from %i!" % os.getpid())
            time.sleep(1)

if __name__ == '__main__':
    # Optional first command-line argument: number of worker processes.
    if len(sys.argv) > 1:
        n = int(sys.argv[1])
    else:
        n = 2

    port = 5555

    # Each worker object is created here in the parent, but its
    # log_worker() method runs in the child process.
    workers = [Process(target=LogWorker().log_worker, args=(port,)) for _ in range(n)]
    for w in workers:
        w.start()

    try:
        # Runs the subscriber loop in the parent until interrupted.
        sub_logger(port)
    except KeyboardInterrupt:
        pass
    finally:
        for w in workers:
            w.terminate()
        # Join after terminating so the children are reaped before exit.
        for w in workers:
            w.join()

来自 pyzmq 所有者 minrk 的回答:

您不能跨 fork 边界传递 zmq 上下文(Context)或套接字,而使用 multiprocessing 创建子进程时就会发生这种情况。您必须确保在进入子进程之后再创建 Context。

解决方案:

def work():
    """Process target: build and run a LogWorker inside the child process.

    Constructing the worker here, rather than in the parent, means any zmq
    context or socket it creates is created after the fork. The calls
    follow the LogWorker signatures used in the script: a no-argument
    constructor and ``log_worker(port)``.
    """
    worker = LogWorker()
    worker.log_worker(port)


# Pass the plain function as target; no worker object exists in the parent.
workers = [Process(target=work) for _ in range(n)]