Pyzmq - 将消息发送到 STREAM 套接字
Pyzmq - send message to STREAM socket
我正在尝试在 pyzmq 中实现两个 STREAM 套接字之间连接的简单示例。
sender.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")
socket.connect("tcp://localhost:5556")
socket.send("message")
receiver.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5556")
message = socket.recv()
print("Received -> [ %s ]" % (message))
输出
Received [ b'\x00k\x8bEg' ]
Received [ b'' ]
我想问一下在STREAM套接字之间发送消息的正确方法是什么。
你的 socket.recv()
-ed 数据与 ZeroMQ 规范完全匹配,尽管它们不一定让你开心并且你怀疑你为什么得到这个,而不是很好地传递了已发送消息的精确副本。
所以,请耐心阅读。
最近添加了 ZeroMQ STREAM
socket-archetype 相当具体
任何有几年 ZeroMQ signalling/messaging 工具经验的人都会告诉你,最近(v4.x)添加的 STREAM
原型不是 ZeroMQ 进程的最佳选择需要一个ZeroMQ进程互通。
为什么? ZeroMQ 工具拥有的几乎所有 gem 都是并且必须是 STREAM
中的快捷方式,以便允许 ZeroMQ 套接字访问点能够 "speak" 到相反的套接字端点进程,它什么都不知道关于 ZeroMQ 智能套接字高级协议。
Native pattern
The native pattern is used for communicating with TCP peers and allows asynchronous requests and replies in either direction.
ZMQ_STREAM
A socket of type ZMQ_STREAM
is used to send and receive TCP data from a non-ØMQ peer, when using the tcp://
transport. A ZMQ_STREAM
socket can act as client and/or server, sending and/or receiving TCP data asynchronously.
When receiving TCP data, a ZMQ_STREAM
socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers.
When sending TCP data, a ZMQ_STREAM
socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to, and unroutable messages shall cause an EHOSTUNREACH
or EAGAIN
error.
To open a connection to a server, use the zmq_connect()
call, and then fetch the socket identity using the ZMQ_IDENTITY
zmq_getsockopt()
call.
To close a specific connection, send the identity frame followed by a zero-length message (see EXAMPLE section).
When a connection is made, a zero-length message will be received by the application. Similarly, when the peer disconnects (or the connection is lost), a zero-length message will be received by the application.
You must send one identity frame followed by one data frame. The ZMQ_SNDMORE
flag is required for identity frames but is ignored on data frames.
EXAMPLE
void *ctx = zmq_ctx_new ();
assert ( ctx );
/* Create ZMQ_STREAM socket */
void *socket = zmq_socket ( ctx, ZMQ_STREAM );
assert ( socket );
int rc = zmq_bind ( socket, "tcp://*:8080" );
assert ( rc == 0 );
/* Data structure to hold the ZMQ_STREAM ID */
uint8_t id [256];
size_t id_size = 256;
/* Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
size_t raw_size = 256;
while ( 1 ) {
/* Get HTTP request; ID frame and then request */
id_size = zmq_recv ( socket, id, 256, 0 );
assert ( id_size > 0 );
do {
raw_size = zmq_recv ( socket, raw, 256, 0 );
assert ( raw_size >= 0 );
} while ( raw_size == 256 );
/* Prepares the response */
char http_response [] =
"HTTP/1.0 200 OK\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"Hello, World!";
/* Sends the ID frame followed by the response */
zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
zmq_send ( socket, http_response, strlen ( http_response ), 0 );
/* Closes the connection by sending the ID frame followed by a zero response */
zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
zmq_send ( socket, 0, 0, 0 );
}
zmq_close ( socket );
zmq_ctx_destroy ( ctx );
如果你按照STREAM
多连接套接字情况下的行为描述,发送方将恰好在[=上收到公平队列循环读取29=] 实例,它连接(1x 通过 .connect()
+ Nx 通过 .bind()
, N = < 0, +INF )
)到多个端点,到目前为止,对任一计数的控制为零or/and 通信对等体的性质,但在 socket.recv()
-s 上具有公平排队的循环机制。绝对不是安全的设计实践。
Summary of ZMQ_STREAM characteristics
Compatible peer sockets none
Direction Bidirectional
Send/receive pattern Unrestricted
Outgoing routing strategy See text ( above )
Incoming routing strategy Fair-queued
Action in mute state EAGAIN
这是问题中使用单向连接的 pyzmq 的简化示例。
sender.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect('tcp://localhost:5555')
id_sock = socket.getsockopt(zmq.IDENTITY)
socket.send(id_sock, zmq.SNDMORE)
socket.send(b'message')
receiver.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind('tcp://*:5555')
id_sock = socket.recv()
assert not socket.recv() # empty data here
assert socket.recv() == id_sock
message = socket.recv()
print('received:' + str(message))
我正在尝试在 pyzmq 中实现两个 STREAM 套接字之间连接的简单示例。
sender.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")
socket.connect("tcp://localhost:5556")
socket.send("message")
receiver.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5556")
message = socket.recv()
print("Received -> [ %s ]" % (message))
输出
Received [ b'\x00k\x8bEg' ]
Received [ b'' ]
我想问一下在STREAM套接字之间发送消息的正确方法是什么。
你的 socket.recv()
-ed 数据与 ZeroMQ 规范完全匹配,尽管它们不一定让你开心并且你怀疑你为什么得到这个,而不是很好地传递了已发送消息的精确副本。
所以,请耐心阅读。
最近添加了 ZeroMQ STREAM
socket-archetype 相当具体
任何有几年 ZeroMQ signalling/messaging 工具经验的人都会告诉你,最近(v4.x)添加的 STREAM
原型不是 ZeroMQ 进程的最佳选择需要一个ZeroMQ进程互通。
为什么? ZeroMQ 工具拥有的几乎所有 gem 都是并且必须是 STREAM
中的快捷方式,以便允许 ZeroMQ 套接字访问点能够 "speak" 到相反的套接字端点进程,它什么都不知道关于 ZeroMQ 智能套接字高级协议。
Native pattern
The native pattern is used for communicating with TCP peers and allows asynchronous requests and replies in either direction.
ZMQ_STREAM
A socket of type
ZMQ_STREAM
is used to send and receive TCP data from a non-ØMQ peer, when using thetcp://
transport. AZMQ_STREAM
socket can act as client and/or server, sending and/or receiving TCP data asynchronously.When receiving TCP data, a
ZMQ_STREAM
socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers.When sending TCP data, a
ZMQ_STREAM
socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to, and unroutable messages shall cause anEHOSTUNREACH
orEAGAIN
error.To open a connection to a server, use the
zmq_connect()
call, and then fetch the socket identity using theZMQ_IDENTITY
zmq_getsockopt()
call.To close a specific connection, send the identity frame followed by a zero-length message (see EXAMPLE section).
When a connection is made, a zero-length message will be received by the application. Similarly, when the peer disconnects (or the connection is lost), a zero-length message will be received by the application.
You must send one identity frame followed by one data frame. The
ZMQ_SNDMORE
flag is required for identity frames but is ignored on data frames.EXAMPLE
void *ctx = zmq_ctx_new ();
assert ( ctx );
/* Create ZMQ_STREAM socket */
void *socket = zmq_socket ( ctx, ZMQ_STREAM );
assert ( socket );
int rc = zmq_bind ( socket, "tcp://*:8080" );
assert ( rc == 0 );
/* Data structure to hold the ZMQ_STREAM ID */
uint8_t id [256];
size_t id_size = 256;
/* Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
size_t raw_size = 256;
while ( 1 ) {
/* Get HTTP request; ID frame and then request */
id_size = zmq_recv ( socket, id, 256, 0 );
assert ( id_size > 0 );
do {
raw_size = zmq_recv ( socket, raw, 256, 0 );
assert ( raw_size >= 0 );
} while ( raw_size == 256 );
/* Prepares the response */
char http_response [] =
"HTTP/1.0 200 OK\r\n"
"Content-Type: text/plain\r\n"
"\r\n"
"Hello, World!";
/* Sends the ID frame followed by the response */
zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
zmq_send ( socket, http_response, strlen ( http_response ), 0 );
/* Closes the connection by sending the ID frame followed by a zero response */
zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
zmq_send ( socket, 0, 0, 0 );
}
zmq_close ( socket );
zmq_ctx_destroy ( ctx );
如果你按照STREAM
多连接套接字情况下的行为描述,发送方将恰好在[=上收到公平队列循环读取29=] 实例,它连接(1x 通过 .connect()
+ Nx 通过 .bind()
, N = < 0, +INF )
)到多个端点,到目前为止,对任一计数的控制为零or/and 通信对等体的性质,但在 socket.recv()
-s 上具有公平排队的循环机制。绝对不是安全的设计实践。
Summary of ZMQ_STREAM characteristics
Compatible peer sockets none
Direction Bidirectional
Send/receive pattern Unrestricted
Outgoing routing strategy See text ( above )
Incoming routing strategy Fair-queued
Action in mute state EAGAIN
这是问题中使用单向连接的 pyzmq 的简化示例。
sender.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.connect('tcp://localhost:5555')
id_sock = socket.getsockopt(zmq.IDENTITY)
socket.send(id_sock, zmq.SNDMORE)
socket.send(b'message')
receiver.py
import zmq
context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind('tcp://*:5555')
id_sock = socket.recv()
assert not socket.recv() # empty data here
assert socket.recv() == id_sock
message = socket.recv()
print('received:' + str(message))