网络上的 NetMQ

NetMQ On A Network

我正在尝试在本地网络上不同计算机上的程序之间进行消息传递。该设计基本上是一个 "server," 多个 "clients,",服务器向客户端发送消息并从客户端接收消息,客户端从服务器接收消息,并且 sending/receiving 每个客户端之间的客户端消息其他.

我正尝试为此使用 NetMQ,但我无法让它在网络上运行。我可以 运行 服务器和客户端程序在一台机器上完美运行,但我不确定如何让它在网络上工作,而且我找不到任何 NetMQ 示例来说明这一点——到明确一点,我能找到的 每个 示例都使用 "localhost" 或 both "publisher" 和 "subscriber"(我使用的是 Pub-Sub 模式,但无论示例中使用何种模式,我都看到了相同的结果)。

一些使用客户端-客户端框架的示例代码(这需要一个 XPub-XSub 框架,因为有多个发布者和订阅者):

NetMQPoller poller = new NetMQPoller();
PublisherSocket clientPub = new PublisherSocket("tcp://*:4444");
XPublisherSocket xPub = new XPublisherSocket("tcp://*:5555");
XSubscriberSocket xSub = new XSubscriberSocket("tcp://*:4444");
Proxy proxy = new proxy(xSub, xPub, poller: poller);
proxy.Start();
SubscriberSocket clientSub = new SubscriberSocket("tcp://*:5555");
poller.Add(clientSub);
poller.Add(clientPub);
poller.RunAsync();

如果我 运行 这个,我得到 "the application is in break mode" 并出现以下错误:

NetMQ.NetMQException HResult=0x80131500 Source=NetMQ StackTrace: at NetMQ.Core.Transports.Tcp.TcpConnector.OutCompleted(SocketError socketError, Int32 bytesTransferred) at NetMQ.Core.IOObject.OutCompleted(SocketError socketError, Int32 bytesTransferred) at NetMQ.Core.Utils.Proactor.Loop() at System.Threading.ThreadHelper.ThreadStart_Context(Object state) at System.Threading.ExecutionContext.RunInternal(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx) at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean preserveSyncCtx) at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state) at System.Threading.ThreadHelper.ThreadStart()

如果我将 clientSub 的地址更改为 "tcp://127.0.0.1:5555",它会正常启动。但是,那是行不通的,因为订阅者需要从网络上的任何机器接收消息。

如何使用 NetMQ 执行此操作?任何人都可以提供一个工作示例吗?

默认情况下,当您使用套接字的构造函数时,是绑定或连接,具体取决于套接字类型。发布者绑定(监听)和订阅者连接。所以对订阅者使用 * 确实有意义。

您正在使用 tcp 传输,因此您必须在连接时指定远程对等方的 IP 地址。如果您想连接到多个发布者,您可以调用 connect 或提供 s 字符串,其中多个地址以逗号分隔。

下面一行不是 *,localhost 或 127.0.0.1 使用发布者机器的 ip 地址。

您可以使用 @ 或 > 前缀来覆盖构造函数的默认值。 @ 绑定和 > 连接。

NetMQPoller poller = new NetMQPoller();
PublisherSocket clientPub = new PublisherSocket(">tcp://192.168.0.15:4444");
XPublisherSocket xPub = new XPublisherSocket("@tcp://*:5555");
XSubscriberSocket xSub = new XSubscriberSocket("@tcp://*:4444");
Proxy proxy = new proxy(xSub, xPub, poller: poller);
proxy.Start();
SubscriberSocket clientSub = new SubscriberSocket(">tcp://192.168.0.15:5555");
poller.Add(clientSub);
poller.Add(clientPub);
poller.RunAsync();

192.168.0.15 是我的 IP 地址。你需要把它改成你的ip。

我有一个简单的例子。我的 "server" 代码在 Amazon EC2 实例上运行。它在端口 11000 上侦听并响应单个消息。

using System;
using NetMQ;
using NetMQ.Sockets;

class Program
{
    private static void Main()
    { 
        using (var server = new ResponseSocket("@tcp://*:11000"))
        {
            // the server receives, then sends
            Console.WriteLine("From Client: {0}", server.ReceiveFrameString());
            server.SendFrame("Hi Back");
            Console.ReadKey();
        }
    }
}

我的客户端代码在我的电脑上运行:

using System;
using NetMQ;
using NetMQ.Sockets;

class Program
{
    private static void Main()
    {
        using (var client = new RequestSocket(">tcp://3.13.186.250:11000"))
        {
            client.SendFrame("Hello");
            Console.WriteLine("From Server: {0}", client.ReceiveFrameString());
            Console.ReadKey();
        }
    }
}