NetMQ 如何使用 HighWatermark 选项检测慢速订阅者并由发布者断开连接?
NetMQ How to detect slow subscribers using HighWatermark option and disconnect them by publisher?
我只是在订阅者读取下一帧之前进行了延迟,所以我预计这将模拟慢速订阅者并提供 HighWatermark 选项的一些效果。
我没有观察到任何东西,订阅者不会跳过(丢弃)任何消息,也不会减慢发布者的速度。
我有 运行 1 个发布者和 x 个订阅者。
我尝试使用从文档中获取的 pub-sub 示例
https://netmq.readthedocs.io/en/latest/pub-sub/
有什么方法可以检测到订户速度慢吗?我的意思是要接收的排队消息数超过了 HighWatermark 值。我应该期待 NetMqMonitor 中出现任何异常或事件吗?
我也在寻找是否有断开这种慢用户的选项。
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
namespace Sub
{
class Program
{
static void Main()
{
Console.WriteLine("Subscriber started for Topic : {0}", topic);
using (var subSocket = new SubscriberSocket())
{
subSocket.Options.ReceiveHighWatermark = 100;
subSocket.Connect("tcp://localhost:12345");
subSocket.Subscribe("topic1");
Console.WriteLine("Subscriber socket connecting...");
while (true)
{
string messageTopicReceived = subSocket.ReceiveFrameString();
string messageReceived = subSocket.ReceiveFrameString();
Console.WriteLine($"{messageTopicReceived} {messageReceived}");
Thread.Sleep(50);
}
}
}
}
}
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
namespace Pub
{
class Program
{
static void Main()
{
using (var pubSocket = new PublisherSocket())
{
Console.WriteLine("Publisher socket binding...");
pubSocket.Options.SendHighWatermark = 100;
pubSocket.Bind("tcp://*:12345");
Thread.Sleep(1000);
for (var i = 0; i < 100000; i++)
{
var msg = "msg-" + i;
Console.WriteLine("Sending message : {0}", msg);
pubSocket.SendMoreFrame("topic1").SendFrame(msg);
//Thread.Sleep(1);
}
}
}
}
}
NetMqMonitor
不会触发事件来指示某些邮件已被删除。
订阅者必须验证生成时间并自行决定要做什么。 http://zguide.zeromq.org/php:chapter5#Slow-Subscriber-Detection-Suicidal-Snail-Pattern
Ps。要观察 ReceiveHighWatermark 的效果,必须修改问题中的示例代码。消息太小了。
我只是在订阅者读取下一帧之前进行了延迟,所以我预计这将模拟慢速订阅者并提供 HighWatermark 选项的一些效果。 我没有观察到任何东西,订阅者不会跳过(丢弃)任何消息,也不会减慢发布者的速度。 我有 运行 1 个发布者和 x 个订阅者。
我尝试使用从文档中获取的 pub-sub 示例 https://netmq.readthedocs.io/en/latest/pub-sub/
有什么方法可以检测到订户速度慢吗?我的意思是要接收的排队消息数超过了 HighWatermark 值。我应该期待 NetMqMonitor 中出现任何异常或事件吗? 我也在寻找是否有断开这种慢用户的选项。
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
namespace Sub
{
class Program
{
static void Main()
{
Console.WriteLine("Subscriber started for Topic : {0}", topic);
using (var subSocket = new SubscriberSocket())
{
subSocket.Options.ReceiveHighWatermark = 100;
subSocket.Connect("tcp://localhost:12345");
subSocket.Subscribe("topic1");
Console.WriteLine("Subscriber socket connecting...");
while (true)
{
string messageTopicReceived = subSocket.ReceiveFrameString();
string messageReceived = subSocket.ReceiveFrameString();
Console.WriteLine($"{messageTopicReceived} {messageReceived}");
Thread.Sleep(50);
}
}
}
}
}
using System;
using System.Threading;
using NetMQ;
using NetMQ.Sockets;
namespace Pub
{
class Program
{
static void Main()
{
using (var pubSocket = new PublisherSocket())
{
Console.WriteLine("Publisher socket binding...");
pubSocket.Options.SendHighWatermark = 100;
pubSocket.Bind("tcp://*:12345");
Thread.Sleep(1000);
for (var i = 0; i < 100000; i++)
{
var msg = "msg-" + i;
Console.WriteLine("Sending message : {0}", msg);
pubSocket.SendMoreFrame("topic1").SendFrame(msg);
//Thread.Sleep(1);
}
}
}
}
}
NetMqMonitor
不会触发事件来指示某些邮件已被删除。
订阅者必须验证生成时间并自行决定要做什么。 http://zguide.zeromq.org/php:chapter5#Slow-Subscriber-Detection-Suicidal-Snail-Pattern
Ps。要观察 ReceiveHighWatermark 的效果,必须修改问题中的示例代码。消息太小了。