ZeroMQ:消失的消息
ZeroMQ: Disappearing messages
我们有一个充当服务器的 Java 应用程序。客户端应用程序(用 C# 编写)使用 ZeroMQ 与其通信。我们(主要)遵循懒惰海盗模式。
服务器有一个Router socket,实现如下(使用JeroMQ):
ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.bind("tcp://*:5555");
客户端像这样连接并发送消息:
ZContext context = ZContext.Create();
ZSocket socket = ZSocket.Create(context, ZSocketType.REQ);
socket.Identity = Encoding.UTF8.GetBytes("Some identity");
socket.Connect("tcp://my_host:5555");
socket.Send(new ZFrame("request data"));
当多个客户端同时发送消息时,我们遇到了丢失消息的情况。单个客户端,似乎没有任何问题。
我们是否以正确的方式实现多客户端单服务器设置?
更新:表现出这种行为的示例客户端和服务器:
服务器:
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
public class SimpleServer
{
public static void main(String[] args) throws InterruptedException
{
ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.setRouterMandatory(true);
socket.bind("tcp://*:5559");
PollItem pollItem = new PollItem(socket, Poller.POLLIN);
int messagesReceived = 0;
int pollCount = 0;
while ((pollCount = ZMQ.poll(new PollItem[]{pollItem}, 3000)) > -1)
{
messagesReceived += pollCount;
for (int i = 0 ; i < pollCount ; i++)
{
ZMsg msg = ZMsg.recvMsg(socket);
System.out.println(String.format("Received message: %s. Total messages received: %d", msg, messagesReceived));
}
if (pollCount == 0)
{
System.out.println(String.format("No messages on socket. Total messages received: %d", messagesReceived));
}
}
}
}
客户:
using NetMQ;
using System;
using System.Text;
namespace SimpleClient
{
class Program
{
static byte[] identity = Encoding.UTF8.GetBytes("id" + DateTime.UtcNow.Ticks);
static void Main(string[] args)
{
for (int i = 0; i < 100; i++)
{
SendMessage();
}
}
private static void SendMessage()
{
using (NetMQContext context = NetMQContext.Create())
{
using (NetMQSocket socket = context.CreateRequestSocket())
{
socket.Options.Identity = identity;
socket.Connect("tcp://localhost:5559");
socket.Send(Encoding.UTF8.GetBytes("hello!"));
}
}
}
}
}
如果我 运行 服务器和单个客户端,我可以看到我的 100 条消息全部到达。如果我 运行,比如说,同时有 5 个客户端,我只会收到大约 200 -> 300 条消息,而不是全部 500 条。顺便说一句,关闭客户端中的套接字似乎以某种方式停止了路由器套接字在服务器上短暂地接收消息,虽然这只是一个理论。
第 1 部分 - 投票可能 return 不止一个事件
ZMQ.poll()
returns 找到的事件数:
int rc = ZMQ.poll(new PollItem[]{pollItem}, 3000);
您目前假设 poll
中的一个 return 是一个事件。相反,您应该循环 ZMsg msg = ZMsg.recvMsg(socket);
以获取 ZMQ.Poll()
的 return 指示的事件数。
/**
* Polling on items. This has very poor performance.
* Try to use zmq_poll with selector
* CAUTION: This could be affected by jdk epoll bug
*
* @param items
* @param timeout
* @return number of events
*/
public static int zmq_poll(PollItem[] items, long timeout)
{
return zmq_poll(items, items.length, timeout);
}
第 2 部分 - ZMsg.receive() 可能 return 多帧
当您收到来自 ZMsg msg = ZMsg.recvMsg(socket);
的 ZMsg
时,ZMsg
可能包含多个 ZFrame
,每个包含客户端数据。
来自ZMsg
class in JeroMQ's source的评论:
* // Receive message from ZMQSocket "input" socket object and iterate over frames
* ZMsg receivedMessage = ZMsg.recvMsg(input);
* for (ZFrame f : receivedMessage) {
* // Do something with frame f (of type ZFrame)
* }
第 3 部分 - 消息可以拆分到多个 ZFrame
From ZFrame's source in JeroMQ:
* The ZFrame class provides methods to send and receive single message
* frames across 0MQ sockets. A 'frame' corresponds to one underlying zmq_msg_t in the libzmq code.
* When you read a frame from a socket, the more() method indicates if the frame is part of an
* unfinished multipart message.
如果我的理解正确,那么对于每个事件,您可能会得到多个帧,并且一条客户端消息可能会映射到 1..N 帧(如果消息很大?)。
总结一下:
- 投票中的一个 return 可能表示多个事件。
- 一个事件和一个
ZMsg.receive()
可能包含多个帧
- 一帧可以包含一条完整的客户端消息,也可以只包含一部分客户端消息;一条客户端消息映射到 1..N 帧。
遗憾的是,我们无法解决这个特定问题,因此不再将 ZeroMQ 用于此接口。如果它对其他人有帮助,我们唯一确定的事情是请求套接字迅速 opening/closing 在路由器套接字端导致不良行为(丢失消息)。性能不佳的服务器 CPU 加剧了该问题,当服务器位于快速多核计算机上时根本不会出现该问题。
不幸的是,在这个问题出现时,我什至没有密切使用 ZMQ。但是我今天遇到了同样的问题并找到了这个页面。你的回答(不使用 ZMQ)对我来说并不令人满意。所以我搜索了更多,终于找到了该怎么做。
提醒一下:这适用于 ZMQ [1]
中的 "POLLER"
如果您使用 "PAIR" 连接,您肯定不会丢失任何文件,但是 send/recive 大约需要。同一时间。所以你不能加速,对我来说不是解决方案。
解法:
in zmq_setsockopt (python: zmq.setsockopt) 你可以设置 ZMQ_HWM (zmq.SNDHWM, zmq.RCVHWM) 到 '0' [2]
在 python 中:sock.setsockopt(zmq.SNDHWM , 0) sock.setsockopt(zmq.RCVHWM, 0) 对于发件人 resp。接收器
注意:我认为符号从 HWM 更改为 SNDWHM/RCVHWM
HWM = 0 表示消息数量有"NO limit"(所以要小心,可能会设置一个(非常高的)限制)
还有ZMQ_SNDBUF/ZMQ_RCVBUF(python:zmq.SNDBUF/zmq.RCVBUF) 您也可以给出,即。 sock.setsockopt(zmq.RCVBUF, 0) 对应。 ..... [2]
所以这会将操作系统 "SO_RCVBUF" 设置为默认值(我的知识到此为止)
是否设置此参数不会影响我的情况,但我认为它可能
性能:
因此,我可以 "send" 在 ~8s (~10GB) 内处理 100'000 个 98kB 的文件:这将填满你的 RAM(如果它已满,我认为你的程序会变慢),另请参阅图片
与此同时,我 "recived" 并在大约 ~enter image description here118 秒内保存了文件并再次释放 RAM
此外,到目前为止,我 NERVER 丢失了一个文件。 (如果您达到 PC 的极限,您可能会这样做)
数据丢失为"GOOD":
如果您确实需要所有数据,您应该使用此方法
如果你能认为有些损失是可以的(例如实时绘图:只要你的FPS>~50你就可以流畅地看到绘图并且你不在乎是否丢失了一些东西)
--> 您可以节省 RAM 并避免阻塞整个 PC!
希望这 post 对下一个路过的人有所帮助...
[1]: https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/multisocket/zmqpoller.htm
[2]: http://api.zeromq.org/2-1:zmq-setsockopt
你找到一张RAM她的照片:
RAM is loading in about 8s. Afterwords the disk is saving the files from the buffer
我们有一个充当服务器的 Java 应用程序。客户端应用程序(用 C# 编写)使用 ZeroMQ 与其通信。我们(主要)遵循懒惰海盗模式。
服务器有一个Router socket,实现如下(使用JeroMQ):
ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.bind("tcp://*:5555");
客户端像这样连接并发送消息:
ZContext context = ZContext.Create();
ZSocket socket = ZSocket.Create(context, ZSocketType.REQ);
socket.Identity = Encoding.UTF8.GetBytes("Some identity");
socket.Connect("tcp://my_host:5555");
socket.Send(new ZFrame("request data"));
当多个客户端同时发送消息时,我们遇到了丢失消息的情况。单个客户端,似乎没有任何问题。
我们是否以正确的方式实现多客户端单服务器设置?
更新:表现出这种行为的示例客户端和服务器:
服务器:
import org.zeromq.ZContext;
import org.zeromq.ZMQ;
import org.zeromq.ZMQ.PollItem;
import org.zeromq.ZMQ.Poller;
import org.zeromq.ZMQ.Socket;
import org.zeromq.ZMsg;
public class SimpleServer
{
public static void main(String[] args) throws InterruptedException
{
ZContext context = new ZContext();
Socket socket = context.createSocket(ZMQ.ROUTER);
socket.setRouterMandatory(true);
socket.bind("tcp://*:5559");
PollItem pollItem = new PollItem(socket, Poller.POLLIN);
int messagesReceived = 0;
int pollCount = 0;
while ((pollCount = ZMQ.poll(new PollItem[]{pollItem}, 3000)) > -1)
{
messagesReceived += pollCount;
for (int i = 0 ; i < pollCount ; i++)
{
ZMsg msg = ZMsg.recvMsg(socket);
System.out.println(String.format("Received message: %s. Total messages received: %d", msg, messagesReceived));
}
if (pollCount == 0)
{
System.out.println(String.format("No messages on socket. Total messages received: %d", messagesReceived));
}
}
}
}
客户:
using NetMQ;
using System;
using System.Text;
namespace SimpleClient
{
class Program
{
static byte[] identity = Encoding.UTF8.GetBytes("id" + DateTime.UtcNow.Ticks);
static void Main(string[] args)
{
for (int i = 0; i < 100; i++)
{
SendMessage();
}
}
private static void SendMessage()
{
using (NetMQContext context = NetMQContext.Create())
{
using (NetMQSocket socket = context.CreateRequestSocket())
{
socket.Options.Identity = identity;
socket.Connect("tcp://localhost:5559");
socket.Send(Encoding.UTF8.GetBytes("hello!"));
}
}
}
}
}
如果我 运行 服务器和单个客户端,我可以看到我的 100 条消息全部到达。如果我 运行,比如说,同时有 5 个客户端,我只会收到大约 200 -> 300 条消息,而不是全部 500 条。顺便说一句,关闭客户端中的套接字似乎以某种方式停止了路由器套接字在服务器上短暂地接收消息,虽然这只是一个理论。
第 1 部分 - 投票可能 return 不止一个事件
ZMQ.poll()
returns 找到的事件数:
int rc = ZMQ.poll(new PollItem[]{pollItem}, 3000);
您目前假设 poll
中的一个 return 是一个事件。相反,您应该循环 ZMsg msg = ZMsg.recvMsg(socket);
以获取 ZMQ.Poll()
的 return 指示的事件数。
/**
* Polling on items. This has very poor performance.
* Try to use zmq_poll with selector
* CAUTION: This could be affected by jdk epoll bug
*
* @param items
* @param timeout
* @return number of events
*/
public static int zmq_poll(PollItem[] items, long timeout)
{
return zmq_poll(items, items.length, timeout);
}
第 2 部分 - ZMsg.receive() 可能 return 多帧
当您收到来自 ZMsg msg = ZMsg.recvMsg(socket);
的 ZMsg
时,ZMsg
可能包含多个 ZFrame
,每个包含客户端数据。
来自ZMsg
class in JeroMQ's source的评论:
* // Receive message from ZMQSocket "input" socket object and iterate over frames
* ZMsg receivedMessage = ZMsg.recvMsg(input);
* for (ZFrame f : receivedMessage) {
* // Do something with frame f (of type ZFrame)
* }
第 3 部分 - 消息可以拆分到多个 ZFrame
From ZFrame's source in JeroMQ:
* The ZFrame class provides methods to send and receive single message
* frames across 0MQ sockets. A 'frame' corresponds to one underlying zmq_msg_t in the libzmq code.
* When you read a frame from a socket, the more() method indicates if the frame is part of an
* unfinished multipart message.
如果我的理解正确,那么对于每个事件,您可能会得到多个帧,并且一条客户端消息可能会映射到 1..N 帧(如果消息很大?)。
总结一下:
- 投票中的一个 return 可能表示多个事件。
- 一个事件和一个
ZMsg.receive()
可能包含多个帧 - 一帧可以包含一条完整的客户端消息,也可以只包含一部分客户端消息;一条客户端消息映射到 1..N 帧。
遗憾的是,我们无法解决这个特定问题,因此不再将 ZeroMQ 用于此接口。如果它对其他人有帮助,我们唯一确定的事情是请求套接字迅速 opening/closing 在路由器套接字端导致不良行为(丢失消息)。性能不佳的服务器 CPU 加剧了该问题,当服务器位于快速多核计算机上时根本不会出现该问题。
不幸的是,在这个问题出现时,我什至没有密切使用 ZMQ。但是我今天遇到了同样的问题并找到了这个页面。你的回答(不使用 ZMQ)对我来说并不令人满意。所以我搜索了更多,终于找到了该怎么做。
提醒一下:这适用于 ZMQ [1]
中的 "POLLER"如果您使用 "PAIR" 连接,您肯定不会丢失任何文件,但是 send/recive 大约需要。同一时间。所以你不能加速,对我来说不是解决方案。
解法:
in zmq_setsockopt (python: zmq.setsockopt) 你可以设置 ZMQ_HWM (zmq.SNDHWM, zmq.RCVHWM) 到 '0' [2]
在 python 中:sock.setsockopt(zmq.SNDHWM , 0) sock.setsockopt(zmq.RCVHWM, 0) 对于发件人 resp。接收器
注意:我认为符号从 HWM 更改为 SNDWHM/RCVHWM
HWM = 0 表示消息数量有"NO limit"(所以要小心,可能会设置一个(非常高的)限制)
还有ZMQ_SNDBUF/ZMQ_RCVBUF(python:zmq.SNDBUF/zmq.RCVBUF) 您也可以给出,即。 sock.setsockopt(zmq.RCVBUF, 0) 对应。 ..... [2]
所以这会将操作系统 "SO_RCVBUF" 设置为默认值(我的知识到此为止)
是否设置此参数不会影响我的情况,但我认为它可能
性能:
因此,我可以 "send" 在 ~8s (~10GB) 内处理 100'000 个 98kB 的文件:这将填满你的 RAM(如果它已满,我认为你的程序会变慢),另请参阅图片
与此同时,我 "recived" 并在大约 ~enter image description here118 秒内保存了文件并再次释放 RAM
此外,到目前为止,我 NERVER 丢失了一个文件。 (如果您达到 PC 的极限,您可能会这样做)
数据丢失为"GOOD":
如果您确实需要所有数据,您应该使用此方法
如果你能认为有些损失是可以的(例如实时绘图:只要你的FPS>~50你就可以流畅地看到绘图并且你不在乎是否丢失了一些东西)
--> 您可以节省 RAM 并避免阻塞整个 PC!
希望这 post 对下一个路过的人有所帮助...
[1]: https://learning-0mq-with-pyzmq.readthedocs.io/en/latest/pyzmq/multisocket/zmqpoller.htm
[2]: http://api.zeromq.org/2-1:zmq-setsockopt
你找到一张RAM她的照片: RAM is loading in about 8s. Afterwords the disk is saving the files from the buffer