NetMQ Receive/Response 循环时不工作
NetMQ Receive/Response not working when looping
我在 https://netmq.readthedocs.io/ 上使用了一个简单的 receive/request 套接字示例,并想让它在无限循环中与 parametrizedThread 一起工作。
该代码在几个循环中运行良好,之后它抛出
A non-blocking socket operation could not be completed immediately
对于我得到的结果,应该在第一个循环之后立即发生,而不是随机发生。这里的问题是什么?听起来好像必须清除某些东西才能再次获得干净的连接(不确定)。
class Program
{
public class Connector
{
public String connection { get; set; }
public ResponseSocket server { get; set; }
public Connector(string address, ResponseSocket server_)
{
this.connection = address;
this.server = server_;
}
}
static void Main(string[] args)
{
string connection = "tcp://localhost:5555";
using (var server = new ResponseSocket())
{
while (true)
{
try
{
server.Bind(connection);
}
catch (NetMQException e)
{
Console.WriteLine(e.ErrorCode);
}
Connector c = new Connector(connection, server);
ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
Thread t = new Thread(parametrizedClientThread);
t.Start(c);
//runClientSide(connection, server);
}
}
}
private static void runClientSide(object param)
{
Connector conn = (Connector)param;
string connection = conn.connection;
ResponseSocket server = conn.server;
using (var client = new RequestSocket())
{
client.Connect(connection);
client.SendFrame("Hello");
string fromClientMessage = server.ReceiveFrameString();
Console.WriteLine("From Client: {0}", fromClientMessage);
server.SendFrame("Hi Back");
string fromServerMessage = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", fromServerMessage);
//Console.ReadLine();
}
}
NetMQSockets 不是线程安全的,您正在从客户端线程内部访问服务器以获取 send/receive 数据。客户端无论如何都不应该访问服务器套接字。
首先将 Bind 移动到 while 循环之外,它只需要一次,而不是每个创建的客户端都需要。
要等待消息,请使用 NetMQPoller,它将为您处理所有其他事情,并会在收到消息后引发服务器 ReceiveReady 事件。
static void Main(string[] args) {
string connection = "tcp://localhost:5555";
using (var poller = new NetMQPoller()) {
using (var server = new ResponseSocket()) {
server.ReceiveReady += Server_ReceiveReady;
poller.Add(server);
poller.RunAsync();
server.Bind(connection);
// start 10000 clients
for(int i = 0; i < 10000; i++) {
ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
Thread t = new Thread(parametrizedClientThread);
t.Start(connection);
}
Console.ReadLine(); //let server run until user pressed Enter key
}
}
}
//server (e.Socket) is receiving data here and can answer it
private static void Server_ReceiveReady(object sender, NetMQSocketEventArgs e) {
string fromClientMessage = e.Socket.ReceiveFrameString();
Console.WriteLine("From Client: {0}", fromClientMessage);
e.Socket.SendFrame("Hi Back");
}
private static void runClientSide(object param) {
string connection = (string) param;
using (var client = new RequestSocket()) {
client.Connect(connection);
client.SendFrame("Hello");
//Removed server side code here and put it into ReceiveReady event
string fromServerMessage = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", fromServerMessage);
}
}
我在 https://netmq.readthedocs.io/ 上使用了一个简单的 receive/request 套接字示例,并想让它在无限循环中与 parametrizedThread 一起工作。 该代码在几个循环中运行良好,之后它抛出
A non-blocking socket operation could not be completed immediately
对于我得到的结果,应该在第一个循环之后立即发生,而不是随机发生。这里的问题是什么?听起来好像必须清除某些东西才能再次获得干净的连接(不确定)。
class Program
{
public class Connector
{
public String connection { get; set; }
public ResponseSocket server { get; set; }
public Connector(string address, ResponseSocket server_)
{
this.connection = address;
this.server = server_;
}
}
static void Main(string[] args)
{
string connection = "tcp://localhost:5555";
using (var server = new ResponseSocket())
{
while (true)
{
try
{
server.Bind(connection);
}
catch (NetMQException e)
{
Console.WriteLine(e.ErrorCode);
}
Connector c = new Connector(connection, server);
ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
Thread t = new Thread(parametrizedClientThread);
t.Start(c);
//runClientSide(connection, server);
}
}
}
private static void runClientSide(object param)
{
Connector conn = (Connector)param;
string connection = conn.connection;
ResponseSocket server = conn.server;
using (var client = new RequestSocket())
{
client.Connect(connection);
client.SendFrame("Hello");
string fromClientMessage = server.ReceiveFrameString();
Console.WriteLine("From Client: {0}", fromClientMessage);
server.SendFrame("Hi Back");
string fromServerMessage = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", fromServerMessage);
//Console.ReadLine();
}
}
NetMQSockets 不是线程安全的,您正在从客户端线程内部访问服务器以获取 send/receive 数据。客户端无论如何都不应该访问服务器套接字。
首先将 Bind 移动到 while 循环之外,它只需要一次,而不是每个创建的客户端都需要。 要等待消息,请使用 NetMQPoller,它将为您处理所有其他事情,并会在收到消息后引发服务器 ReceiveReady 事件。
static void Main(string[] args) {
string connection = "tcp://localhost:5555";
using (var poller = new NetMQPoller()) {
using (var server = new ResponseSocket()) {
server.ReceiveReady += Server_ReceiveReady;
poller.Add(server);
poller.RunAsync();
server.Bind(connection);
// start 10000 clients
for(int i = 0; i < 10000; i++) {
ParameterizedThreadStart parametrizedClientThread = new ParameterizedThreadStart(runClientSide);
Thread t = new Thread(parametrizedClientThread);
t.Start(connection);
}
Console.ReadLine(); //let server run until user pressed Enter key
}
}
}
//server (e.Socket) is receiving data here and can answer it
private static void Server_ReceiveReady(object sender, NetMQSocketEventArgs e) {
string fromClientMessage = e.Socket.ReceiveFrameString();
Console.WriteLine("From Client: {0}", fromClientMessage);
e.Socket.SendFrame("Hi Back");
}
private static void runClientSide(object param) {
string connection = (string) param;
using (var client = new RequestSocket()) {
client.Connect(connection);
client.SendFrame("Hello");
//Removed server side code here and put it into ReceiveReady event
string fromServerMessage = client.ReceiveFrameString();
Console.WriteLine("From Server: {0}", fromServerMessage);
}
}