无法使 NetMQ 发布-订阅模式与 ReceiveReady 一起使用
Can't get NetMQ pub-sub pattern to work with ReceiveReady
我正在尝试 NetMQ (3.3.3.4) 并创建发布-订阅模式。
我想要一个 host/server 监听一个端口 (9000) 上的所有传入数据,并将数据转发到另一个端口 (9001) 上的所有 clients/subscribers。
客户端随后将在 9000 上发送数据并接收在 9001 上(由任何人)发送的所有消息。
根据文档,我创建了类似下面代码的内容,但我无法让它工作。我相信,主要是因为 ReceiveReady
从未被调用过!
我认为它应该如何运作:
client.Publish
应该使 host.SubscriberSocket_ReceiveReady
中的第一行解除阻塞并将数据传递到另一个套接字
- 当数据被传递时,它应该出现在客户端运行
Task
的无限
结果:
- 从未达到
// This line is never reached
上的断点
- 无一例外。
- 切换主机上的端口,使 publish = 9000 和 subscribe = 9001 无效
- Windows防火墙已关闭,应该不会有任何阻塞
- 如果我将地址放入
PublisherSocket
构造函数,或者如果我在 Host 中使用 _publisherSocket.Bind(address)
或在 Client 中使用 _publisherSocket.Connect(address)
都没有区别
我做错了什么?
主机
public class MyNetMQHost {
private NetMQSocket _publishSocket;
private NetMQSocket _subscribeSocket;
private NetMQPoller _poller;
public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
Task.Factory.StartNew(() => {
using (_publishSocket = new PublisherSocket(publishAddress))
using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
_subscriberSocket.ReceiveReady += SubscriberSocket_ReceiveReady;
_poller.Run();
}
});
}
private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
_publishSocket.SendMultipartBytes(data);
}
}
客户端
public class MyNetMQClient {
private readonly NetMQSocket _publishSocket;
private readonly NetMQSocket _subscribeSocket;
public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
_publishSocket = new PublisherSocket(publishAddress);
_subscribeSocket = new SubscriberSocket(subscribeAddress);
Task.Factory.StartNew(() => {
while (true) {
byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
int one = 1; // This line is never reached
}
});
}
public void Publish(byte[] data) {
_publishSocket.SendFrame(data);
}
}
测试人员
public class Tester {
public void MyTester() {
MyNetMQHost host = new MyNetMQHost();
MyNetMQClient client = new MyNetMQClient();
client.Publish(Encoding.Unicode.GetBytes("Hello world!");
}
}
您的经纪人和客户都不会调用 suscribe。
在代理上调用 suscriber.Subscribe("") 订阅所有。在您的客户上订阅您想要的任何内容。
在您的经纪人中,您实际上应该使用 XSubscriber 和 XPublisher 来移动 susvriptions。这样你就不需要全部订阅。您可以为此使用 Proxy class。
我正在尝试 NetMQ (3.3.3.4) 并创建发布-订阅模式。
我想要一个 host/server 监听一个端口 (9000) 上的所有传入数据,并将数据转发到另一个端口 (9001) 上的所有 clients/subscribers。
客户端随后将在 9000 上发送数据并接收在 9001 上(由任何人)发送的所有消息。
根据文档,我创建了类似下面代码的内容,但我无法让它工作。我相信,主要是因为 ReceiveReady
从未被调用过!
我认为它应该如何运作:
client.Publish
应该使host.SubscriberSocket_ReceiveReady
中的第一行解除阻塞并将数据传递到另一个套接字- 当数据被传递时,它应该出现在客户端运行
Task
的无限
结果:
- 从未达到
// This line is never reached
上的断点 - 无一例外。
- 切换主机上的端口,使 publish = 9000 和 subscribe = 9001 无效
- Windows防火墙已关闭,应该不会有任何阻塞
- 如果我将地址放入
PublisherSocket
构造函数,或者如果我在 Host 中使用_publisherSocket.Bind(address)
或在 Client 中使用
_publisherSocket.Connect(address)
都没有区别
我做错了什么?
主机
public class MyNetMQHost {
private NetMQSocket _publishSocket;
private NetMQSocket _subscribeSocket;
private NetMQPoller _poller;
public MyNetMQHost(string publishAddress = "@tcp://localhost:9001", string subscribeAddress = "@tcp://localhost:9000") {
Task.Factory.StartNew(() => {
using (_publishSocket = new PublisherSocket(publishAddress))
using (_subscribeSocket = new SubscriberSocket(subscribeAddress))
using (_poller = new NetMQPoller { _publishSocket, _subscribeSocket }) {
_subscriberSocket.ReceiveReady += SubscriberSocket_ReceiveReady;
_poller.Run();
}
});
}
private void SubscriberSocket_ReceiveReady(object sender, NetMQSocketEventArgs e) {
var data = e.Socket.ReceiveMultipartBytes(); // This line is never reached
_publishSocket.SendMultipartBytes(data);
}
}
客户端
public class MyNetMQClient {
private readonly NetMQSocket _publishSocket;
private readonly NetMQSocket _subscribeSocket;
public MyNetMQClient(string publishAddress = ">tcp://localhost:9000", string subscribeAddress = ">tcp://localhost:9001") {
_publishSocket = new PublisherSocket(publishAddress);
_subscribeSocket = new SubscriberSocket(subscribeAddress);
Task.Factory.StartNew(() => {
while (true) {
byte[] frameBytes = _subscribeSocket.ReceiveFrameBytes();
int one = 1; // This line is never reached
}
});
}
public void Publish(byte[] data) {
_publishSocket.SendFrame(data);
}
}
测试人员
public class Tester {
public void MyTester() {
MyNetMQHost host = new MyNetMQHost();
MyNetMQClient client = new MyNetMQClient();
client.Publish(Encoding.Unicode.GetBytes("Hello world!");
}
}
您的经纪人和客户都不会调用 suscribe。 在代理上调用 suscriber.Subscribe("") 订阅所有。在您的客户上订阅您想要的任何内容。
在您的经纪人中,您实际上应该使用 XSubscriber 和 XPublisher 来移动 susvriptions。这样你就不需要全部订阅。您可以为此使用 Proxy class。