在 python 中使用 pyzmq 的日志处理程序

Use of pyzmq's logging handler in python

我想在 Python 程序中引入一个基于 zmq 的登录。当我面临 ZMQError: Address in use 个错误时,我决定将其归结为一个简单的概念证明。我能够 运行 精简版,但没有收到任何日志条目。这是我使用的代码:

日志发布者:

import time
import logging
from zmq.log import handlers as zmqHandler


logger = logging.getLogger('myapp')
logger.setLevel(logging.ERROR)
zmqH=zmqHandler.PUBHandler('tcp://127.0.0.1:12344')
logger.addHandler(zmqH)
for i in range(50):
    logger.error('error test...')
    print "Send error #%s" % (str(i))
    time.sleep(1)

结果

Send error #0
Send error #1
Send error #2
Send error #3
Send error #4
...

日志订阅者:

import time
import zmq

def sub_client():
    port = "12344"       
    context = zmq.Context()
    socket = context.socket(zmq.SUB)    
    socket.connect("tcp://127.0.0.1:%s" % port)
    # Generate 30 entries
    for i in range (30):
        print "Listening to publishers..."
        message = socket.recv()
        print "Received error #%s: %s" % (str(i), message)
        time.sleep(1)

sub_client()

结果

Listening to publishers...

所以用户被锁定在 socket.recv() 的电话上。我在不同的控制台中启动了发布者和订阅者。当我使用 netstat:

时,这两个进程都出现了
C:\>netstat -a -n -o | findstr 12344
  TCP    127.0.0.1:12344        0.0.0.0:0              LISTEN          1336
  TCP    127.0.0.1:12344        127.0.0.1:51937        ESTABLISHED     1336
  TCP    127.0.0.1:51937        127.0.0.1:12344        ESTABLISHED     8624

我没有看到我的错误,有什么想法吗?

除了手头的问题,这个zmq listener一般怎么用。 我是否必须为每个进程创建一个 PUBHandler 实例,然后将其添加到 logger 的所有实例(logging.getLogger('myapp') 创建一个自己的记录器实例,对吗?)还是我必须创建我使用的所有不同 classes 都有自己的 PUBHandler 吗?由于 PUBHandlerclass 有一个 createLock() 我假设它不是线程保存...

为了完整起见,我想提一下 doc of the PUBHandler class

我在 Win7 上使用 python(x,y) 分布 python 2.7.10 和 pyzmq 14.7.0-14

[更新] 我排除了 windows 防火墙作为丢失包的来源

我猜你错过了在服务器中设置 PUB 套接字。

应该这样做,

publisher = context.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:12344')
zmqh = PUBHandler(publisher)
logger = logging.getLogger('myapp')
logger.setLevel(logging.ERROR)
logger.addHandler(zmqh)

希望对您有所帮助。

问题出在订阅方。最初,订阅者会过滤掉所有消息,直到设置了过滤器。使用 socket.setsockopt(opt, value) 函数将其归档。 pyZMQ描述的不是很清楚这个函数的使用:

getsockopt(opt) get default socket options for new sockets created by this Context

但是zmq_setsockopt函数的文档写的很清楚(see here):

int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len)

...

ZMQ_SUBSCRIBE: Establish message filter The ZMQ_SUBSCRIBE option shall establish a new message filter on a ZMQ_SUB socket. Newly created ZMQ_SUB sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.

所以解决方案是用socket.setsockopt(zmq.SUBSCRIBE,filter)设置一个过滤器,其中过滤器是你要过滤的字符串。使用 filter='' 显示所有消息。像 filter='ERROR' 这样的过滤器将只显示错误消息并抑制所有其他类型,如 WARNINGINFODEBUG

sub_client() 函数如下所示:

import time
import zmq

def sub_client():
    port = "12344"       
    context = zmq.Context()
    socket = context.socket(zmq.SUB)    
    socket.connect("tcp://127.0.0.1:%s" % port)
    socket.setsockopt(zmq.SUBSCRIBE,'')


    # Process 30 updates
    print "Listening to publishers..."
    for i in range (30):       
        print "Listening to publishers..."
        message = socket.recv()
        print "Received error #%s: %s" % (str(i), message)
        time.sleep(1)

sub_client()

我知道它比较旧 post,但如果有人登陆这里,这就是订阅者的样子

def sub_client():
    port = "12345"       
    context = zmq.Context()
    socket = context.socket(zmq.SUB)    
    socket.connect("tcp://localhost:%s" % port)
    socket.subscribe("")
    
    # Process 30 updates
    for i in range (30):
        print("Listening to publishers...")
        message = socket.recv()
        print("Received error #%s: %s",str(i), message)
        time.sleep(1)

    sub_client()

Publisher 看起来像

import zmq
import logging
import time
from zmq.log.handlers import PUBHandler

port = "12345"       
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:%s" % port)

handler = PUBHandler(pub)
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
logger.addHandler(handler)
for i in range(50):
    logger.error('error test...')
    print("publish error",str(i))
    time.sleep(1)