使用 netmq 轮询器的性能影响
Performance impact of using netmq poller
在下面的代码中,我使用 netmq push/pull 套接字发送和接收 100000 条消息。我首先尝试在拉式套接字上使用 ReceiveFrameString(ReceiveSimple 方法)进行简单的阻塞调用,然后尝试使用轮询器进行相同的操作(ReceiveWithPoller 方法)。
使用轮询器对发送/接收消息所需的时间有重大影响。我试图自己弄清楚为什么使用 dotTrace,我发现很多时间都花在等待 Socket.Select 执行上。
有人可以确认或解释这种差异吗?
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace PushPull
{
class Program
{
static int messageReceived = 0;
const int sampleCount = 100 * 1000;
static void Main(string[] args)
{
// Create psu socket
PushSocket pushSocket = new PushSocket();
pushSocket.Bind("tcp://localhost:5555");
// Create pull socket
PullSocket pullSocket = new PullSocket();
pullSocket.Connect("tcp://localhost:5555");
Console.WriteLine("Ready...press any key to start");
Console.ReadKey();
Console.WriteLine();
// Start sending
Task.Run(() =>
{
for (int i = 0; i < sampleCount; i++)
{
pushSocket.SendFrame(Encoding.UTF8.GetBytes("ping"));
}
});
Stopwatch sw = Stopwatch.StartNew();
//ReceiveSimple(pullSocket);
ReceiveWithPoller(pullSocket);
// Display result.
sw.Stop();
Console.WriteLine();
Console.WriteLine("{0} message exchanged in {1} msecs", sampleCount, sw.Elapsed.TotalMilliseconds);
}
private static void ReceiveSimple(PullSocket pullSocket)
{
messageReceived = 0;
do
{
pullSocket.ReceiveFrameString();
} while (!HandleMessage());
}
private static void ReceiveWithPoller(PullSocket pullSocket)
{
NetMQPoller poller = new NetMQPoller();
poller.Add(pullSocket);
pullSocket.ReceiveReady += (sender, eventArgs) =>
{
if (HandleMessage())
{
poller.Stop();
}
};
poller.Run();
}
private static bool HandleMessage()
{
messageReceived++;
if (messageReceived % 10000 == 0)
{
Console.WriteLine("10k");
}
return messageReceived == sampleCount;
}
}
}
当您处理大量消息时,轮询器的开销很大,但有一个非常简单的解决方案。
当您收到就绪事件时获取队列中的所有消息,您可以使用 Try* 方法执行此操作,在您的示例中如下所示:
pullSocket.ReceiveReady += (sender, eventArgs) =>
{
string message;
while (pullSocket.TryReceiveFrameString(out message)
{
if (HandleMessage())
{
poller.Stop();
}
}
};
在下面的代码中,我使用 netmq push/pull 套接字发送和接收 100000 条消息。我首先尝试在拉式套接字上使用 ReceiveFrameString(ReceiveSimple 方法)进行简单的阻塞调用,然后尝试使用轮询器进行相同的操作(ReceiveWithPoller 方法)。
使用轮询器对发送/接收消息所需的时间有重大影响。我试图自己弄清楚为什么使用 dotTrace,我发现很多时间都花在等待 Socket.Select 执行上。
有人可以确认或解释这种差异吗?
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using NetMQ;
using NetMQ.Sockets;
namespace PushPull
{
class Program
{
static int messageReceived = 0;
const int sampleCount = 100 * 1000;
static void Main(string[] args)
{
// Create psu socket
PushSocket pushSocket = new PushSocket();
pushSocket.Bind("tcp://localhost:5555");
// Create pull socket
PullSocket pullSocket = new PullSocket();
pullSocket.Connect("tcp://localhost:5555");
Console.WriteLine("Ready...press any key to start");
Console.ReadKey();
Console.WriteLine();
// Start sending
Task.Run(() =>
{
for (int i = 0; i < sampleCount; i++)
{
pushSocket.SendFrame(Encoding.UTF8.GetBytes("ping"));
}
});
Stopwatch sw = Stopwatch.StartNew();
//ReceiveSimple(pullSocket);
ReceiveWithPoller(pullSocket);
// Display result.
sw.Stop();
Console.WriteLine();
Console.WriteLine("{0} message exchanged in {1} msecs", sampleCount, sw.Elapsed.TotalMilliseconds);
}
private static void ReceiveSimple(PullSocket pullSocket)
{
messageReceived = 0;
do
{
pullSocket.ReceiveFrameString();
} while (!HandleMessage());
}
private static void ReceiveWithPoller(PullSocket pullSocket)
{
NetMQPoller poller = new NetMQPoller();
poller.Add(pullSocket);
pullSocket.ReceiveReady += (sender, eventArgs) =>
{
if (HandleMessage())
{
poller.Stop();
}
};
poller.Run();
}
private static bool HandleMessage()
{
messageReceived++;
if (messageReceived % 10000 == 0)
{
Console.WriteLine("10k");
}
return messageReceived == sampleCount;
}
}
}
当您处理大量消息时,轮询器的开销很大,但有一个非常简单的解决方案。
当您收到就绪事件时获取队列中的所有消息,您可以使用 Try* 方法执行此操作,在您的示例中如下所示:
pullSocket.ReceiveReady += (sender, eventArgs) =>
{
string message;
while (pullSocket.TryReceiveFrameString(out message)
{
if (HandleMessage())
{
poller.Stop();
}
}
};