遗漏保留的 MQTT 消息

Retained MQTT messages being missed

我正在尝试创建一个简单的应用程序,它会使用 MQTT(我使用的库是 M2Mqtt.net)将一些消息发布到主题,然后我想在消息已经订阅后订阅该主题发送然后让他们都收到然后丢弃,因为他们已经收到了。

我正在使用 mosquitto 2.0.12 作为代理

这是发布者:

public class MessagePublisher : IMessagePublisher
{
    private readonly MqttClient _client;

    public MessagePublisher()
    {
        _client = new MqttClient("localhost");

        // clean session needs to be set to false so that it retains all the missed messages, not just the last one
        _client.Connect(Guid.NewGuid().ToString(), "username", "password", false, byte.MaxValue);
    }

    public void Publish(string topic, string message, bool retain = false)
    {
        Console.Write($"Sent: {topic}, {message}");

        _client.Publish(topic, Encoding.UTF8.GetBytes(message), MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE, retain);

        Total.SentAndReceived.Add(message, null);
    }
}

这是监听器:

public class MessageReceiver : IMessageReceiver
{
    private readonly MqttClient _client;

    public MessageReceiver()
    {
        _client = new MqttClient("localhost");
    }

    public void Subscribe(params string[] topics)
    {
        _client.Subscribe(topics, new[] { MqttMsgBase.QOS_LEVEL_EXACTLY_ONCE });

        _client.MqttMsgPublishReceived += client_receivedMessage;
    }


    public void Connect()
    {
        // clean session needs to be set to false so that it retains all the missed messages, not just the last one
        _client.Connect(Guid.NewGuid().ToString(), "username", "password", false, byte.MaxValue);
    }

    public void Disconnect()
    {
        _client.Disconnect();
    }

    static void client_receivedMessage(object sender, MqttMsgPublishEventArgs e)
    {
        var message = Encoding.Default.GetString(e.Message);

        Console.WriteLine($"Message Received: {message}");

        if (Total.SentAndReceived.ContainsKey(message))
            Total.SentAndReceived[message] = message;
    }
}

这是主要应用程序:

public static class Program
{
    public static void Main(string[] args)
    {
        var messageReceiver = new MessageReceiver();

        var publisher = new MessagePublisher();

        for (var i = 1; i <= 10000; i++)
        {
            publisher.Publish("Devices/", i.ToString(), true);
        }

        messageReceiver.Subscribe("Devices/");

        messageReceiver.Connect();

        Thread.Sleep(5000);

        var b = Total.SentAndReceived.Where(x => x.Value == null);

        Console.WriteLine($"{b.Count()} Missed Messages");
    }
}

我遇到的问题是有遗漏的消息。当我 运行 应用程序时,错过消息的数量总是在变化。并不是错过了最后 n 条消息,而是错过了前 n 条消息。

我希望如果我要构建一个可以收听已发布消息的服务。如果服务因任何原因停止。服务恢复在线后,将收到在该停机时间内发送的消息。

我认为你对这里的一些术语有误解。

首先,MQTT一般不会对消息进行排队。 代理将消息排队的时间是如果接收客户端已经连接并订阅了 QOS > 0 的主题。如果该客户端在发布者发送消息之前断开连接代理将对消息进行排队。如果他们随后使用相同的客户端 ID 重新连接并将 clean session 标志设置为 false,则它们将仅被发送到接收客户端。这是消息排队的唯一方式。

由于您似乎在使用随机生成的客户端 ID (Guid.NewGuid().ToString()),因此这将不起作用。您似乎也在尝试在连接之前订阅,但同样无效。

其次,保留消息与上述消息队列无关。如果在发布时设置了保留标志,则会保留一条消息。然后代理将存储该特定消息并在客户端订阅匹配主题时传递它。此消息将在有关该主题的任何其他消息之前发送。如果发布了另一条带有保留标志的消息,它将替换之前的消息,每个主题只能有 1 条保留消息。