在 python 中使用 pyzmq 的日志处理程序
Use of pyzmq's logging handler in python
我想在 Python 程序中引入一个基于 zmq
的登录。当我面临 ZMQError: Address in use
个错误时,我决定将其归结为一个简单的概念证明。我能够 运行 精简版,但没有收到任何日志条目。这是我使用的代码:
日志发布者:
import time
import logging
from zmq.log import handlers as zmqHandler
logger = logging.getLogger('myapp')
logger.setLevel(logging.ERROR)
zmqH=zmqHandler.PUBHandler('tcp://127.0.0.1:12344')
logger.addHandler(zmqH)
for i in range(50):
logger.error('error test...')
print "Send error #%s" % (str(i))
time.sleep(1)
结果
Send error #0
Send error #1
Send error #2
Send error #3
Send error #4
...
日志订阅者:
import time
import zmq
def sub_client():
port = "12344"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:%s" % port)
# Generate 30 entries
for i in range (30):
print "Listening to publishers..."
message = socket.recv()
print "Received error #%s: %s" % (str(i), message)
time.sleep(1)
sub_client()
结果
Listening to publishers...
所以用户被锁定在 socket.recv()
的电话上。我在不同的控制台中启动了发布者和订阅者。当我使用 netstat:
时,这两个进程都出现了
C:\>netstat -a -n -o | findstr 12344
TCP 127.0.0.1:12344 0.0.0.0:0 LISTEN 1336
TCP 127.0.0.1:12344 127.0.0.1:51937 ESTABLISHED 1336
TCP 127.0.0.1:51937 127.0.0.1:12344 ESTABLISHED 8624
我没有看到我的错误,有什么想法吗?
除了手头的问题,这个zmq listener一般怎么用。
我是否必须为每个进程创建一个 PUBHandler
实例,然后将其添加到 logger
的所有实例(logging.getLogger('myapp')
创建一个自己的记录器实例,对吗?)还是我必须创建我使用的所有不同 classes 都有自己的 PUBHandler
吗?由于 PUBHandler
class 有一个 createLock()
我假设它不是线程保存...
为了完整起见,我想提一下 doc of the PUBHandler class
我在 Win7 上使用 python(x,y) 分布 python 2.7.10 和 pyzmq 14.7.0-14
[更新]
我排除了 windows 防火墙作为丢失包的来源
我猜你错过了在服务器中设置 PUB 套接字。
应该这样做,
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:12344')
zmqh = PUBHandler(publisher)
logger = logging.getLogger('myapp')
logger.setLevel(logging.ERROR)
logger.addHandler(zmqh)
希望对您有所帮助。
问题出在订阅方。最初,订阅者会过滤掉所有消息,直到设置了过滤器。使用 socket.setsockopt(opt, value)
函数将其归档。
pyZMQ
描述的不是很清楚这个函数的使用:
getsockopt(opt) get default socket options for new sockets created by
this Context
但是zmq_setsockopt
函数的文档写的很清楚(see here):
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len)
...
ZMQ_SUBSCRIBE
: Establish message filter The ZMQ_SUBSCRIBE
option shall establish a new message filter on a ZMQ_SUB
socket.
Newly created ZMQ_SUB
sockets shall filter out all incoming
messages, therefore you should call this option to establish an
initial message filter.
所以解决方案是用socket.setsockopt(zmq.SUBSCRIBE,filter)
设置一个过滤器,其中过滤器是你要过滤的字符串。使用 filter=''
显示所有消息。像 filter='ERROR'
这样的过滤器将只显示错误消息并抑制所有其他类型,如 WARNING
、INFO
或 DEBUG
。
sub_client()
函数如下所示:
import time
import zmq
def sub_client():
port = "12344"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,'')
# Process 30 updates
print "Listening to publishers..."
for i in range (30):
print "Listening to publishers..."
message = socket.recv()
print "Received error #%s: %s" % (str(i), message)
time.sleep(1)
sub_client()
我知道它比较旧 post,但如果有人登陆这里,这就是订阅者的样子
def sub_client():
port = "12345"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.subscribe("")
# Process 30 updates
for i in range (30):
print("Listening to publishers...")
message = socket.recv()
print("Received error #%s: %s",str(i), message)
time.sleep(1)
sub_client()
Publisher 看起来像
import zmq
import logging
import time
from zmq.log.handlers import PUBHandler
port = "12345"
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:%s" % port)
handler = PUBHandler(pub)
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
logger.addHandler(handler)
for i in range(50):
logger.error('error test...')
print("publish error",str(i))
time.sleep(1)
我想在 Python 程序中引入一个基于 zmq
的登录。当我面临 ZMQError: Address in use
个错误时,我决定将其归结为一个简单的概念证明。我能够 运行 精简版,但没有收到任何日志条目。这是我使用的代码:
日志发布者:
import time
import logging
from zmq.log import handlers as zmqHandler
logger = logging.getLogger('myapp')
logger.setLevel(logging.ERROR)
zmqH=zmqHandler.PUBHandler('tcp://127.0.0.1:12344')
logger.addHandler(zmqH)
for i in range(50):
logger.error('error test...')
print "Send error #%s" % (str(i))
time.sleep(1)
结果
Send error #0
Send error #1
Send error #2
Send error #3
Send error #4
...
日志订阅者:
import time
import zmq
def sub_client():
port = "12344"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:%s" % port)
# Generate 30 entries
for i in range (30):
print "Listening to publishers..."
message = socket.recv()
print "Received error #%s: %s" % (str(i), message)
time.sleep(1)
sub_client()
结果
Listening to publishers...
所以用户被锁定在 socket.recv()
的电话上。我在不同的控制台中启动了发布者和订阅者。当我使用 netstat:
C:\>netstat -a -n -o | findstr 12344
TCP 127.0.0.1:12344 0.0.0.0:0 LISTEN 1336
TCP 127.0.0.1:12344 127.0.0.1:51937 ESTABLISHED 1336
TCP 127.0.0.1:51937 127.0.0.1:12344 ESTABLISHED 8624
我没有看到我的错误,有什么想法吗?
除了手头的问题,这个zmq listener一般怎么用。
我是否必须为每个进程创建一个 PUBHandler
实例,然后将其添加到 logger
的所有实例(logging.getLogger('myapp')
创建一个自己的记录器实例,对吗?)还是我必须创建我使用的所有不同 classes 都有自己的 PUBHandler
吗?由于 PUBHandler
class 有一个 createLock()
我假设它不是线程保存...
为了完整起见,我想提一下 doc of the PUBHandler class
我在 Win7 上使用 python(x,y) 分布 python 2.7.10 和 pyzmq 14.7.0-14
[更新] 我排除了 windows 防火墙作为丢失包的来源
我猜你错过了在服务器中设置 PUB 套接字。
应该这样做,
publisher = context.socket(zmq.PUB)
publisher.bind('tcp://127.0.0.1:12344')
zmqh = PUBHandler(publisher)
logger = logging.getLogger('myapp')
logger.setLevel(logging.ERROR)
logger.addHandler(zmqh)
希望对您有所帮助。
问题出在订阅方。最初,订阅者会过滤掉所有消息,直到设置了过滤器。使用 socket.setsockopt(opt, value)
函数将其归档。
pyZMQ
描述的不是很清楚这个函数的使用:
getsockopt(opt) get default socket options for new sockets created by this Context
但是zmq_setsockopt
函数的文档写的很清楚(see here):
int zmq_setsockopt (void *socket, int option_name, const void *option_value, size_t option_len)
...
ZMQ_SUBSCRIBE
: Establish message filter TheZMQ_SUBSCRIBE
option shall establish a new message filter on aZMQ_SUB
socket. Newly createdZMQ_SUB
sockets shall filter out all incoming messages, therefore you should call this option to establish an initial message filter.
所以解决方案是用socket.setsockopt(zmq.SUBSCRIBE,filter)
设置一个过滤器,其中过滤器是你要过滤的字符串。使用 filter=''
显示所有消息。像 filter='ERROR'
这样的过滤器将只显示错误消息并抑制所有其他类型,如 WARNING
、INFO
或 DEBUG
。
sub_client()
函数如下所示:
import time
import zmq
def sub_client():
port = "12344"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://127.0.0.1:%s" % port)
socket.setsockopt(zmq.SUBSCRIBE,'')
# Process 30 updates
print "Listening to publishers..."
for i in range (30):
print "Listening to publishers..."
message = socket.recv()
print "Received error #%s: %s" % (str(i), message)
time.sleep(1)
sub_client()
我知道它比较旧 post,但如果有人登陆这里,这就是订阅者的样子
def sub_client():
port = "12345"
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:%s" % port)
socket.subscribe("")
# Process 30 updates
for i in range (30):
print("Listening to publishers...")
message = socket.recv()
print("Received error #%s: %s",str(i), message)
time.sleep(1)
sub_client()
Publisher 看起来像
import zmq
import logging
import time
from zmq.log.handlers import PUBHandler
port = "12345"
context = zmq.Context()
pub = context.socket(zmq.PUB)
pub.bind("tcp://*:%s" % port)
handler = PUBHandler(pub)
logger = logging.getLogger()
logger.setLevel(logging.ERROR)
logger.addHandler(handler)
for i in range(50):
logger.error('error test...')
print("publish error",str(i))
time.sleep(1)