NetMQ 客户端到客户端消息传递
NetMQ client to client messaging
我正在尝试创建一个 rpc 程序来与位于不同网络的主机进行通信,并选择了此处提供的 NetMQ 的 Router-Dealer 配置:http://netmq.readthedocs.io/en/latest/router-dealer/#router-dealer
但问题是,路由器在将消息转发到后端时,总是随机选择一个 Dealer(工作端),而不是我指定的那一个。
我使用的代码:
// Broker: binds a ROUTER socket as the frontend (port 5556) and a DEALER socket as the
// backend (port 5557), then relays every message from one side to the other.
// The leading '@' in the address asks NetMQ to bind the socket.
using (var frontend = new RouterSocket(string.Format("@tcp://{0}:{1}", "127.0.0.1", "5556")))//"@tcp://10.0.2.218:5559"
using (var backend = new DealerSocket(string.Format("@tcp://{0}:{1}", "127.0.0.1", "5557")))//"@tcp://10.0.2.218:5560"
{
    // Handler for messages coming in to the frontend
    frontend.ReceiveReady += (s, e) =>
    {
        Console.WriteLine("message arrived on frontEnd");
        NetMQMessage msg = e.Socket.ReceiveMultipartMessage();
        // A ROUTER socket prepends the identity of the sending peer, so frame 0 is
        // the identity of the client that sent this message.
        string clientAddress = msg[0].ConvertToString();
        Console.WriteLine("Sending to :" + clientAddress);
        //TODO: Make routing here
        // NOTE: a DEALER socket distributes outgoing messages among its connected
        // peers by itself; the receiving worker cannot be selected from this handler.
        backend.SendMultipartMessage(msg); // Relay this message to the backend
    };
    // Handler for messages coming in to the backend
    backend.ReceiveReady += (s, e) =>
    {
        Console.WriteLine("message arrived on backend");
        var msg = e.Socket.ReceiveMultipartMessage();
        frontend.SendMultipartMessage(msg); // Relay this message to the frontend
    };
    using (var poller = new NetMQPoller { backend, frontend })
    {
        // Listen out for events on both sockets and raise events when messages come in
        poller.Run();
    }
}
客户端代码:
// Client: connects a REQ socket to the broker frontend on port 5556, sends a single
// two-frame request ("Server2" followed by the UTF-8 bytes of "Hello"), then blocks
// until a reply arrives. The leading '>' in the address asks NetMQ to connect.
using (var client = new RequestSocket(">tcp://127.0.0.1:5556"))
{
    const string banner = "======================================";

    // Build the request: first frame is the text "Server2", second is the payload.
    byte[] payload = Encoding.UTF8.GetBytes("Hello");
    var request = new NetMQMessage();
    request.Append("Server2");
    request.Append(payload);

    WriteToConsoleVoid(banner);
    WriteToConsoleVoid(" OUTGOING MESSAGE TO SERVER ");
    WriteToConsoleVoid(banner);
    client.SendMultipartMessage(request);

    // Blocks until the reply has been received.
    NetMQMessage reply = client.ReceiveMultipartMessage();
    WriteToConsoleVoid(banner);
    WriteToConsoleVoid(" INCOMING MESSAGE FROM SERVER");
    WriteToConsoleVoid(banner);

    // Only a three-frame reply carries a payload; it is taken from the third frame.
    byte[] rpcBytes = reply.FrameCount == 3 ? reply[2].ToByteArray() : null;

    WriteToConsoleVoid(banner);
    // Keep the console window open until the user presses Enter.
    Console.ReadLine();
}
Dealer 端(工作端)代码:
// Worker: a REP socket with an explicit identity (confItem.ResponseServerID) that
// connects to the broker backend on port 5557 and logs every request it receives.
using (var server = new ResponseSocket())
{
    // The identity is assigned before Connect so the peer sees it on connection.
    server.Options.Identity = UTF8Encoding.UTF8.GetBytes(confItem.ResponseServerID);
    Console.WriteLine("Server ID:" + confItem.ResponseServerID);
    server.Connect(string.Format("tcp://{0}:{1}", "127.0.0.1", "5557"));
    using (var poller = new NetMQPoller { server })
    {
        server.ReceiveReady += (s, a) =>
        {
            NetMQMessage serverMessage;
            try
            {
                serverMessage = a.Socket.ReceiveMultipartMessage();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception on ReceiveMultipartMessage : " + ex.ToString());
                // Nothing was received, so there is nothing to process. Leaving the
                // handler here avoids dereferencing a null message below.
                return;
            }

            if (serverMessage.FrameCount == 2)
            {
                // Two-frame message: frame 0 is logged as the address, frame 1 is the body.
                string clientAddress = serverMessage[0].ConvertToString();
                Console.WriteLine("ClientAddress:" + clientAddress);
                byte[] eaBody = serverMessage[1].ToByteArray();
                Console.WriteLine("Received message from remote computer: {0} bytes , CurrentID : {1}", eaBody.Length, confItem.ResponseServerID);
            }
            else
            {
                Console.WriteLine("Received message from remote computer: CurrentID : {0}", confItem.ResponseServerID);
            }

            // NOTE(review): no reply is sent from this handler. A REP socket is expected
            // to answer each request before it can receive the next one, and the REQ
            // client blocks until a reply arrives - confirm the reply code was not
            // simply left out of this snippet.
        };
        // Blocks the current thread, dispatching ReceiveReady events.
        poller.Run();
    }
}
是否可以在 frontend.ReceiveReady 上选择特定的后端?
谢谢!
您的后端也应该使用 Router 套接字。您需要让各个工作端(worker)先进行注册,或者事先知道所有可用的工作端及其身份(identity)。通过后端发送消息时,将目标工作端的身份帧放在消息的最前面。
看看 zeromq 指南中的 Majordomo 示例:
http://zguide.zeromq.org/page:all#toc72
http://zguide.zeromq.org/page:all#toc98
我正在尝试创建一个 rpc 程序来与位于不同网络的主机进行通信,并选择了此处提供的 NetMQ 的 Router-Dealer 配置:http://netmq.readthedocs.io/en/latest/router-dealer/#router-dealer
但问题是,路由器在将消息转发到后端时,总是随机选择一个 Dealer(工作端),而不是我指定的那一个。 我使用的代码:
// Broker: binds a ROUTER socket as the frontend (port 5556) and a DEALER socket as the
// backend (port 5557), then relays every message from one side to the other.
// The leading '@' in the address asks NetMQ to bind the socket.
using (var frontend = new RouterSocket(string.Format("@tcp://{0}:{1}", "127.0.0.1", "5556")))//"@tcp://10.0.2.218:5559"
using (var backend = new DealerSocket(string.Format("@tcp://{0}:{1}", "127.0.0.1", "5557")))//"@tcp://10.0.2.218:5560"
{
    // Handler for messages coming in to the frontend
    frontend.ReceiveReady += (s, e) =>
    {
        Console.WriteLine("message arrived on frontEnd");
        NetMQMessage msg = e.Socket.ReceiveMultipartMessage();
        // A ROUTER socket prepends the identity of the sending peer, so frame 0 is
        // the identity of the client that sent this message.
        string clientAddress = msg[0].ConvertToString();
        Console.WriteLine("Sending to :" + clientAddress);
        //TODO: Make routing here
        // NOTE: a DEALER socket distributes outgoing messages among its connected
        // peers by itself; the receiving worker cannot be selected from this handler.
        backend.SendMultipartMessage(msg); // Relay this message to the backend
    };
    // Handler for messages coming in to the backend
    backend.ReceiveReady += (s, e) =>
    {
        Console.WriteLine("message arrived on backend");
        var msg = e.Socket.ReceiveMultipartMessage();
        frontend.SendMultipartMessage(msg); // Relay this message to the frontend
    };
    using (var poller = new NetMQPoller { backend, frontend })
    {
        // Listen out for events on both sockets and raise events when messages come in
        poller.Run();
    }
}
客户端代码:
// Client: connects a REQ socket to the broker frontend on port 5556, sends a single
// two-frame request ("Server2" followed by the UTF-8 bytes of "Hello"), then blocks
// until a reply arrives. The leading '>' in the address asks NetMQ to connect.
using (var client = new RequestSocket(">tcp://127.0.0.1:5556"))
{
    const string banner = "======================================";

    // Build the request: first frame is the text "Server2", second is the payload.
    byte[] payload = Encoding.UTF8.GetBytes("Hello");
    var request = new NetMQMessage();
    request.Append("Server2");
    request.Append(payload);

    WriteToConsoleVoid(banner);
    WriteToConsoleVoid(" OUTGOING MESSAGE TO SERVER ");
    WriteToConsoleVoid(banner);
    client.SendMultipartMessage(request);

    // Blocks until the reply has been received.
    NetMQMessage reply = client.ReceiveMultipartMessage();
    WriteToConsoleVoid(banner);
    WriteToConsoleVoid(" INCOMING MESSAGE FROM SERVER");
    WriteToConsoleVoid(banner);

    // Only a three-frame reply carries a payload; it is taken from the third frame.
    byte[] rpcBytes = reply.FrameCount == 3 ? reply[2].ToByteArray() : null;

    WriteToConsoleVoid(banner);
    // Keep the console window open until the user presses Enter.
    Console.ReadLine();
}
Dealer 端(工作端)代码:
// Worker: a REP socket with an explicit identity (confItem.ResponseServerID) that
// connects to the broker backend on port 5557 and logs every request it receives.
using (var server = new ResponseSocket())
{
    // The identity is assigned before Connect so the peer sees it on connection.
    server.Options.Identity = UTF8Encoding.UTF8.GetBytes(confItem.ResponseServerID);
    Console.WriteLine("Server ID:" + confItem.ResponseServerID);
    server.Connect(string.Format("tcp://{0}:{1}", "127.0.0.1", "5557"));
    using (var poller = new NetMQPoller { server })
    {
        server.ReceiveReady += (s, a) =>
        {
            NetMQMessage serverMessage;
            try
            {
                serverMessage = a.Socket.ReceiveMultipartMessage();
            }
            catch (Exception ex)
            {
                Console.WriteLine("Exception on ReceiveMultipartMessage : " + ex.ToString());
                // Nothing was received, so there is nothing to process. Leaving the
                // handler here avoids dereferencing a null message below.
                return;
            }

            if (serverMessage.FrameCount == 2)
            {
                // Two-frame message: frame 0 is logged as the address, frame 1 is the body.
                string clientAddress = serverMessage[0].ConvertToString();
                Console.WriteLine("ClientAddress:" + clientAddress);
                byte[] eaBody = serverMessage[1].ToByteArray();
                Console.WriteLine("Received message from remote computer: {0} bytes , CurrentID : {1}", eaBody.Length, confItem.ResponseServerID);
            }
            else
            {
                Console.WriteLine("Received message from remote computer: CurrentID : {0}", confItem.ResponseServerID);
            }

            // NOTE(review): no reply is sent from this handler. A REP socket is expected
            // to answer each request before it can receive the next one, and the REQ
            // client blocks until a reply arrives - confirm the reply code was not
            // simply left out of this snippet.
        };
        // Blocks the current thread, dispatching ReceiveReady events.
        poller.Run();
    }
}
是否可以在 frontend.ReceiveReady 上选择特定的后端? 谢谢!
您的后端也应该使用 Router 套接字。您需要让各个工作端(worker)先进行注册,或者事先知道所有可用的工作端及其身份(identity)。通过后端发送消息时,将目标工作端的身份帧放在消息的最前面。
看看 zeromq 指南中的 Majordomo 示例:
http://zguide.zeromq.org/page:all#toc72 http://zguide.zeromq.org/page:all#toc98