如何在 ZMQ 代理中记录接收消息?
How to Log Receive Message in ZMQ Proxy?
在ZMQ Proxy中,我们有两种类型的socket,DEALER和ROUTER。另外,我试过使用捕获套接字,但根据我所寻找的内容,它没有用。
我正在寻找一种方法来记录我的代理服务器收到的消息。
Q : a way to log what message my proxy server receives.
最简单的方法是通过 ManInTheMiddle-"capture" 套接字使用 API v4+ 直接支持的日志记录:
// [ROUTER]--------------------------------------+++++++
// |||||||
// [DEALER]---------------*vvvvvvvv *vvvvvvv
int zmq_proxy (const void *frontend, const void *backend, const void *capture);
// [?]---------------------------------------------------------------*^^^^^^^
其中 capture
应该是 { ZMQ_PUB | ZMQ_DEALER | ZMQ_PUSH | ZMQ_PAIR }
If the capture
socket is not NULL
, the proxy shall send all messages, received on both frontend
and backend
, to the capture
socket.
如果这个 ZeroMQ API-granted 没有满足您的期望,请根据需要以足够详细的方式表达您的期望(并实施 "external" capture
- socket payload { message-content | socket_monitor() } 基于过滤,或者可以设计一个全新的、用户定义的日志记录代理,您表达的功能将通过使用您的自定义用例特定来实现要求,在您的应用程序特定代码中实现,求助于重新使用干净而简单的 ZeroMQ API 用于所有 DEALER
-inbound/outbound-ROUTER
消息传递和log-filtering/processing逻辑。)
我想不出其他方法来完成任务。
它也适用于一对 PAIR 插座。一旦一对套接字的一端连接到捕获套接字,消息就会发送到捕获套接字并发送到代理的另一端。
http://zguide.zeromq.org/page:all#ZeroMQ-s-Built-In-Proxy-Function
和
http://api.zeromq.org/3-2:zmq-proxy
和
http://zguide.zeromq.org/page:all#Pub-Sub-Tracing-Espresso-Pattern
帮助了我。
python 中的这段代码演示了它:
import zmq, threading, time
def peer_run(ctx):
""" this is the run method of the PAIR thread that logs the messages
going through the broker """
sock = ctx.socket(zmq.PAIR)
sock.connect("inproc://peer") # connect to the caller
sock.send(b"") # signal the caller that we are ready
while True:
try:
topic = sock.recv_string()
obj = sock.recv_pyobj()
except Exception:
topic = None
obj = sock.recv()
print(f"\n !!! peer_run captured message with topic {topic}, obj {obj}. !!!\n")
def proxyrun():
""" zmq broker run method in separate thread because zmq.proxy blocks """
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
zmq.proxy(xpub, xsub, cap)
def pubrun():
""" publisher run method in a separate thread, publishes 5 messages with topic 'Hello'"""
socket = ctx.socket(zmq.PUB)
socket.connect(xsub_url)
for i in range(5):
socket.send_string(f"Hello {i}", zmq.SNDMORE)
socket.send_pyobj({'a' : 123})
time.sleep(0.01)
ctx = zmq.Context()
xpub_url = "ipc://xpub"
xsub_url = "ipc://xsub"
#xpub_url = "tcp://127.0.0.1:5567"
#xsub_url = "tcp://127.0.0.1:5568"
# set up the capture socket pair
cap = ctx.socket(zmq.PAIR)
cap.bind("inproc://peer")
cap_th = threading.Thread(target=peer_run, args=(ctx,), daemon=True)
cap_th.start()
cap.recv() # wait for signal from peer thread
print("cap received message from peer, proceeding.")
# start the proxy
th_proxy=threading.Thread(target=proxyrun, daemon=True)
th_proxy.start()
# create req/rep socket just to prove that pub/sub can run alongside it
zmq_rep_sock = ctx.socket(zmq.REP)
zmq_rep_sock.bind("ipc://ghi")
# create sub socket and connect it to proxy's pub socket
zmq_sub_sock = ctx.socket(zmq.SUB)
zmq_sub_sock.connect(xpub_url)
zmq_sub_sock.setsockopt(zmq.SUBSCRIBE, b"Hello")
# create the poller
poller = zmq.Poller()
poller.register(zmq_rep_sock, zmq.POLLIN)
poller.register(zmq_sub_sock, zmq.POLLIN)
# create publisher thread and start it
th_pub = threading.Thread(target=pubrun, daemon=True)
th_pub.start()
# receive publisher's messages ordinarily
while True:
events = dict(poller.poll())
print(f"received events: {events}")
if zmq_rep_sock in events:
message = zmq_rep_sock.recv_pyobj()
print(f"received zmq_rep_sock {message}")
elif zmq_sub_sock in events:
topic = zmq_sub_sock.recv_string()
message = zmq_sub_sock.recv_pyobj()
print(f"received zmq_sub_sock {topic} , {message}")
产出
cap received message from peer, proceeding.
!!! peer_run captured message with topic None, obj b'\x80\x03}q\x00X\x01\x00\x00\x00aq\x01K{s.'. !!!
received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
received zmq_sub_sock Hello 1 , {'a': 123}
!!! peer_run captured message with topic Hello 2, obj {'a': 123}. !!!
received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
received zmq_sub_sock Hello 2 , {'a': 123}
!!! peer_run captured message with topic Hello 3, obj {'a': 123}. !!!
received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
received zmq_sub_sock Hello 3 , {'a': 123}
!!! peer_run captured message with topic Hello 4, obj {'a': 123}. !!!
received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
received zmq_sub_sock Hello 4 , {'a': 123}
注意加入速度慢的问题,因此发布者中的睡眠命令/
在ZMQ Proxy中,我们有两种类型的socket,DEALER和ROUTER。另外,我试过使用捕获套接字,但根据我所寻找的内容,它没有用。
我正在寻找一种方法来记录我的代理服务器收到的消息。
Q : a way to log what message my proxy server receives.
最简单的方法是通过 ManInTheMiddle-"capture" 套接字使用 API v4+ 直接支持的日志记录:
// [ROUTER]--------------------------------------+++++++
// |||||||
// [DEALER]---------------*vvvvvvvv *vvvvvvv
int zmq_proxy (const void *frontend, const void *backend, const void *capture);
// [?]---------------------------------------------------------------*^^^^^^^
其中 capture
应该是 { ZMQ_PUB | ZMQ_DEALER | ZMQ_PUSH | ZMQ_PAIR }
If the
capture
socket is notNULL
, the proxy shall send all messages, received on bothfrontend
andbackend
, to thecapture
socket.
如果这个 ZeroMQ API-granted 没有满足您的期望,请根据需要以足够详细的方式表达您的期望(并实施 "external" capture
- socket payload { message-content | socket_monitor() } 基于过滤,或者可以设计一个全新的、用户定义的日志记录代理,您表达的功能将通过使用您的自定义用例特定来实现要求,在您的应用程序特定代码中实现,求助于重新使用干净而简单的 ZeroMQ API 用于所有 DEALER
-inbound/outbound-ROUTER
消息传递和log-filtering/processing逻辑。)
我想不出其他方法来完成任务。
它也适用于一对 PAIR 插座。一旦一对套接字的一端连接到捕获套接字,消息就会发送到捕获套接字并发送到代理的另一端。
http://zguide.zeromq.org/page:all#ZeroMQ-s-Built-In-Proxy-Function 和 http://api.zeromq.org/3-2:zmq-proxy 和 http://zguide.zeromq.org/page:all#Pub-Sub-Tracing-Espresso-Pattern
帮助了我。
python 中的这段代码演示了它:
import zmq, threading, time
def peer_run(ctx):
""" this is the run method of the PAIR thread that logs the messages
going through the broker """
sock = ctx.socket(zmq.PAIR)
sock.connect("inproc://peer") # connect to the caller
sock.send(b"") # signal the caller that we are ready
while True:
try:
topic = sock.recv_string()
obj = sock.recv_pyobj()
except Exception:
topic = None
obj = sock.recv()
print(f"\n !!! peer_run captured message with topic {topic}, obj {obj}. !!!\n")
def proxyrun():
""" zmq broker run method in separate thread because zmq.proxy blocks """
xpub = ctx.socket(zmq.XPUB)
xpub.bind(xpub_url)
xsub = ctx.socket(zmq.XSUB)
xsub.bind(xsub_url)
zmq.proxy(xpub, xsub, cap)
def pubrun():
""" publisher run method in a separate thread, publishes 5 messages with topic 'Hello'"""
socket = ctx.socket(zmq.PUB)
socket.connect(xsub_url)
for i in range(5):
socket.send_string(f"Hello {i}", zmq.SNDMORE)
socket.send_pyobj({'a' : 123})
time.sleep(0.01)
ctx = zmq.Context()
xpub_url = "ipc://xpub"
xsub_url = "ipc://xsub"
#xpub_url = "tcp://127.0.0.1:5567"
#xsub_url = "tcp://127.0.0.1:5568"
# set up the capture socket pair
cap = ctx.socket(zmq.PAIR)
cap.bind("inproc://peer")
cap_th = threading.Thread(target=peer_run, args=(ctx,), daemon=True)
cap_th.start()
cap.recv() # wait for signal from peer thread
print("cap received message from peer, proceeding.")
# start the proxy
th_proxy=threading.Thread(target=proxyrun, daemon=True)
th_proxy.start()
# create req/rep socket just to prove that pub/sub can run alongside it
zmq_rep_sock = ctx.socket(zmq.REP)
zmq_rep_sock.bind("ipc://ghi")
# create sub socket and connect it to proxy's pub socket
zmq_sub_sock = ctx.socket(zmq.SUB)
zmq_sub_sock.connect(xpub_url)
zmq_sub_sock.setsockopt(zmq.SUBSCRIBE, b"Hello")
# create the poller
poller = zmq.Poller()
poller.register(zmq_rep_sock, zmq.POLLIN)
poller.register(zmq_sub_sock, zmq.POLLIN)
# create publisher thread and start it
th_pub = threading.Thread(target=pubrun, daemon=True)
th_pub.start()
# receive publisher's messages ordinarily
while True:
events = dict(poller.poll())
print(f"received events: {events}")
if zmq_rep_sock in events:
message = zmq_rep_sock.recv_pyobj()
print(f"received zmq_rep_sock {message}")
elif zmq_sub_sock in events:
topic = zmq_sub_sock.recv_string()
message = zmq_sub_sock.recv_pyobj()
print(f"received zmq_sub_sock {topic} , {message}")
产出
cap received message from peer, proceeding.
!!! peer_run captured message with topic None, obj b'\x80\x03}q\x00X\x01\x00\x00\x00aq\x01K{s.'. !!!
received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
received zmq_sub_sock Hello 1 , {'a': 123}
!!! peer_run captured message with topic Hello 2, obj {'a': 123}. !!!
received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
received zmq_sub_sock Hello 2 , {'a': 123}
!!! peer_run captured message with topic Hello 3, obj {'a': 123}. !!!
received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
received zmq_sub_sock Hello 3 , {'a': 123}
!!! peer_run captured message with topic Hello 4, obj {'a': 123}. !!!
received events: {<zmq.sugar.socket.Socket object at 0x76310f70>: 1}
received zmq_sub_sock Hello 4 , {'a': 123}
注意加入速度慢的问题,因此发布者中的睡眠命令/