Pyzmq - 将消息发送到 STREAM 套接字

Pyzmq - send message to STREAM socket

我正在尝试在 pyzmq 中实现两个 STREAM 套接字之间连接的简单示例。

sender.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5555")

socket.connect("tcp://localhost:5556")
socket.send("message")

receiver.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)
socket.bind("tcp://*:5556")

message = socket.recv()
print("Received -> [ %s ]" % (message))

输出

Received [ b'\x00k\x8bEg' ]
Received [ b'' ]

我想问一下在STREAM套接字之间发送消息的正确方法是什么。

你的 socket.recv()-ed 数据与 ZeroMQ 规范完全匹配,尽管它们不一定让你开心并且你怀疑你为什么得到这个,而不是很好地传递了已发送消息的精确副本。

所以,请耐心阅读。

最近添加了 ZeroMQ STREAM socket-archetype 相当具体

任何有几年 ZeroMQ signalling/messaging 工具经验的人都会告诉你,最近(v4.x)添加的 STREAM 原型不是 ZeroMQ 进程的最佳选择需要一个ZeroMQ进程互通。

为什么? ZeroMQ 工具拥有的几乎所有 gem 都是并且必须是 STREAM 中的快捷方式,以便允许 ZeroMQ 套接字访问点能够 "speak" 到相反的套接字端点进程,它什么都不知道关于 ZeroMQ 智能套接字高级协议。

Native pattern

The native pattern is used for communicating with TCP peers and allows asynchronous requests and replies in either direction. ZMQ_STREAM

A socket of type ZMQ_STREAM is used to send and receive TCP data from a non-ØMQ peer, when using the tcp:// transport. A ZMQ_STREAM socket can act as client and/or server, sending and/or receiving TCP data asynchronously.

When receiving TCP data, a ZMQ_STREAM socket shall prepend a message part containing the identity of the originating peer to the message before passing it to the application. Messages received are fair-queued from among all connected peers.

When sending TCP data, a ZMQ_STREAM socket shall remove the first part of the message and use it to determine the identity of the peer the message shall be routed to, and unroutable messages shall cause an EHOSTUNREACH or EAGAIN error.

To open a connection to a server, use the zmq_connect() call, and then fetch the socket identity using the ZMQ_IDENTITY zmq_getsockopt() call.

To close a specific connection, send the identity frame followed by a zero-length message (see EXAMPLE section).

When a connection is made, a zero-length message will be received by the application. Similarly, when the peer disconnects (or the connection is lost), a zero-length message will be received by the application.

You must send one identity frame followed by one data frame. The ZMQ_SNDMORE flag is required for identity frames but is ignored on data frames.

EXAMPLE

void    *ctx = zmq_ctx_new ();
assert ( ctx );
/*                                             Create ZMQ_STREAM socket */
void    *socket = zmq_socket ( ctx, ZMQ_STREAM );
assert ( socket );

int      rc = zmq_bind ( socket, "tcp://*:8080" );
assert ( rc == 0 );

/*                                            Data structure to hold the ZMQ_STREAM ID */
uint8_t id [256];
size_t  id_size = 256;

/*                                            Data structure to hold the ZMQ_STREAM received data */
uint8_t raw [256];
size_t  raw_size = 256;

while ( 1 ) {
   /*                                         Get HTTP request; ID frame and then request */
   id_size  = zmq_recv ( socket, id, 256, 0 );
   assert ( id_size >  0 );
   do {
        raw_size  = zmq_recv ( socket, raw, 256, 0 );
        assert ( raw_size >= 0 );
   } while (     raw_size == 256 );
   /*                                         Prepares the response */
   char http_response [] =
                            "HTTP/1.0 200 OK\r\n"
                            "Content-Type: text/plain\r\n"
                            "\r\n"
                            "Hello, World!";
   /*                                         Sends the ID frame followed by the response */
   zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
   zmq_send ( socket, http_response, strlen ( http_response ), 0 );

   /*                                         Closes the connection by sending the ID frame followed by a zero response */
   zmq_send ( socket, id, id_size, ZMQ_SNDMORE );
   zmq_send ( socket, 0, 0, 0 );
}
zmq_close ( socket );
zmq_ctx_destroy ( ctx );

如果你按照STREAM多连接套接字情况下的行为描述,发送方将恰好在[=上收到公平队列循环读取29=] 实例,它连接(1x 通过 .connect() + Nx 通过 .bind(), N = < 0, +INF ) )到多个端点,到目前为止,对任一计数的控制为零or/and 通信对等体的性质,但在 socket.recv()-s 上具有公平排队的循环机制。绝对不是安全的设计实践。

Summary of ZMQ_STREAM characteristics
Compatible peer sockets     none
Direction                   Bidirectional
Send/receive pattern        Unrestricted
Outgoing routing strategy   See text ( above )
Incoming routing strategy   Fair-queued
Action in mute state        EAGAIN

这是问题中使用单向连接的 pyzmq 的简化示例。

sender.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)

socket.connect('tcp://localhost:5555')
id_sock = socket.getsockopt(zmq.IDENTITY)
socket.send(id_sock, zmq.SNDMORE)
socket.send(b'message')

receiver.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.STREAM)

socket.bind('tcp://*:5555')
id_sock = socket.recv()
assert not socket.recv()    # empty data here
assert socket.recv() == id_sock
message = socket.recv()
print('received:' + str(message))