如何在 ZeroMQ 或 NetMQ 中从路由器套接字发送和接收?
How can I both Send and Receive from a Router socket in ZeroMQ or NetMQ?
我在 NetMQ v4 中设置了 Dealer <--> 路由器,我可以在其中毫无问题地向任何方向异步发送和接收消息。
我现在想将其形式化为一个抽象,其中服务器(路由器)侦听任何传入消息,但它还需要按需向任何连接的客户端广播消息(经销商)。
我试图避免使用 Pub <--> Sub 套接字,因为我需要订阅者也向服务器发送消息。与我想要实现的最接近的模式是 WebSocket 客户端-服务器通信。
侦听客户端消息的第一部分是这样完成的:
using (var server = new RouterSocket("@tcp://*:80"))
{
var addresses = new HashSet<string>();
while (true)
{
var msg = server.ReceiveMultipartMessage();
var address = Encoding.UTF8.GetString(msg[0].Buffer);
var payload = Encoding.UTF8.GetString(msg[2].Buffer);
Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);
var contains = addresses.Contains(address);
if (!contains) { addresses.Add(address); }
msg.Clear();
msg.Append(address);
msg.AppendEmptyFrame();
msg.Append("Reply for: " + address);
server.SendMultipartMessage(msg);
}
}
既然套接字不是线程安全的,我一直在寻找一种方法来向所有客户端广播消息(来自不同线程的请求)。
我可能可以在循环中使用 TryReceiveMultipartMessage
方法而不是设置超时,之后我可以检查队列中是否有任何广播消息,然后循环遍历每个发送此类消息的客户端。类似于:
using (var server = new RouterSocket("@tcp://*:80"))
{
var addresses = new HashSet<string>();
var msg = new NetMQMessage();
while (true)
{
var clientHasMsg = server.TryReceiveMultipartMessage(TimeSpan.FromSeconds(1), ref msg);
if (!clientHasMsg)
{
// Check any incoming broacast then loop through all the clients
// sending each the brodcast msg
var broadMsg = new NetMQMessage();
foreach (var item in addresses)
{
broadMsg.Append(item);
broadMsg.AppendEmptyFrame();
broadMsg.Append("This is a broadcast");
server.SendMultipartMessage(broadMsg);
broadMsg.Clear();
}
// Go back into the loop waiting for client messages
continue;
}
var address = Encoding.UTF8.GetString(msg[0].Buffer);
var payload = Encoding.UTF8.GetString(msg[2].Buffer);
Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);
var contains = addresses.Contains(address);
if (!contains) { addresses.Add(address); }
msg.Clear();
msg.Append(address);
msg.AppendEmptyFrame();
msg.Append("Reply for: " + address);
server.SendMultipartMessage(msg);
}
}
这在某种程度上感觉不对,主要是由于:
- 什么超时值比较合适? 1 秒,100 毫秒等;
- 这是最efficient/performing 的解决方案吗,因为该程序将用于连接 100k+ 客户端,每个客户端每秒发送数千条消息。
非常感谢任何关于最佳方法的指示。
我认为 PUB/SUB 是满足 100k+ 客户要求的正确方法。然而,这并不意味着您不能与服务器通信:使用 DEALER/ROUTER。为什么您认为此解决方案不可接受?
您可以使用 netmqqueue,它是多生产者单消费者队列。您可以将它添加到 NetMQPoller 并从多个线程中入队而无需锁定。
我在 NetMQ v4 中设置了 Dealer <--> 路由器,我可以在其中毫无问题地向任何方向异步发送和接收消息。
我现在想将其形式化为一个抽象,其中服务器(路由器)侦听任何传入消息,但它还需要按需向任何连接的客户端广播消息(经销商)。
我试图避免使用 Pub <--> Sub 套接字,因为我需要订阅者也向服务器发送消息。与我想要实现的最接近的模式是 WebSocket 客户端-服务器通信。
侦听客户端消息的第一部分是这样完成的:
using (var server = new RouterSocket("@tcp://*:80"))
{
var addresses = new HashSet<string>();
while (true)
{
var msg = server.ReceiveMultipartMessage();
var address = Encoding.UTF8.GetString(msg[0].Buffer);
var payload = Encoding.UTF8.GetString(msg[2].Buffer);
Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);
var contains = addresses.Contains(address);
if (!contains) { addresses.Add(address); }
msg.Clear();
msg.Append(address);
msg.AppendEmptyFrame();
msg.Append("Reply for: " + address);
server.SendMultipartMessage(msg);
}
}
既然套接字不是线程安全的,我一直在寻找一种方法来向所有客户端广播消息(来自不同线程的请求)。
我可能可以在循环中使用 TryReceiveMultipartMessage
方法而不是设置超时,之后我可以检查队列中是否有任何广播消息,然后循环遍历每个发送此类消息的客户端。类似于:
using (var server = new RouterSocket("@tcp://*:80"))
{
var addresses = new HashSet<string>();
var msg = new NetMQMessage();
while (true)
{
var clientHasMsg = server.TryReceiveMultipartMessage(TimeSpan.FromSeconds(1), ref msg);
if (!clientHasMsg)
{
// Check any incoming broacast then loop through all the clients
// sending each the brodcast msg
var broadMsg = new NetMQMessage();
foreach (var item in addresses)
{
broadMsg.Append(item);
broadMsg.AppendEmptyFrame();
broadMsg.Append("This is a broadcast");
server.SendMultipartMessage(broadMsg);
broadMsg.Clear();
}
// Go back into the loop waiting for client messages
continue;
}
var address = Encoding.UTF8.GetString(msg[0].Buffer);
var payload = Encoding.UTF8.GetString(msg[2].Buffer);
Console.WriteLine("[Server] - Client: {0} Says: {1}", address, payload);
var contains = addresses.Contains(address);
if (!contains) { addresses.Add(address); }
msg.Clear();
msg.Append(address);
msg.AppendEmptyFrame();
msg.Append("Reply for: " + address);
server.SendMultipartMessage(msg);
}
}
这在某种程度上感觉不对,主要是由于:
- 什么超时值比较合适? 1 秒,100 毫秒等;
- 这是最efficient/performing 的解决方案吗,因为该程序将用于连接 100k+ 客户端,每个客户端每秒发送数千条消息。
非常感谢任何关于最佳方法的指示。
我认为 PUB/SUB 是满足 100k+ 客户要求的正确方法。然而,这并不意味着您不能与服务器通信:使用 DEALER/ROUTER。为什么您认为此解决方案不可接受?
您可以使用 netmqqueue,它是多生产者单消费者队列。您可以将它添加到 NetMQPoller 并从多个线程中入队而无需锁定。